package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.easymock.EasyMock;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.class */
public class MeteredKeyValueStoreTest {
    private static final String APPLICATION_ID = "test-app";
    private static final String STORE_NAME = "store-name";
    private static final String STORE_TYPE = "scope";
    private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-scope-state-metrics";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
    private static final String THREAD_ID_TAG_KEY = "thread-id";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
    private static final String VALUE = "value";
    private static final byte[] VALUE_BYTES = VALUE.getBytes();
    private static final KeyValue<Bytes, byte[]> BYTE_KEY_VALUE_PAIR = KeyValue.pair(KEY_BYTES, VALUE_BYTES);

    @Mock(type = MockType.NICE)
    private KeyValueStore<Bytes, byte[]> inner;

    @Mock(type = MockType.NICE)
    private InternalProcessorContext context;
    private MeteredKeyValueStore<String, String> metered;
    private String storeLevelGroup;
    private String threadIdTagKey;
    private Map<String, String> tags;

    @Parameterized.Parameter
    public String builtInMetricsVersion;

    @Rule
    public EasyMockRule rule = new EasyMockRule(this);
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    private final Metrics metrics = new Metrics();

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest$CachedKeyValueStore.class */
    private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"latest"}, new Object[]{"0.10.0-2.4"});
    }

    @Before
    public void before() {
        MockTime mockTime = new MockTime();
        this.metered = new MeteredKeyValueStore<>(this.inner, STORE_TYPE, mockTime, Serdes.String(), Serdes.String());
        this.metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        EasyMock.expect(this.context.applicationId()).andStubReturn(APPLICATION_ID);
        EasyMock.expect(this.context.metrics()).andStubReturn(new StreamsMetricsImpl(this.metrics, "test", this.builtInMetricsVersion, mockTime));
        EasyMock.expect(this.context.taskId()).andStubReturn(this.taskId);
        EasyMock.expect(this.context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
        EasyMock.expect(this.inner.name()).andStubReturn(STORE_NAME);
        this.storeLevelGroup = "0.10.0-2.4".equals(this.builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
        this.threadIdTagKey = "0.10.0-2.4".equals(this.builtInMetricsVersion) ? THREAD_ID_TAG_KEY_FROM_0100_TO_24 : THREAD_ID_TAG_KEY;
        this.tags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.threadIdTagKey, this.threadId), Utils.mkEntry("task-id", this.taskId.toString()), Utils.mkEntry("scope-state-id", STORE_NAME)});
    }

    private void init() {
        EasyMock.replay(new Object[]{this.inner, this.context});
        this.metered.init(this.context, this.metered);
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        KeyValueStore keyValueStore = (KeyValueStore) EasyMock.mock(KeyValueStore.class);
        MeteredKeyValueStore meteredKeyValueStore = new MeteredKeyValueStore(keyValueStore, STORE_TYPE, new MockTime(), Serdes.String(), Serdes.String());
        EasyMock.expect(keyValueStore.name()).andStubReturn("store");
        keyValueStore.init(this.context, meteredKeyValueStore);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{keyValueStore, this.context});
        meteredKeyValueStore.init(this.context, meteredKeyValueStore);
        EasyMock.verify(new Object[]{keyValueStore});
    }

    @Test
    public void shouldDelegateInit() {
        KeyValueStore keyValueStore = (KeyValueStore) EasyMock.mock(KeyValueStore.class);
        MeteredKeyValueStore meteredKeyValueStore = new MeteredKeyValueStore(keyValueStore, STORE_TYPE, new MockTime(), Serdes.String(), Serdes.String());
        EasyMock.expect(keyValueStore.name()).andStubReturn("store");
        keyValueStore.init(this.context, meteredKeyValueStore);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{keyValueStore, this.context});
        meteredKeyValueStore.init(this.context, meteredKeyValueStore);
        EasyMock.verify(new Object[]{keyValueStore});
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME);
        EasyMock.expect(this.context.changelogFor(STORE_NAME)).andReturn((Object) null);
        doShouldPassChangelogTopicNameToStateStoreSerde(storeChangelogTopic);
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String str) {
        Serde serde = (Serde) EasyMock.niceMock(Serde.class);
        Serializer serializer = (Serializer) EasyMock.mock(Serializer.class);
        Serde serde2 = (Serde) EasyMock.niceMock(Serde.class);
        Deserializer deserializer = (Deserializer) EasyMock.mock(Deserializer.class);
        Serializer serializer2 = (Serializer) EasyMock.mock(Serializer.class);
        EasyMock.expect(serde.serializer()).andStubReturn(serializer);
        EasyMock.expect(serializer.serialize(str, KEY)).andStubReturn(KEY.getBytes());
        EasyMock.expect(serde2.deserializer()).andStubReturn(deserializer);
        EasyMock.expect(deserializer.deserialize(str, VALUE_BYTES)).andStubReturn(VALUE);
        EasyMock.expect(serde2.serializer()).andStubReturn(serializer2);
        EasyMock.expect(serializer2.serialize(str, VALUE)).andStubReturn(VALUE_BYTES);
        EasyMock.expect(this.inner.get(KEY_BYTES)).andStubReturn(VALUE_BYTES);
        EasyMock.replay(new Object[]{this.inner, this.context, serializer, serde, deserializer, serializer2, serde2});
        this.metered = new MeteredKeyValueStore<>(this.inner, STORE_TYPE, new MockTime(), serde, serde2);
        this.metered.init(this.context, this.metered);
        this.metered.get(KEY);
        this.metered.put(KEY, VALUE);
        EasyMock.verify(new Object[]{serializer, deserializer, serializer2});
    }

    @Test
    public void testMetrics() {
        init();
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        this.metrics.addReporter(jmxReporter);
        Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", this.storeLevelGroup, this.threadIdTagKey, this.threadId, this.taskId.toString(), STORE_TYPE, STORE_NAME)));
        if ("0.10.0-2.4".equals(this.builtInMetricsVersion)) {
            Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", this.storeLevelGroup, this.threadIdTagKey, this.threadId, this.taskId.toString(), STORE_TYPE, "all")));
        }
    }

    @Test
    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
        this.inner.put(EasyMock.eq(KEY_BYTES), EasyMock.aryEq(VALUE_BYTES));
        EasyMock.expectLastCall();
        init();
        this.metered.put(KEY, VALUE);
        Assert.assertTrue(((Double) metric("put-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
        EasyMock.expect(this.inner.get(KEY_BYTES)).andReturn(VALUE_BYTES);
        init();
        MatcherAssert.assertThat(this.metered.get(KEY), CoreMatchers.equalTo(VALUE));
        Assert.assertTrue(((Double) metric("get-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
        EasyMock.expect(this.inner.putIfAbsent(EasyMock.eq(KEY_BYTES), EasyMock.aryEq(VALUE_BYTES))).andReturn((Object) null);
        init();
        this.metered.putIfAbsent(KEY, VALUE);
        Assert.assertTrue(((Double) metric("put-if-absent-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
        this.inner.putAll((List) EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        init();
        this.metered.putAll(Collections.singletonList(KeyValue.pair(KEY, VALUE)));
        Assert.assertTrue(((Double) metric("put-all-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
        EasyMock.expect(this.inner.delete(KEY_BYTES)).andReturn(VALUE_BYTES);
        init();
        this.metered.delete(KEY);
        Assert.assertTrue(((Double) metric("delete-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
        EasyMock.expect(this.inner.range(KEY_BYTES, KEY_BYTES)).andReturn(new KeyValueIteratorStub(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
        init();
        KeyValueIterator range = this.metered.range(KEY, KEY);
        MatcherAssert.assertThat(((KeyValue) range.next()).value, CoreMatchers.equalTo(VALUE));
        Assert.assertFalse(range.hasNext());
        range.close();
        Assert.assertTrue(((Double) metric("range-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
        EasyMock.expect(this.inner.all()).andReturn(new KeyValueIteratorStub(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
        init();
        KeyValueIterator all = this.metered.all();
        MatcherAssert.assertThat(((KeyValue) all.next()).value, CoreMatchers.equalTo(VALUE));
        Assert.assertFalse(all.hasNext());
        all.close();
        Assert.assertTrue(((Double) metric(new MetricName("all-rate", this.storeLevelGroup, "", this.tags)).metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldFlushInnerWhenFlushTimeRecords() {
        this.inner.flush();
        EasyMock.expectLastCall().once();
        init();
        this.metered.flush();
        Assert.assertTrue(((Double) metric("flush-rate").metricValue()).doubleValue() > 0.0d);
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        CachedKeyValueStore cachedKeyValueStore = (CachedKeyValueStore) EasyMock.mock(CachedKeyValueStore.class);
        EasyMock.expect(Boolean.valueOf(cachedKeyValueStore.setFlushListener((CacheFlushListener) EasyMock.anyObject(CacheFlushListener.class), EasyMock.eq(false)))).andReturn(true);
        EasyMock.replay(new Object[]{cachedKeyValueStore});
        this.metered = new MeteredKeyValueStore<>(cachedKeyValueStore, STORE_TYPE, new MockTime(), Serdes.String(), Serdes.String());
        Assert.assertTrue(this.metered.setFlushListener((CacheFlushListener) null, false));
        EasyMock.verify(new Object[]{cachedKeyValueStore});
    }

    @Test
    public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() {
        EasyMock.expect(this.inner.get(Bytes.wrap("a".getBytes()))).andReturn((Object) null);
        init();
        Assert.assertNull(this.metered.get("a"));
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assert.assertFalse(this.metered.setFlushListener((CacheFlushListener) null, false));
    }

    @Test
    public void shouldRemoveMetricsOnClose() {
        this.inner.close();
        EasyMock.expectLastCall();
        init();
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        this.metered.close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
        EasyMock.verify(new Object[]{this.inner});
    }

    @Test
    public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
        this.inner.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Oops!"));
        init();
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        MeteredKeyValueStore<String, String> meteredKeyValueStore = this.metered;
        meteredKeyValueStore.getClass();
        Assert.assertThrows(RuntimeException.class, meteredKeyValueStore::close);
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
        EasyMock.verify(new Object[]{this.inner});
    }

    private KafkaMetric metric(MetricName metricName) {
        return this.metrics.metric(metricName);
    }

    private KafkaMetric metric(String str) {
        return this.metrics.metric(new MetricName(str, this.storeLevelGroup, "", this.tags));
    }

    private List<MetricName> storeMetrics() {
        return (List) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.group().equals(this.storeLevelGroup) && metricName.tags().equals(this.tags);
        }).collect(Collectors.toList());
    }
}
