package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.class */
public class CachingInMemorySessionStoreTest {
    // Same value as the size argument handed to ThreadCache in before().
    private static final int MAX_CACHE_SIZE_BYTES = 600;
    // Timestamp of the record context installed in before(); the flush tests expect it on forwarded records.
    private static final Long DEFAULT_TIMESTAMP = 10L;
    // Second constructor argument of CachingSessionStore; also reused as a session-window bound in several tests.
    private static final long SEGMENT_INTERVAL = 100;
    // Topic name of the record context installed before each test.
    private static final String TOPIC = "topic";
    // Namespace the close tests expect on ThreadCache.flush/close.
    // NOTE(review): presumably "<taskId>-<storeName>" as derived by the caching store - confirm.
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    // Record keys shared by the tests; the bytes come from String.getBytes() with the platform default charset.
    private final Bytes keyA = Bytes.wrap("a".getBytes());
    private final Bytes keyAA = Bytes.wrap("aa".getBytes());
    private final Bytes keyB = Bytes.wrap("b".getBytes());
    // Store wrapped by cachingStore: an InMemorySessionStore by default, an EasyMock mock in the close tests.
    private SessionStore<Bytes, byte[]> underlyingStore;
    // Processor context created in before() and passed to cachingStore.init.
    private InternalMockProcessorContext context;
    // Store under test.
    private CachingSessionStore cachingStore;
    // Thread cache handed to the processor context; a real ThreadCache by default, a mock in the close tests.
    private ThreadCache cache;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest$CacheFlushListenerStub.class */
    public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
        final Deserializer<K> keyDeserializer;
        final Deserializer<V> valueDesializer;
        final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList();

        CacheFlushListenerStub(Deserializer<K> deserializer, Deserializer<V> deserializer2) {
            this.keyDeserializer = deserializer;
            this.valueDesializer = deserializer2;
        }

        public void apply(byte[] bArr, byte[] bArr2, byte[] bArr3, long j) {
            this.forwarded.add(new KeyValueTimestamp<>(this.keyDeserializer.deserialize((String) null, bArr), new Change(this.valueDesializer.deserialize((String) null, bArr2), this.valueDesializer.deserialize((String) null, bArr3)), j));
        }

