package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.class */
public class RocksDBVersionedStoreTest {
    private static final String STORE_NAME = "myversionedrocks";
    private static final String METRICS_SCOPE = "versionedrocksdb";
    private static final long HISTORY_RETENTION = 300000;
    private static final long GRACE_PERIOD = 300000;
    private static final long SEGMENT_INTERVAL = 100000;
    private static final long BASE_TIMESTAMP = 10;
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    private static final String DROPPED_RECORDS_METRIC = "dropped-records-total";
    private static final String TASK_LEVEL_GROUP = "stream-task-metrics";
    private InternalMockProcessorContext context;
    private Map<String, String> expectedMetricsTags;
    private RocksDBVersionedStore store;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest$DataRecord.class */
    public static class DataRecord {
        final String key;
        final String value;
        final long timestamp;

        DataRecord(String str, String str2, long j) {
            this.key = str;
            this.value = str2;
            this.timestamp = j;
        }
    }

    @BeforeEach
    public void before() {
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
        this.context.setTime(BASE_TIMESTAMP);
        this.expectedMetricsTags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", Thread.currentThread().getName()), Utils.mkEntry("task-id", this.context.taskId().toString())});
        this.store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 300000L, SEGMENT_INTERVAL);
        this.store.init(this.context, this.store);
    }

    @AfterEach
    public void after() {
        this.store.close();
    }

    @Test
    public void shouldPutLatest() {
        putToStore("k", "v", BASE_TIMESTAMP, -1L);
        putToStore("k", "v2", 11L, -1L);
        verifyGetValueFromStore("k", "v2", 11L);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, 11L);
        verifyTimestampedGetValueFromStore("k", 11L, "v2", 11L, -1L);
        verifyTimestampedGetValueFromStore("k", 12L, "v2", 11L, -1L);
    }

    @Test
    public void shouldPutNullAsLatest() {
        putToStore("k", null, BASE_TIMESTAMP, -1L);
        putToStore("k", null, 11L, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
        verifyTimestampedGetNullFromStore("k", 11L);
        verifyTimestampedGetNullFromStore("k", 12L);
    }

    @Test
    public void shouldPutOlderWithNonNullLatest() {
        putToStore("k", "v", BASE_TIMESTAMP, -1L);
        putToStore("k", "v2", 8L, BASE_TIMESTAMP);
        putToStore("k", "v1", 9L, BASE_TIMESTAMP);
        putToStore("k", "v4", 6L, 8L);
        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, -1L);
        verifyTimestampedGetValueFromStore("k", 9L, "v1", 9L, BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", 8L, "v2", 8L, 9L);
        verifyTimestampedGetValueFromStore("k", 7L, "v4", 6L, 8L);
    }

    @Test
    public void shouldPutOlderWithNullLatest() {
        putToStore("k", null, BASE_TIMESTAMP, -1L);
        putToStore("k", "v2", 8L, BASE_TIMESTAMP);
        putToStore("k", "v1", 9L, BASE_TIMESTAMP);
        putToStore("k", "v4", 6L, 8L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", 9L, "v1", 9L, BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", 8L, "v2", 8L, 9L);
        verifyTimestampedGetValueFromStore("k", 7L, "v4", 6L, 8L);
    }

    @Test
    public void shouldPutOlderNullWithNonNullLatest() {
        putToStore("k", "v", BASE_TIMESTAMP, -1L);
        putToStore("k", null, 8L, BASE_TIMESTAMP);
        putToStore("k", null, 9L, BASE_TIMESTAMP);
        putToStore("k", null, 6L, 8L);
        putToStore("k", "v5", 5L, 6L);
        putToStore("k", "v3", 7L, 8L);
        putToStore("k", null, 4L, 5L);
        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);
        verifyTimestampedGetNullFromStore("k", 8L);
        verifyTimestampedGetValueFromStore("k", 7L, "v3", 7L, 8L);
        verifyTimestampedGetNullFromStore("k", 6L);
        verifyTimestampedGetValueFromStore("k", 5L, "v5", 5L, 6L);
        verifyTimestampedGetNullFromStore("k", 4L);
    }

    @Test
    public void shouldPutOlderNullWithNullLatest() {
        putToStore("k", null, BASE_TIMESTAMP, -1L);
        putToStore("k", null, 8L, BASE_TIMESTAMP);
        putToStore("k", null, 9L, BASE_TIMESTAMP);
        putToStore("k", null, 6L, 8L);
        putToStore("k", "v3", 7L, 8L);
        putToStore("k", "v5", 5L, 6L);
        putToStore("k", null, 4L, 5L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
        verifyTimestampedGetNullFromStore("k", 9L);
        verifyTimestampedGetNullFromStore("k", 8L);
        verifyTimestampedGetValueFromStore("k", 7L, "v3", 7L, 8L);
        verifyTimestampedGetNullFromStore("k", 6L);
        verifyTimestampedGetValueFromStore("k", 5L, "v5", 5L, 6L);
        verifyTimestampedGetNullFromStore("k", 4L);
    }

    @Test
    public void shouldPutRepeatTimestampAsLatest() {
        putToStore("k", "to_be_replaced", BASE_TIMESTAMP, -1L);
        putToStore("k", "b", BASE_TIMESTAMP, -1L);
        verifyGetValueFromStore("k", "b", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);
        putToStore("k", null, BASE_TIMESTAMP, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
        verifyTimestampedGetNullFromStore("k", 9L);
        putToStore("k", null, BASE_TIMESTAMP, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
        verifyTimestampedGetNullFromStore("k", 9L);
        putToStore("k", "b", BASE_TIMESTAMP, -1L);
        verifyGetValueFromStore("k", "b", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);
    }

    @Test
    public void shouldPutRepeatTimestamps() {
        putToStore("k", "to_be_replaced", 100020L, -1L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", "to_be_replaced", 99990L, 100020L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", "to_be_replaced", 99999L, 100020L);
        putToStore("k", "to_be_replaced", 100001L, 100020L);
        putToStore("k", null, 99999L, 100001L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", null, 100005L, 100010L);
        putToStore("k", "vp5", 100005L, 100010L);
        putToStore("k", "to_be_replaced", 99995L, 99999L);
        putToStore("k", "vn5", 99995L, 99999L);
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 100020L, -1L);
        putToStore("k", "vn6", 99994L, 99995L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    @Test
    public void shouldPutIntoMultipleSegments() {
        putToStore("k", null, 99980L, -1L);
        putToStore("k", "vn10", 99990L, -1L);
        putToStore("k", null, 99999L, -1L);
        putToStore("k", null, 100001L, -1L);
        putToStore("k", "vp10", 100010L, -1L);
        putToStore("k", null, 100020L, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetNullFromStore("k", 100005L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99985L);
    }

    @Test
    public void shouldMoveRecordToOlderSegmentDuringPut() {
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", "vp10", 100010L, 100020L);
        putToStore("k", "vn10", 99990L, 100010L);
        putToStore("k", "vn2", 99998L, 100010L);
        putToStore("k", "vn1", 99999L, 100010L);
        putToStore("k", "vp1", 100001L, 100010L);
        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetValueFromStore("k", 100005L, "vp1", 100001L, 100010L);
        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99998L);
    }

    @Test
    public void shouldMoveRecordToOlderSegmentWithNullsDuringPut() {
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 99999L, 100020L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 99990L, 99999L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", "vp5", 100005L, 100010L);
        putToStore("k", "vn5", 99995L, 99999L);
        putToStore("k", "vn6", 99994L, 99995L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    @Test
    public void shouldFallThroughToExistingOlderSegmentAsLatestDuringPut() {
        putToStore("k", null, 99995L, -1L);
        putToStore("k", "vn6", 99994L, 99995L);
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", null, 99999L, 100010L);
        putToStore("k", "vn2", 99998L, 99999L);
        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetNullFromStore("k", 100012L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99995L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
    }

    @Test
    public void shouldPutNonLatestTombstoneIntoNewSegmentWithValidTo() {
        putToStore("k", "vp30", 100030L, -1L);
        putToStore("k", null, 99990L, 100030L);
        putToStore("k", "vn5", 99995L, 100030L);
        putToStore("k", "vn1", 99999L, 100030L);
        putToStore("k", null, 99998L, 99999L);
        verifyGetValueFromStore("k", "vp30", 100030L);
        verifyTimestampedGetValueFromStore("k", 100010L, "vn1", 99999L, 100030L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100030L);
        verifyTimestampedGetNullFromStore("k", 99998L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99998L);
        verifyTimestampedGetNullFromStore("k", 99990L);
    }

    @Test
    public void shouldDelete() {
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", "vp10", 100010L, 100020L);
        putToStore("k", "vn10", 99990L, 100010L);
        putToStore("k", "vn2", 99998L, 100010L);
        VersionedRecord<String> deleteFromStore = deleteFromStore("k", 99995L);
        MatcherAssert.assertThat(deleteFromStore.value(), CoreMatchers.equalTo("vn10"));
        MatcherAssert.assertThat(Long.valueOf(deleteFromStore.timestamp()), CoreMatchers.equalTo(99990L));
        VersionedRecord<String> deleteFromStore2 = deleteFromStore("k", 100010L);
        MatcherAssert.assertThat(deleteFromStore2.value(), CoreMatchers.equalTo("vp10"));
        MatcherAssert.assertThat(Long.valueOf(deleteFromStore2.timestamp()), CoreMatchers.equalTo(100010L));
        MatcherAssert.assertThat(deleteFromStore("k", 100010L), CoreMatchers.nullValue());
        VersionedRecord<String> deleteFromStore3 = deleteFromStore("k", 100025L);
        MatcherAssert.assertThat(deleteFromStore3.value(), CoreMatchers.equalTo("vp20"));
        MatcherAssert.assertThat(Long.valueOf(deleteFromStore3.timestamp()), CoreMatchers.equalTo(100020L));
    }

    @Test
    public void shouldNotPutExpired() {
        putToStore("k", "v", 300010L, -1L);
        putToStore("k1", "v1", BASE_TIMESTAMP, -1L);
        verifyGetValueFromStore("k1", "v1", BASE_TIMESTAMP);
        putToStore("k2", "v2", 9L, Long.MIN_VALUE);
        verifyGetNullFromStore("k2");
        verifyExpiredRecordSensor(1);
    }

    @Test
    public void shouldNotDeleteExpired() {
        putToStore("k1", "v1", 1L, -1L);
        putToStore("k2", "v2", 1L, -1L);
        putToStore("kother", "vother", 300010L, -1L);
        VersionedRecord<String> deleteFromStore = deleteFromStore("k1", BASE_TIMESTAMP);
        MatcherAssert.assertThat(deleteFromStore.value(), CoreMatchers.equalTo("v1"));
        MatcherAssert.assertThat(Long.valueOf(deleteFromStore.timestamp()), CoreMatchers.equalTo(1L));
        verifyGetNullFromStore("k1");
        MatcherAssert.assertThat(deleteFromStore("k2", 9L), CoreMatchers.nullValue());
        verifyGetValueFromStore("k2", "v2", 1L);
        verifyExpiredRecordSensor(1);
    }

    @Test
    public void shouldGetFromOlderSegments() {
        putToStore("ko", null, 99990L, -1L);
        putToStore("ko", null, 199990L, -1L);
        putToStore("ko", null, 299990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99980L);
        putToStore("k", "v", 99980L, -1L);
        putToStore("k", null, 99990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99970L);
        verifyTimestampedGetValueFromStore("k", 99985L, "v", 99980L, 99990L);
        verifyTimestampedGetNullFromStore("k", 99995L);
        putToStore("k", "v2", 299980L, -1L);
        putToStore("k", null, 299990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99970L);
        verifyTimestampedGetValueFromStore("k", 99985L, "v", 99980L, 99990L);
        verifyTimestampedGetNullFromStore("k", 99995L);
    }

    @Test
    public void shouldNotGetExpired() {
        putToStore("k", "v_old", 0L, -1L);
        putToStore("k", "v", 99990L, -1L);
        verifyTimestampedGetValueFromStore("k", 99989L, "v_old", 0L, 99990L);
        putToStore("ko", "vo", 399989L, -1L);
        verifyTimestampedGetValueFromStore("k", 99989L, "v_old", 0L, 99990L);
        putToStore("ko", "vo2", 399990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99989L);
    }

    @Test
    public void shouldGetExpiredIfLatestValue() {
        putToStore("k", "v", 1L, -1L);
        putToStore("ko", "vo_old", 1L, -1L);
        putToStore("ko", "vo_new", 300012L, -1L);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", 1L, -1L);
        verifyTimestampedGetNullFromStore("ko", BASE_TIMESTAMP);
    }

    @Test
    public void shouldDistinguishEmptyAndNull() {
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", null, 99999L, 100020L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", "", 100005L, 100010L);
        putToStore("k", "", 99995L, 99999L);
        putToStore("k", "", 99994L, 99995L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    @Test
    public void shouldGetRecordVersionsFromOlderSegments() {
        putToStore("ko", null, 99990L, -1L);
        putToStore("ko", null, 199990L, -1L);
        putToStore("ko", null, 299990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99980L, SEGMENT_INTERVAL);
        putToStore("k", "v1", 99970L, -1L);
        putToStore("k", "v2", 99975L, -1L);
        putToStore("k", null, 99980L, -1L);
        putToStore("k", null, 99985L, -1L);
        putToStore("k", "v3", 99990L, -1L);
        putToStore("k", "v4", 99995L, -1L);
        verifyTimestampedGetNullFromStore("k", 99960L, 99965L);
        verifyTimestampedGetValueFromStore("k", 99970L, 99995L, ResultOrder.ANY, Arrays.asList("v4", "v3", "v2", "v1"), Arrays.asList(99995L, 99990L, 99975L, 99970L), Arrays.asList(-1L, 99995L, 99980L, 99975L));
        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ANY, Arrays.asList("v4", "v3", "v2", "v1"), Arrays.asList(99995L, 99990L, 99975L, 99970L), Arrays.asList(-1L, 99995L, 99980L, 99975L));
        verifyTimestampedGetValueFromStore("k", 99996L, SEGMENT_INTERVAL, ResultOrder.ANY, Collections.singletonList("v4"), Collections.singletonList(99995L), Collections.singletonList(-1L));
        verifyTimestampedGetValueFromStore("k", 99995L, 99995L, ResultOrder.ANY, Collections.singletonList("v4"), Collections.singletonList(99995L), Collections.singletonList(-1L));
        verifyTimestampedGetValueFromStore("k", 99996L, 99996L, ResultOrder.ANY, Collections.singletonList("v4"), Collections.singletonList(99995L), Collections.singletonList(-1L));
        verifyTimestampedGetValueFromStore("k", 99969L, 99979L, ResultOrder.ANY, Arrays.asList("v2", "v1"), Arrays.asList(99975L, 99970L), Arrays.asList(99980L, 99975L));
        verifyTimestampedGetValueFromStore("k", 99976L, 99989L, ResultOrder.ANY, Collections.singletonList("v2"), Collections.singletonList(99975L), Collections.singletonList(99980L));
        verifyTimestampedGetValueFromStore("k", 99989L, 99996L, ResultOrder.ANY, Arrays.asList("v4", "v3"), Arrays.asList(99995L, 99990L), Arrays.asList(-1L, 99995L));
        verifyTimestampedGetValueFromStore("k", 99974L, 99995L, ResultOrder.ANY, Arrays.asList("v4", "v3", "v2", "v1"), Arrays.asList(99995L, 99990L, 99975L, 99970L), Arrays.asList(-1L, 99995L, 99980L, 99975L));
        verifyTimestampedGetValueFromStore("k", 99985L, 99990L, ResultOrder.ANY, Collections.singletonList("v3"), Collections.singletonList(99990L), Collections.singletonList(99995L));
        verifyTimestampedGetNullFromStore("k", 99981L, 99984L);
        putToStore("k", "v5", 299970L, -1L);
        putToStore("k", null, 299980L, -1L);
        verifyTimestampedGetNullFromStore("k", 99960L, 99965L);
        verifyTimestampedGetValueFromStore("k", 99970L, 99974L, ResultOrder.ANY, Collections.singletonList("v1"), Collections.singletonList(99970L), Collections.singletonList(99975L));
        verifyTimestampedGetNullFromStore("k", 99981L, 99984L);
    }

    @Test
    public void shouldGetRecordVersionsInAscendingOrder() {
        putToStore("k", "v1", 99970L, -1L);
        putToStore("k", "v2", 99975L, -1L);
        putToStore("k", "v3", 99990L, -1L);
        putToStore("k", "v4", 99995L, -1L);
        verifyTimestampedGetValueFromStore("k", 99970L, 99995L, ResultOrder.ASCENDING, Arrays.asList("v1", "v2", "v3", "v4"), Arrays.asList(99970L, 99975L, 99990L, 99995L), Arrays.asList(99975L, 99990L, 99995L, -1L));
    }

    @Test
    public void shouldGetRecordVersionsFromMultipleOldSegmentsInAscendingOrder() {
        putToStore("k", "v1", 99990L, -1L);
        putToStore("k", "v2", 99995L, -1L);
        putToStore("k", "v3", 199990L, -1L);
        putToStore("k", "v4", 199995L, -1L);
        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ASCENDING, Arrays.asList("v1", "v2", "v3", "v4"), Arrays.asList(99990L, 99995L, 199990L, 199995L), Arrays.asList(99995L, 199990L, 199995L, -1L));
    }

    @Test
    public void shouldNotGetExpiredRecordVersions() {
        putToStore("k", "v_old", 0L, -1L);
        putToStore("k", "v", 99990L, -1L);
        verifyTimestampedGetValueFromStore("k", 0L, 99989L, ResultOrder.ANY, Collections.singletonList("v_old"), Collections.singletonList(0L), Collections.singletonList(99990L));
        putToStore("ko", "vo", 399989L, -1L);
        verifyTimestampedGetValueFromStore("k", 0L, 99989L, ResultOrder.ANY, Collections.singletonList("v_old"), Collections.singletonList(0L), Collections.singletonList(99990L));
        putToStore("ko", "vo2", 399990L, -1L);
        verifyTimestampedGetNullFromStore("k", 0L, 99989L);
    }

    @Test
    public void shouldGetExpiredIfLatestVersionValue() {
        putToStore("k", "v", 1L, -1L);
        putToStore("ko", "vo_old", 1L, -1L);
        putToStore("ko", "vo_new", 300012L, -1L);
        verifyTimestampedGetValueFromStore("k", 0L, BASE_TIMESTAMP, ResultOrder.ANY, Collections.singletonList("v"), Collections.singletonList(1L), Collections.singletonList(-1L));
        verifyTimestampedGetNullFromStore("ko", 0L, BASE_TIMESTAMP);
    }

    @Test
    public void shouldRestore() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k", "vp20", 100020L));
        arrayList.add(new DataRecord("k", "vp10", 100010L));
        arrayList.add(new DataRecord("k", "vn10", 99990L));
        arrayList.add(new DataRecord("k", "vn2", 99998L));
        arrayList.add(new DataRecord("k", "vn1", 99999L));
        arrayList.add(new DataRecord("k", "vp1", 100001L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetValueFromStore("k", 100005L, "vp1", 100001L, 100010L);
        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99998L);
    }

    @Test
    public void shouldRestoreWithNulls() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k", null, 100020L));
        arrayList.add(new DataRecord("k", null, 99999L));
        arrayList.add(new DataRecord("k", null, 100001L));
        arrayList.add(new DataRecord("k", null, 99990L));
        arrayList.add(new DataRecord("k", null, 100010L));
        arrayList.add(new DataRecord("k", "vp5", 100005L));
        arrayList.add(new DataRecord("k", "vn5", 99995L));
        arrayList.add(new DataRecord("k", "vn6", 99994L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    @Test
    public void shouldRestoreWithNullsAndRepeatTimestamps() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k", "to_be_replaced", 100020L));
        arrayList.add(new DataRecord("k", null, 99990L));
        arrayList.add(new DataRecord("k", "to_be_replaced", 99990L));
        arrayList.add(new DataRecord("k", null, 99990L));
        arrayList.add(new DataRecord("k", "to_be_replaced", 99999L));
        arrayList.add(new DataRecord("k", "to_be_replaced", 100001L));
        arrayList.add(new DataRecord("k", null, 99999L));
        arrayList.add(new DataRecord("k", null, 100001L));
        arrayList.add(new DataRecord("k", null, 100010L));
        arrayList.add(new DataRecord("k", null, 100005L));
        arrayList.add(new DataRecord("k", "vp5", 100005L));
        arrayList.add(new DataRecord("k", "to_be_replaced", 99995L));
        arrayList.add(new DataRecord("k", "vn5", 99995L));
        arrayList.add(new DataRecord("k", null, 100020L));
        arrayList.add(new DataRecord("k", null, 100020L));
        arrayList.add(new DataRecord("k", "vn6", 99994L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    @Test
    public void shouldRestoreMultipleBatches() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k", null, 99980L));
        arrayList.add(new DataRecord("k", "vn10", 99990L));
        arrayList.add(new DataRecord("k", null, 99999L));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new DataRecord("k", null, 100001L));
        arrayList2.add(new DataRecord("k", "vp10", 100010L));
        arrayList2.add(new DataRecord("k", null, 100020L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        this.store.restoreBatch(getChangelogRecords(arrayList2));
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetNullFromStore("k", 100005L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99985L);
    }

    @Test
    public void shouldNotRestoreExpired() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k", "v", 300010L));
        arrayList.add(new DataRecord("k1", "v1", BASE_TIMESTAMP));
        arrayList.add(new DataRecord("k2", "v2", 9L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        verifyGetValueFromStore("k", "v", 300010L);
        verifyGetValueFromStore("k1", "v1", BASE_TIMESTAMP);
        verifyGetNullFromStore("k2");
        verifyExpiredRecordSensor(0);
    }

    @Test
    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DataRecord("k2", "v2", 0L));
        arrayList.add(new DataRecord("k", "v", 300010L));
        this.store.restoreBatch(getChangelogRecords(arrayList));
        verifyGetValueFromStore("k2", "v2", 0L);
        verifyGetValueFromStore("k", "v", 300010L);
    }

    @Test
    public void shouldAllowZeroHistoryRetention() {
        this.store.close();
        this.store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
        this.store.init(this.context, this.store);
        putToStore("k", "v", BASE_TIMESTAMP, -1L);
        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, -1L);
        verifyTimestampedGetValueFromStore("k", 11L, "v", BASE_TIMESTAMP, -1L);
        putToStore("k", "updated", BASE_TIMESTAMP, -1L);
        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP, -1L);
        putToStore("k", "v2", 12L, -1L);
        verifyGetValueFromStore("k", "v2", 12L);
        verifyTimestampedGetValueFromStore("k", 12L, "v2", 12L, -1L);
        verifyTimestampedGetNullFromStore("k", 11L);
        deleteFromStore("k", 13L);
        verifyGetNullFromStore("k");
        putToStore("k2", "v", 12L, Long.MIN_VALUE);
        verifyGetNullFromStore("k2");
        verifyExpiredRecordSensor(1);
    }

    private void putToStore(String str, String str2, long j, long j2) {
        MatcherAssert.assertThat(Long.valueOf(this.store.put(new Bytes(STRING_SERIALIZER.serialize((String) null, str)), STRING_SERIALIZER.serialize((String) null, str2), j)), CoreMatchers.equalTo(Long.valueOf(j2)));
    }

    private VersionedRecord<String> deleteFromStore(String str, long j) {
        return deserializedRecord(this.store.delete(new Bytes(STRING_SERIALIZER.serialize((String) null, str)), j));
    }

    private VersionedRecord<String> getFromStore(String str) {
        return deserializedRecord(this.store.get(new Bytes(STRING_SERIALIZER.serialize((String) null, str))));
    }

    private VersionedRecord<String> getFromStore(String str, long j) {
        return deserializedRecord(this.store.get(new Bytes(STRING_SERIALIZER.serialize((String) null, str)), j));
    }

    private List<VersionedRecord<String>> getFromStore(String str, long j, long j2, ResultOrder resultOrder) {
        VersionedRecordIterator versionedRecordIterator = this.store.get(new Bytes(STRING_SERIALIZER.serialize((String) null, str)), j, j2, resultOrder);
        ArrayList arrayList = new ArrayList();
        while (versionedRecordIterator.hasNext()) {
            arrayList.add(deserializedRecord((VersionedRecord) versionedRecordIterator.next()));
        }
        return arrayList;
    }

    private void verifyGetValueFromStore(String str, String str2, long j) {
        VersionedRecord<String> fromStore = getFromStore(str);
        MatcherAssert.assertThat(fromStore.value(), CoreMatchers.equalTo(str2));
        MatcherAssert.assertThat(Long.valueOf(fromStore.timestamp()), CoreMatchers.equalTo(Long.valueOf(j)));
        MatcherAssert.assertThat(Boolean.valueOf(fromStore.validTo().isPresent()), CoreMatchers.equalTo(false));
    }

    private void verifyGetNullFromStore(String str) {
        MatcherAssert.assertThat(getFromStore(str), CoreMatchers.nullValue());
    }

    private void verifyTimestampedGetValueFromStore(String str, long j, String str2, long j2, long j3) {
        VersionedRecord<String> fromStore = getFromStore(str, j);
        MatcherAssert.assertThat(fromStore.value(), CoreMatchers.equalTo(str2));
        MatcherAssert.assertThat(Long.valueOf(fromStore.timestamp()), CoreMatchers.equalTo(Long.valueOf(j2)));
        if (j3 == -1) {
            MatcherAssert.assertThat(Boolean.valueOf(fromStore.validTo().isPresent()), CoreMatchers.equalTo(false));
        } else {
            MatcherAssert.assertThat(fromStore.validTo().get(), CoreMatchers.equalTo(Long.valueOf(j3)));
        }
    }

    private void verifyTimestampedGetValueFromStore(String str, long j, long j2, ResultOrder resultOrder, List<String> list, List<Long> list2, List<Long> list3) {
        List<VersionedRecord<String>> fromStore = getFromStore(str, j, j2, resultOrder);
        MatcherAssert.assertThat(Integer.valueOf(fromStore.size()), CoreMatchers.equalTo(Integer.valueOf(list.size())));
        for (int i = 0; i < fromStore.size(); i++) {
            VersionedRecord<String> versionedRecord = fromStore.get(i);
            MatcherAssert.assertThat(versionedRecord.value(), CoreMatchers.equalTo(list.get(i)));
            MatcherAssert.assertThat(Long.valueOf(versionedRecord.timestamp()), CoreMatchers.equalTo(list2.get(i)));
            if (list3.get(i).longValue() == -1) {
                MatcherAssert.assertThat(Boolean.valueOf(versionedRecord.validTo().isPresent()), CoreMatchers.equalTo(false));
            } else {
                MatcherAssert.assertThat(versionedRecord.validTo().get(), CoreMatchers.equalTo(list3.get(i)));
            }
        }
    }

    private void verifyTimestampedGetNullFromStore(String str, long j) {
        MatcherAssert.assertThat(getFromStore(str, j), CoreMatchers.nullValue());
    }

    private void verifyTimestampedGetNullFromStore(String str, long j, long j2) {
        MatcherAssert.assertThat(Integer.valueOf(getFromStore(str, j, j2, ResultOrder.ANY).size()), CoreMatchers.equalTo(0));
    }

    private void verifyExpiredRecordSensor(int i) {
        Assertions.assertEquals(((Double) ((Metric) this.context.metrics().metrics().get(new MetricName(DROPPED_RECORDS_METRIC, TASK_LEVEL_GROUP, "", this.expectedMetricsTags))).metricValue()).doubleValue(), i, 0.001d);
    }

    private static VersionedRecord<String> deserializedRecord(VersionedRecord<byte[]> versionedRecord) {
        if (versionedRecord == null) {
            return null;
        }
        return versionedRecord.validTo().isPresent() ? new VersionedRecord<>(STRING_DESERIALIZER.deserialize((String) null, (byte[]) versionedRecord.value()), versionedRecord.timestamp(), ((Long) versionedRecord.validTo().get()).longValue()) : new VersionedRecord<>(STRING_DESERIALIZER.deserialize((String) null, (byte[]) versionedRecord.value()), versionedRecord.timestamp());
    }

    private static List<ConsumerRecord<byte[], byte[]>> getChangelogRecords(List<DataRecord> list) {
        ArrayList arrayList = new ArrayList();
        for (DataRecord dataRecord : list) {
            byte[] serialize = STRING_SERIALIZER.serialize((String) null, dataRecord.key);
            byte[] serialize2 = STRING_SERIALIZER.serialize((String) null, dataRecord.value);
            arrayList.add(new ConsumerRecord("", 0, 0L, dataRecord.timestamp, TimestampType.CREATE_TIME, serialize.length, serialize2 == null ? 0 : serialize2.length, serialize, serialize2, new RecordHeaders(), Optional.empty()));
        }
        return arrayList;
    }
}