        public void apply(Record<byte[], Change<byte[]>> record) {
            this.forwarded.add(new KeyValueTimestamp<>(this.keyDeserializer.deserialize((String) null, (byte[]) record.key()), new Change(this.valueDesializer.deserialize((String) null, (byte[]) ((Change) record.value()).newValue), this.valueDesializer.deserialize((String) null, (byte[]) ((Change) record.value()).oldValue)), record.timestamp()));
        }
    }

    /**
     * Creates a fresh in-memory session store wrapped by a {@link CachingSessionStore}, a
     * {@link ThreadCache} sized by {@code MAX_CACHE_SIZE_BYTES}, and a mock processor context
     * whose record context carries {@code DEFAULT_TIMESTAMP} and {@code TOPIC}; then initializes
     * the caching store with that context.
     */
    @Before
    public void before() {
        underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope");
        cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, TOPIC, new RecordHeaders()));
        // The context type satisfies both init overloads; the cast selects the StateStoreContext one.
        cachingStore.init((StateStoreContext) context, cachingStore);
    }

    /** Closes the caching store that the current test left in {@code cachingStore}. */
    @After
    public void after() {
        cachingStore.close();
    }

    /**
     * Verifies that {@code init(ProcessorContext, StateStore)} on the caching store is forwarded
     * to the wrapped store with the same arguments.
     */
    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
        EasyMock.expect(inner.name()).andStubReturn("store");
        // The explicit cast picks the ProcessorContext overload; without it the call is
        // ambiguous and indistinguishable from the one exercised by shouldDelegateInit.
        inner.init((ProcessorContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);
        outer.init((ProcessorContext) context, outer);
        EasyMock.verify(inner);
    }

    /**
     * Verifies that {@code init(StateStoreContext, StateStore)} on the caching store is forwarded
     * to the wrapped store with the same arguments.
     */
    @Test
    public void shouldDelegateInit() {
        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
        EasyMock.expect(inner.name()).andStubReturn("store");
        // The explicit cast picks the StateStoreContext overload; the context type matches both.
        inner.init((StateStoreContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);
        outer.init((StateStoreContext) context, outer);
        EasyMock.verify(inner);
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks the cache holds three entries,
     * and verifies that single-key lookups for "a" and "b" each return exactly their own session.
     */
    @Test
    public void shouldPutFetchFromCache() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // try-with-resources so both iterators are closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> sessionsForA = cachingStore.findSessions(keyA, 0L, 0L);
             KeyValueIterator<Windowed<Bytes>, byte[]> sessionsForB = cachingStore.findSessions(keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(sessionsForA.next(), new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(sessionsForB.next(), new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1");
            Assert.assertFalse(sessionsForA.hasNext());
            Assert.assertFalse(sessionsForB.hasNext());
        }
    }

    /**
     * Puts one session each for keys "a", "aa" and "b" and verifies that a key-range lookup
     * from "a" to "b" returns all three in the order a, aa, b.
     */
    @Test
    public void shouldPutFetchAllKeysFromCache() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.findSessions(keyA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1");
            Assert.assertFalse(all.hasNext());
        }
    }

    /**
     * Puts one session each for keys "a", "aa" and "b" and verifies that a backward key-range
     * lookup from "a" to "b" returns all three in the order b, aa, a.
     */
    @Test
    public void shouldPutBackwardFetchAllKeysFromCache() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFindSessions(keyA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1");
            Assert.assertFalse(all.hasNext());
        }
    }

    /**
     * Scripts the mocked cache to throw on {@code flush(CACHE_NAMESPACE)}, records a
     * {@code close()} expectation on the mocked wrapped store, and checks that closing the
     * caching store raises a {@link RuntimeException} while the recorded expectations are met.
     */
    @Test
    public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() {
        setUpCloseTests();

        // cache mock: flush fails
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
        EasyMock.replay(cache);

        // wrapped-store mock: close is recorded as an expected call
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, cachingStore::close);
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Scripts the mocked cache so that {@code flush(CACHE_NAMESPACE)} succeeds and
     * {@code close(CACHE_NAMESPACE)} throws, records a {@code close()} expectation on the mocked
     * wrapped store, and checks that closing the caching store raises a {@link RuntimeException}
     * while the recorded expectations are met.
     */
    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        setUpCloseTests();

        // cache mock: flush succeeds, close fails
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(cache);

        // wrapped-store mock: close is recorded as an expected call
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, cachingStore::close);
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Records {@code flush} and {@code close} expectations for {@code CACHE_NAMESPACE} on the
     * mocked cache, scripts the mocked wrapped store to throw on {@code close()}, and checks that
     * closing the caching store raises a {@link RuntimeException} while the recorded
     * expectations are met.
     */
    @Test
    public void shouldCloseCacheAfterErrorDuringWrappedStoreClose() {
        setUpCloseTests();

        // cache mock: flush and close are both recorded as expected calls
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.replay(cache);

        // wrapped-store mock: close fails
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, cachingStore::close);
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Replaces the fixtures from {@code before()} with mocks for the close tests: a nice-mocked
     * wrapped store that reports name "store-name" and {@code isOpen() == true}, a nice-mocked
     * {@link ThreadCache}, and a new caching store initialized against a fresh processor context
     * built on that mocked cache.
     */
    private void setUpCloseTests() {
        underlyingStore = EasyMock.createNiceMock(SessionStore.class);
        EasyMock.expect(underlyingStore.name()).andStubReturn("store-name");
        EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true);
        EasyMock.replay(underlyingStore);

        cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
        cache = EasyMock.niceMock(ThreadCache.class);

        final InternalMockProcessorContext closeTestContext =
            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        closeTestContext.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, TOPIC, new RecordHeaders()));
        // The context type satisfies both init overloads; the cast selects the StateStoreContext one.
        cachingStore.init((StateStoreContext) closeTestContext, cachingStore);
    }

    /**
     * Puts one session each for keys "a", "aa" and "b" and verifies that a key-range lookup
     * from "aa" to "b" returns only those two, in the order aa, b.
     */
    @Test
    public void shouldPutFetchRangeFromCache() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> range = cachingStore.findSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(range.next(), new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(range.next(), new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1");
            Assert.assertFalse(range.hasNext());
        }
    }

    /**
     * Puts one session each for keys "a", "aa" and "b" and verifies that a backward key-range
     * lookup from "aa" to "b" returns only those two, in the order b, aa.
     */
    @Test
    public void shouldPutBackwardFetchRangeFromCache() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> range = cachingStore.backwardFindSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(range.next(), new Windowed<>(keyB, new SessionWindow(0L, 0L)), "1");
            StreamsTestUtils.verifyWindowedKeyValue(range.next(), new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "1");
            Assert.assertFalse(range.hasNext());
        }
    }

    /**
     * Stores four sessions for key "a" plus one for key "aa" and verifies that
     * {@code fetch(keyA)} returns exactly the four "a" sessions in insertion order.
     */
    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() {
        final List<KeyValue<Windowed<Bytes>, byte[]>> sessionsForA = Arrays.asList(
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10L, 10L)), "2".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL)), "3".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000L, 1000L)), "4".getBytes()));
        sessionsForA.forEach(session -> cachingStore.put(session.key, session.value));

        // A session under a different record key that must not show up in the result.
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "5".getBytes());

        final List<KeyValue<Windowed<Bytes>, byte[]>> fetched = StreamsTestUtils.toList(cachingStore.fetch(keyA));
        StreamsTestUtils.verifyKeyValueList(sessionsForA, fetched);
    }

    /**
     * Stores four sessions for key "a" plus one for key "aa" and verifies that
     * {@code backwardFetch(keyA)}, once reversed, equals the four "a" sessions in insertion order.
     */
    @Test
    public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
        final List<KeyValue<Windowed<Bytes>, byte[]>> sessionsForA = Arrays.asList(
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10L, 10L)), "2".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL)), "3".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000L, 1000L)), "4".getBytes()));
        sessionsForA.forEach(session -> cachingStore.put(session.key, session.value));

        // A session under a different record key that must not show up in the result.
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "5".getBytes());

        final List<KeyValue<Windowed<Bytes>, byte[]>> fetched = StreamsTestUtils.toList(cachingStore.backwardFetch(keyA));
        // Undo the backward order so the result can be compared against the insertion order.
        Collections.reverse(fetched);
        StreamsTestUtils.verifyKeyValueList(sessionsForA, fetched);
    }

    /**
     * Adds sessions via {@code addSessionsUntilOverflow}, checks the cache holds one entry fewer
     * than were added, and verifies the first added session can still be read back through
     * the caching store.
     */
    @Test
    public void shouldFlushItemsToStoreOnEviction() {
        final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d");
        Assert.assertEquals(added.size() - 1, cache.size());

        final KeyValue<Windowed<Bytes>, byte[]> firstAdded = added.get(0);
        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> found = cachingStore.findSessions(firstAdded.key.key(), 0L, 0L)) {
            final KeyValue<Windowed<Bytes>, byte[]> readBack = found.next();
            Assert.assertEquals(firstAdded.key, readBack.key);
            Assert.assertArrayEquals(firstAdded.value, readBack.value);
        }
    }

    /**
     * Adds sessions for key "a" via {@code addSessionsUntilOverflow} and verifies that a
     * single-key lookup over the time range {@code [0, added.size() * 10]} returns every
     * added session.
     */
    @Test
    public void shouldQueryItemsInCacheAndStore() {
        // Keep the added sessions in a local: they are both the expected result and the
        // source of the upper time bound (previously read from an undefined variable).
        final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a");
        final List<KeyValue<Windowed<Bytes>, byte[]>> found = StreamsTestUtils.toList(
            cachingStore.findSessions(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)), 0L, added.size() * 10));
        StreamsTestUtils.verifyKeyValueList(added, found);
    }

    /**
     * Puts sessions for keys "a" and "b", removes the "a" session, and verifies that "a" is no
     * longer returned by {@code findSessions}/{@code fetchSession} while "b" still yields "2".
     */
    @Test
    public void shouldRemove() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));
        cachingStore.put(sessionA, "2".getBytes());
        cachingStore.put(sessionB, "2".getBytes());
        cachingStore.remove(sessionA);

        // try-with-resources so the iterator is closed even when the assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> remaining = cachingStore.findSessions(keyA, 0L, 0L)) {
            Assert.assertFalse(remaining.hasNext());
        }
        Assert.assertNull(cachingStore.fetchSession(keyA, 0L, 0L));
        MatcherAssert.assertThat(cachingStore.fetchSession(keyB, 0L, 0L), Matchers.equalTo("2".getBytes()));
    }

    /**
     * Puts three sessions for key "a", flushes the caching store, puts three more, and verifies
     * that {@code findSessions(keyA, 0, 500)} returns all six in ascending window order.
     */
    @Test
    public void shouldFetchCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL));
        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(300L, 300L));
        final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(400L, 400L));
        final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(500L, 500L));

        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
        cachingStore.put(a3, "3".getBytes());
        // Flush between the two batches so the first three entries leave the dirty cache
        // (presumably landing in the wrapped store) before the rest are written.
        cachingStore.flush();
        cachingStore.put(a4, "4".getBytes());
        cachingStore.put(a5, "5".getBytes());
        cachingStore.put(a6, "6".getBytes());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0L, 500L)) {
            for (final Windowed<Bytes> expected : Arrays.asList(a1, a2, a3, a4, a5, a6)) {
                Assert.assertEquals(expected, results.next().key);
            }
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts three sessions for key "a", flushes the caching store, puts three more, and verifies
     * that {@code backwardFindSessions(keyA, 0, 500)} returns all six in descending window order.
     */
    @Test
    public void shouldBackwardFetchCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL));
        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(300L, 300L));
        final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(400L, 400L));
        final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(500L, 500L));

        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
        cachingStore.put(a3, "3".getBytes());
        // Flush between the two batches so the first three entries leave the dirty cache
        // (presumably landing in the wrapped store) before the rest are written.
        cachingStore.flush();
        cachingStore.put(a4, "4".getBytes());
        cachingStore.put(a5, "5".getBytes());
        cachingStore.put(a6, "6".getBytes());

        // try-with-resources so the iterator is closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.backwardFindSessions(keyA, 0L, 500L)) {
            for (final Windowed<Bytes> expected : Arrays.asList(a6, a5, a4, a3, a2, a1)) {
                Assert.assertEquals(expected, results.next().key);
            }
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts sessions for keys "a" and "aa" at windows 0, SEGMENT_INTERVAL and 200, then verifies
     * the key order returned by {@code findSessions(keyA, keyAA, 0, 200)}.
     */
    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL));
        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(200L, 200L));

        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(aa1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
        cachingStore.put(a3, "3".getBytes());
        cachingStore.put(aa3, "3".getBytes());

        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0L, 200L);
        final List<Windowed<Bytes>> returnedKeys = new ArrayList<>();
        for (; rangeResults.hasNext(); ) {
            returnedKeys.add(rangeResults.next().key);
        }
        rangeResults.close();

        Assert.assertEquals(Arrays.asList(a1, aa1, a2, a3, aa3), returnedKeys);
    }

    /**
     * Puts sessions for keys "a" and "aa" at windows 0, SEGMENT_INTERVAL and 200, then verifies
     * the key order returned by {@code backwardFindSessions(keyA, keyAA, 0, 200)}.
     */
    @Test
    public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL, SEGMENT_INTERVAL));
        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(200L, 200L));

        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(aa1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
        cachingStore.put(a3, "3".getBytes());
        cachingStore.put(aa3, "3".getBytes());

        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.backwardFindSessions(keyA, keyAA, 0L, 200L);
        final List<Windowed<Bytes>> returnedKeys = new ArrayList<>();
        for (; rangeResults.hasNext(); ) {
            returnedKeys.add(rangeResults.next().key);
        }
        rangeResults.close();

        Assert.assertEquals(Arrays.asList(aa3, a3, a2, aa1, a1), returnedKeys);
    }

    /**
     * Verifies that {@code setFlushListener} returns {@code true} for a {@code null} listener,
     * with the boolean argument set to both {@code true} and {@code false}.
     */
    @Test
    public void shouldSetFlushListener() {
        final CacheFlushListener<byte[], byte[]> noListener = null;
        Assert.assertTrue(cachingStore.setFlushListener(noListener, true));
        Assert.assertTrue(cachingStore.setFlushListener(noListener, false));
    }

    /**
     * With the flush listener registered with {@code true} as the second argument, verifies
     * what is forwarded on each flush: a first insert (new value, no old value), an overwrite
     * (new and old value), a removal (no new value, old value), and nothing at all when a
     * put/put/remove sequence happens between flushes.
     */
    @Test
    public void shouldForwardChangedValuesDuringFlush() {
        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(2L, 4L));
        final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1L, 2L));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2L, 4L));
        final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1L, 2L));
        final long expectedTimestamp = DEFAULT_TIMESTAMP.longValue();
        final CacheFlushListenerStub<Windowed<String>, String> flushListener = new CacheFlushListenerStub<>(
            new SessionWindowedDeserializer<>(new StringDeserializer()),
            new StringDeserializer());
        cachingStore.setFlushListener(flushListener, true);

        // first insert of b
        cachingStore.put(b, "1".getBytes());
        cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(bDeserialized, new Change<String>("1", null), expectedTimestamp)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // first insert of a
        cachingStore.put(a, "1".getBytes());
        cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<String>("1", null), expectedTimestamp)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // overwrite of a
        cachingStore.put(a, "2".getBytes());
        cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<String>("2", "1"), expectedTimestamp)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // removal of a
        cachingStore.remove(a);
        cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<String>(null, "2"), expectedTimestamp)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // put, put, remove without an intermediate flush
        cachingStore.put(a, "1".getBytes());
        cachingStore.put(a, "2".getBytes());
        cachingStore.remove(a);
        cachingStore.flush();
        Assert.assertEquals(Collections.emptyList(), flushListener.forwarded);
        flushListener.forwarded.clear();
    }

    /**
     * With the flush listener registered with {@code false} as the second argument, verifies
     * that an insert, an overwrite and a removal (each followed by a flush) are forwarded with
     * a {@code null} old value, and that a put/put/remove sequence between flushes forwards
     * nothing.
     */
    @Test
    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0L, 0L));
        final long expectedTimestamp = DEFAULT_TIMESTAMP.longValue();
        final CacheFlushListenerStub<Windowed<String>, String> flushListener = new CacheFlushListenerStub<>(
            new SessionWindowedDeserializer<>(new StringDeserializer()),
            new StringDeserializer());
        cachingStore.setFlushListener(flushListener, false);

        // insert, overwrite and removal, each followed by its own flush
        cachingStore.put(a, "1".getBytes());
        cachingStore.flush();
        cachingStore.put(a, "2".getBytes());
        cachingStore.flush();
        cachingStore.remove(a);
        cachingStore.flush();
        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(aDeserialized, new Change<String>("1", null), expectedTimestamp),
                new KeyValueTimestamp<>(aDeserialized, new Change<String>("2", null), expectedTimestamp),
                new KeyValueTimestamp<>(aDeserialized, new Change<String>(null, null), expectedTimestamp)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // put, put, remove without an intermediate flush
        cachingStore.put(a, "1".getBytes());
        cachingStore.put(a, "2".getBytes());
        cachingStore.remove(a);
        cachingStore.flush();
        Assert.assertEquals(Collections.emptyList(), flushListener.forwarded);
        flushListener.forwarded.clear();
    }

    /**
     * Verifies that {@code findSessions(keyAA, 0, 10)} and {@code findSessions(keyAA, keyAA, 0, 10)}
     * yield the same two entries and then both report no further elements.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 1L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2L, 3L)), "2".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4L, 5L)), "3".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(6L, 7L)), "4".getBytes());

        // try-with-resources so both iterators are closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> singleKey = cachingStore.findSessions(keyAA, 0L, 10L);
             KeyValueIterator<Windowed<Bytes>, byte[]> keyRange = cachingStore.findSessions(keyAA, keyAA, 0L, 10L)) {
            Assert.assertEquals(singleKey.next(), keyRange.next());
            Assert.assertEquals(singleKey.next(), keyRange.next());
            Assert.assertFalse(singleKey.hasNext());
            Assert.assertFalse(keyRange.hasNext());
        }
    }

    /**
     * Verifies that {@code backwardFindSessions(keyAA, 0, 10)} and
     * {@code backwardFindSessions(keyAA, keyAA, 0, 10)} yield the same two entries and then both
     * report no further elements.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() {
        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 1L)), "1".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2L, 3L)), "2".getBytes());
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4L, 5L)), "3".getBytes());
        cachingStore.put(new Windowed<>(keyB, new SessionWindow(6L, 7L)), "4".getBytes());

        // try-with-resources so both iterators are closed even when an assertion fails
        try (KeyValueIterator<Windowed<Bytes>, byte[]> singleKey = cachingStore.backwardFindSessions(keyAA, 0L, 10L);
             KeyValueIterator<Windowed<Bytes>, byte[]> keyRange = cachingStore.backwardFindSessions(keyAA, keyAA, 0L, 10L)) {
            Assert.assertEquals(singleKey.next(), keyRange.next());
            Assert.assertEquals(singleKey.next(), keyRange.next());
            Assert.assertFalse(singleKey.hasNext());
            Assert.assertFalse(keyRange.hasNext());
        }
    }

    /**
     * Verifies the cache size is 1 after a single put and 0 once the caching store is closed.
     */
    @Test
    public void shouldClearNamespaceCacheOnClose() {
        final Windowed<Bytes> session = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        cachingStore.put(session, "1".getBytes());
        Assert.assertEquals(1L, cache.size());

        cachingStore.close();
        Assert.assertEquals(0L, cache.size());
    }

    /** After {@code close()}, {@code fetch(keyA)} must raise {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA));
    }

    /**
     * After {@code close()}, {@code findSessions(keyA, 0, Long.MAX_VALUE)} must raise
     * {@link InvalidStateStoreException}.
     */
    @Test
    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
        cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> cachingStore.findSessions(keyA, 0L, Long.MAX_VALUE));
    }

    /** After {@code close()}, {@code remove} must raise {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
        cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0L, 0L))));
    }

    /** After {@code close()}, {@code put} must raise {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToPutIntoClosedCachingStore() {
        cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes()));
    }

    /** {@code findSessions} with a {@code null} key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
        final Bytes absentKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.findSessions(absentKey, 1L, 2L));
    }

    /** Range {@code findSessions} with a {@code null} lower key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() {
        final Bytes absentFromKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.findSessions(absentFromKey, keyA, 1L, 2L));
    }

    /** Range {@code findSessions} with a {@code null} upper key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() {
        final Bytes absentToKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, absentToKey, 1L, 2L));
    }

    /** Range {@code fetch} with a {@code null} lower key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullFromKey() {
        final Bytes absentFromKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.fetch(absentFromKey, keyA));
    }

    /** Range {@code fetch} with a {@code null} upper key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullToKey() {
        final Bytes absentToKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, absentToKey));
    }

    /** Single-key {@code fetch} with a {@code null} key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        final Bytes absentKey = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.fetch(absentKey));
    }

    /** {@code remove} with a {@code null} session key must raise {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnRemoveNullKey() {
        final Windowed<Bytes> absentSession = null;
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.remove(absentSession));
    }

    /**
     * Passing a {@code null} windowed key to {@code put} is expected to raise a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        // Parameterized type instead of a raw (Windowed) cast avoids an unchecked call.
        final Windowed<Bytes> missingKey = null;
        // Explicit charset: the payload no longer depends on the platform default encoding.
        final byte[] value = "1".getBytes(StandardCharsets.UTF_8);
        Assert.assertThrows(NullPointerException.class, () -> this.cachingStore.put(missingKey, value));
    }

    /**
     * Runs {@code backwardFindSessions} over a key range built from the
     * Integer-serialized values {@code -1} (from) and {@code 1} (to).
     *
     * <p>The test expects the call to return an empty iterator rather than throw,
     * and expects the "invalid key range" warning to be captured from the
     * {@link CachingSessionStore} logger.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() {
        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
        final String expectedWarning = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";

        // try-with-resources closes the appender on both the success and the failure
        // path, attaching any close() failure as a suppressed exception.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) {
            Assert.assertFalse(this.cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L).hasNext());
            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem(expectedWarning));
        }
    }

    /**
     * Runs {@code findSessions} over a key range built from the
     * Integer-serialized values {@code -1} (from) and {@code 1} (to).
     *
     * <p>The test expects the call to return an empty iterator rather than throw,
     * and expects the "invalid key range" warning to be captured from the
     * {@link CachingSessionStore} logger.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
        final String expectedWarning = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";

        // try-with-resources closes the appender on both the success and the failure
        // path, attaching any close() failure as a suppressed exception.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) {
            Assert.assertFalse(this.cachingStore.findSessions(keyFrom, keyTo, 0L, 10L).hasNext());
            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem(expectedWarning));
        }
    }

    /**
     * Adds sessions for randomly chosen keys until the cache size no longer
     * equals the number of sessions added so far.
     *
     * @param strArr candidate keys; one is picked at random for every added session
     * @return every key/value pair that was put into the caching store, in insertion order
     */
    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(String... strArr) {
        final Random random = new Random();
        // Parameterized list instead of a raw ArrayList: no unchecked conversion on return.
        final List<KeyValue<Windowed<Bytes>, byte[]>> added = new ArrayList<>();
        // Stop as soon as the cache size and the count of added sessions diverge.
        while (this.cache.size() == added.size()) {
            final String key = strArr[random.nextInt(strArr.length)];
            addSingleSession(key, added);
        }
        return added;
    }

    /**
     * Puts one session with value {@code "1"} into the caching store and records
     * the same key/value pair in {@code list}.
     *
     * <p>The session window starts and ends at {@code list.size() * 10}, so the
     * window depends on how many entries {@code list} already holds.
     *
     * @param str  key of the session to add
     * @param list accumulator that receives the stored key/value pair
     */
    private void addSingleSession(String str, List<KeyValue<Windowed<Bytes>, byte[]>> list) {
        final int timestamp = list.size() * 10;
        // NOTE(review): getBytes() uses the platform default charset; left unchanged so the
        // key bytes keep matching keys that may be built the same way elsewhere - confirm.
        final Bytes key = Bytes.wrap(str.getBytes());
        // Parameterized type instead of raw Windowed keeps put() and KeyValue.pair() checked.
        final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(timestamp, timestamp));
        final byte[] value = "1".getBytes();
        this.cachingStore.put(sessionKey, value);
        list.add(KeyValue.pair(sessionKey, value));
    }
}
