package org.apache.kafka.streams;

import java.lang.Thread;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.rocksdb.Options;

@PrepareForTest({KafkaStreams.class, StreamThread.class, ClientMetrics.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    // Value written to the "num.stream.threads" property in before().
    private static final int NUM_THREADS = 2;
    // Value written to the "application.id" property in before().
    private static final String APPLICATION_ID = "appId";
    // Value written to the "client.id" property in before().
    private static final String CLIENT_ID = "test-client";

    // JUnit rule that exposes the name of the currently running test method.
    @Rule
    public TestName testName = new TestName();
    // Mock client supplier handed to the KafkaStreams constructors; re-created in before().
    private MockClientSupplier supplier;
    // Mock clock handed to most KafkaStreams constructors; re-created in before().
    private MockTime time;
    // Streams configuration shared by the tests; populated in before().
    private Properties props;

    @Mock
    private StateDirectory stateDirectory;

    // Returned by the first mocked StreamThread.create(...) call (see prepareStreams()).
    @Mock
    private StreamThread streamThreadOne;

    // Returned by the second mocked StreamThread.create(...) call (see prepareStreams()).
    @Mock
    private StreamThread streamThreadTwo;

    // Returned whenever a GlobalStreamThread is constructed (see prepareStreams()).
    @Mock
    private GlobalStreamThread globalStreamThread;

    // Returned whenever a Metrics instance is constructed (see prepareStreams()).
    @Mock
    private Metrics metrics;
    // Listener stub that tests register on KafkaStreams to count state changes.
    private StateListenerStub streamsStateListener;
    // Captures the reporter list passed to the mocked Metrics constructor.
    private Capture<List<MetricsReporter>> metricsReportersCapture;
    // Captures the listener passed to setStateListener(...) on the mocked threads.
    private Capture<StreamThread.StateListener> threadStatelistenerCapture;

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges++;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$TestRocksDbConfigSetter.class */
    public static class TestRocksDbConfigSetter implements RocksDBConfigSetter {
        public void setConfig(String str, Options options, Map<String, Object> map) {
        }
    }

    /**
     * Builds fresh mocks, captures and configuration before every test and
     * then records the mock expectations via {@link #prepareStreams()}.
     */
    @Before
    public void before() throws Exception {
        time = new MockTime();

        final InetSocketAddress adminBootstrapAddress = new InetSocketAddress("localhost", 9999);
        supplier = new MockClientSupplier();
        supplier.setClusterForAdminClient(Cluster.bootstrap(Collections.singletonList(adminBootstrapAddress)));

        streamsStateListener = new StateListenerStub();
        threadStatelistenerCapture = EasyMock.newCapture();
        metricsReportersCapture = EasyMock.newCapture();

        final Properties config = new Properties();
        config.put("application.id", APPLICATION_ID);
        config.put("client.id", CLIENT_ID);
        config.put("bootstrap.servers", "localhost:2018");
        config.put("metric.reporters", MockMetricsReporter.class.getName());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("num.stream.threads", NUM_THREADS);
        props = config;

        prepareStreams();
    }

    /**
     * Records and replays all PowerMock/EasyMock expectations the tests rely on:
     * construction of {@code Metrics}, the static {@code ClientMetrics} and
     * {@code StreamThread} methods, the two mocked stream threads and the mocked
     * global stream thread.
     *
     * <p>Statement order matters: expectations are recorded first and only become
     * active after the final {@code PowerMock.replay(...)} call.
     */
    private void prepareStreams() throws Exception {
        // Constructing Metrics yields the shared mock; every captured reporter is initialised with an empty list first.
        PowerMock.expectNew(Metrics.class, new Object[]{EasyMock.anyObject(MetricConfig.class), EasyMock.capture(this.metricsReportersCapture), EasyMock.anyObject(Time.class)}).andAnswer(() -> {
            Iterator it = ((List) this.metricsReportersCapture.getValue()).iterator();
            while (it.hasNext()) {
                ((MetricsReporter) it.next()).init(Collections.emptyList());
            }
            return this.metrics;
        }).anyTimes();
        // Closing the Metrics mock closes every captured reporter.
        this.metrics.close();
        EasyMock.expectLastCall().andAnswer(() -> {
            Iterator it = ((List) this.metricsReportersCapture.getValue()).iterator();
            while (it.hasNext()) {
                ((MetricsReporter) it.next()).close();
            }
            return null;
        }).anyTimes();
        // Static ClientMetrics calls: fixed version/commit id, and the add*Metric calls are recorded as expected.
        PowerMock.mockStatic(ClientMetrics.class);
        EasyMock.expect(ClientMetrics.version()).andReturn("1.56");
        EasyMock.expect(ClientMetrics.commitId()).andReturn("1a2b3c4d5e");
        ClientMetrics.addVersionMetric((StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class));
        ClientMetrics.addCommitIdMetric((StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class));
        ClientMetrics.addApplicationIdMetric((StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class), (String) EasyMock.eq(APPLICATION_ID));
        ClientMetrics.addTopologyDescriptionMetric((StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class), EasyMock.anyString());
        ClientMetrics.addStateMetric((StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class), (Gauge) EasyMock.anyObject());
        // StreamThread.create(...) hands out streamThreadOne first, then streamThreadTwo.
        PowerMock.mockStatic(StreamThread.class);
        EasyMock.expect(StreamThread.create((InternalTopologyBuilder) EasyMock.anyObject(InternalTopologyBuilder.class), (StreamsConfig) EasyMock.anyObject(StreamsConfig.class), (KafkaClientSupplier) EasyMock.anyObject(KafkaClientSupplier.class), (Admin) EasyMock.anyObject(Admin.class), (UUID) EasyMock.anyObject(UUID.class), (String) EasyMock.anyObject(String.class), (StreamsMetricsImpl) EasyMock.anyObject(StreamsMetricsImpl.class), (Time) EasyMock.anyObject(Time.class), (StreamsMetadataState) EasyMock.anyObject(StreamsMetadataState.class), EasyMock.anyLong(), (StateDirectory) EasyMock.anyObject(StateDirectory.class), (StateRestoreListener) EasyMock.anyObject(StateRestoreListener.class), EasyMock.anyInt())).andReturn(this.streamThreadOne).andReturn(this.streamThreadTwo);
        EasyMock.expect(StreamThread.getSharedAdminClientId(EasyMock.anyString())).andReturn("admin").anyTimes();
        EasyMock.expect(Long.valueOf(this.streamThreadOne.getId())).andReturn(0L).anyTimes();
        EasyMock.expect(Long.valueOf(this.streamThreadTwo.getId())).andReturn(1L).anyTimes();
        // Only the second thread's join() sleeps (see prepareStreamThread).
        prepareStreamThread(this.streamThreadOne, true);
        prepareStreamThread(this.streamThreadTwo, false);
        // Simulated state of the global thread; state() reads it lazily through andAnswer.
        AtomicReference atomicReference = new AtomicReference(GlobalStreamThread.State.CREATED);
        PowerMock.expectNew(GlobalStreamThread.class, new Object[]{EasyMock.anyObject(ProcessorTopology.class), EasyMock.anyObject(StreamsConfig.class), EasyMock.anyObject(Consumer.class), EasyMock.anyObject(StateDirectory.class), Long.valueOf(EasyMock.anyLong()), EasyMock.anyObject(StreamsMetricsImpl.class), EasyMock.anyObject(Time.class), EasyMock.anyString(), EasyMock.anyObject(StateRestoreListener.class)}).andReturn(this.globalStreamThread).anyTimes();
        IExpectationSetters expect = EasyMock.expect(this.globalStreamThread.state());
        // Return value unused; presumably a decompiler-emitted null check for the method reference below.
        atomicReference.getClass();
        expect.andAnswer(atomicReference::get).anyTimes();
        this.globalStreamThread.setStateListener((StreamThread.StateListener) EasyMock.capture(this.threadStatelistenerCapture));
        EasyMock.expectLastCall().anyTimes();
        // start(): flip the simulated state to RUNNING and notify the captured listener (CREATED -> RUNNING).
        this.globalStreamThread.start();
        EasyMock.expectLastCall().andAnswer(() -> {
            atomicReference.set(GlobalStreamThread.State.RUNNING);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(this.globalStreamThread, GlobalStreamThread.State.RUNNING, GlobalStreamThread.State.CREATED);
            return null;
        }).anyTimes();
        // shutdown(): close the mock restore consumer and producers, mark DEAD, and report RUNNING -> PENDING_SHUTDOWN -> DEAD.
        this.globalStreamThread.shutdown();
        EasyMock.expectLastCall().andAnswer(() -> {
            this.supplier.restoreConsumer.close();
            Iterator<MockProducer> it = this.supplier.producers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            atomicReference.set(GlobalStreamThread.State.DEAD);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(this.globalStreamThread, GlobalStreamThread.State.PENDING_SHUTDOWN, GlobalStreamThread.State.RUNNING);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(this.globalStreamThread, GlobalStreamThread.State.DEAD, GlobalStreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).anyTimes();
        // NOTE(review): the andReturn argument is evaluated once, here, while the reference still holds CREATED,
        // so stillRunning() always answers false regardless of later state changes — confirm this is intended.
        EasyMock.expect(Boolean.valueOf(this.globalStreamThread.stillRunning())).andReturn(Boolean.valueOf(atomicReference.get() == GlobalStreamThread.State.RUNNING)).anyTimes();
        this.globalStreamThread.join();
        EasyMock.expectLastCall().anyTimes();
        // Switch every mock and mocked class from record mode to replay mode.
        PowerMock.replay(new Object[]{StreamThread.class, Metrics.class, this.metrics, ClientMetrics.class, this.streamThreadOne, this.streamThreadTwo, GlobalStreamThread.class, this.globalStreamThread});
    }

    /**
     * Records the expectations for one mocked {@link StreamThread}: state lookup,
     * listener capture, start, shutdown, isRunning, join and the task accessors.
     * The mock is not replayed here; {@code prepareStreams()} does that.
     *
     * @param streamThread the mock to record expectations on
     * @param z            if {@code true}, {@code join()} returns immediately;
     *                     otherwise each {@code join()} call sleeps for 50 ms
     */
    private void prepareStreamThread(StreamThread streamThread, boolean z) throws Exception {
        // Simulated thread state; state() reads it lazily through andAnswer.
        AtomicReference atomicReference = new AtomicReference(StreamThread.State.CREATED);
        IExpectationSetters expect = EasyMock.expect(streamThread.state());
        // Return value unused; presumably a decompiler-emitted null check for the method reference below.
        atomicReference.getClass();
        expect.andAnswer(atomicReference::get).anyTimes();
        streamThread.setStateListener((StreamThread.StateListener) EasyMock.capture(this.threadStatelistenerCapture));
        EasyMock.expectLastCall().anyTimes();
        // start(): the stored state only becomes STARTING, but the listener is walked through
        // CREATED -> STARTING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED -> RUNNING.
        streamThread.start();
        EasyMock.expectLastCall().andAnswer(() -> {
            atomicReference.set(StreamThread.State.STARTING);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.STARTING, StreamThread.State.CREATED);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.STARTING);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
            return null;
        }).anyTimes();
        // shutdown(): close all mock clients, mark DEAD, and report RUNNING -> PENDING_SHUTDOWN -> DEAD.
        streamThread.shutdown();
        EasyMock.expectLastCall().andAnswer(() -> {
            this.supplier.consumer.close();
            this.supplier.restoreConsumer.close();
            Iterator<MockProducer> it = this.supplier.producers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            atomicReference.set(StreamThread.State.DEAD);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
            ((StreamThread.StateListener) this.threadStatelistenerCapture.getValue()).onChange(streamThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).anyTimes();
        // NOTE(review): the andReturn argument is evaluated once, here, while the reference still holds CREATED,
        // so isRunning() always answers false regardless of later state changes — confirm this is intended.
        EasyMock.expect(Boolean.valueOf(streamThread.isRunning())).andReturn(Boolean.valueOf(atomicReference.get() == StreamThread.State.RUNNING)).anyTimes();
        streamThread.join();
        if (z) {
            EasyMock.expectLastCall().anyTimes();
        } else {
            // Make join() take a little while on this thread.
            EasyMock.expectLastCall().andAnswer(() -> {
                Thread.sleep(50L);
                return null;
            }).anyTimes();
        }
        // The task accessors are stubbed to return empty collections.
        EasyMock.expect(streamThread.allStandbyTasks()).andStubReturn(Collections.emptyList());
        EasyMock.expect(streamThread.restoringTaskIds()).andStubReturn(Collections.emptySet());
        EasyMock.expect(streamThread.allStreamsTasks()).andStubReturn(Collections.emptyList());
    }

    /** Closing an instance that was never started must leave it in NOT_RUNNING. */
    @Test
    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);

        streams.close();

        final KafkaStreams.State stateAfterClose = streams.state();
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, stateAfterClose);
    }

    /**
     * Drives the captured thread state listener by hand: a rebalance, the death
     * of one thread, and the surviving thread returning to RUNNING must bring
     * the client back to RUNNING.
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.setStateListener(streamsStateListener);

        Assert.assertEquals(0, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

        streams.start();
        TestUtils.waitForCondition(() -> streamsStateListener.numChanges == NUM_THREADS, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

        final StreamThread dyingThread = streams.threads[1];

        // Every thread has its partitions revoked -> client goes to REBALANCING.
        for (final StreamThread thread : streams.threads) {
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

        // Partitions assigned: no client-level change yet.
        for (final StreamThread thread : streams.threads) {
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
        }
        Assert.assertEquals(3, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

        // One thread dies: still no client-level change.
        threadStatelistenerCapture.getValue()
            .onChange(dyingThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
        threadStatelistenerCapture.getValue()
            .onChange(dyingThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

        // All surviving threads return to RUNNING -> client is RUNNING again.
        for (final StreamThread thread : streams.threads) {
            if (thread == streams.threads[1]) {
                continue;
            }
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
        }
        Assert.assertEquals(4, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

        streams.close();
        TestUtils.waitForCondition(() -> streamsStateListener.numChanges == 6, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
    }

    /**
     * Drives the captured thread state listener by hand: once every stream
     * thread has been reported DEAD the client must be in ERROR, and a
     * subsequent close() must still end in NOT_RUNNING.
     */
    @Test
    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.setStateListener(streamsStateListener);

        Assert.assertEquals(0, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

        streams.start();
        TestUtils.waitForCondition(() -> streamsStateListener.numChanges == NUM_THREADS, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

        // Every thread has its partitions revoked -> client goes to REBALANCING.
        for (final StreamThread thread : streams.threads) {
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

        // The first thread dies: the client stays in REBALANCING.
        final StreamThread firstToDie = streams.threads[1];
        threadStatelistenerCapture.getValue()
            .onChange(firstToDie, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
        threadStatelistenerCapture.getValue()
            .onChange(firstToDie, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

        // The remaining threads die as well -> client goes to ERROR.
        for (final StreamThread thread : streams.threads) {
            if (thread == streams.threads[1]) {
                continue;
            }
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
            threadStatelistenerCapture.getValue()
                .onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        }
        Assert.assertEquals(4, streamsStateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());

        streams.close();
        TestUtils.waitForCondition(() -> streamsStateListener.numChanges == 6, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
    }

    /**
     * close() on a never-started instance (with a global table in the
     * topology) must still close the mock consumer, restore consumer and
     * every producer.
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);

        streams.close();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped.");

        Assert.assertTrue(supplier.consumer.closed());
        Assert.assertTrue(supplier.restoreConsumer.closed());
        for (final MockProducer<?, ?> producer : supplier.producers) {
            Assert.assertTrue(producer.closed());
        }
    }

    /**
     * Shuts every stream thread down individually and verifies that the client
     * ends up in ERROR, that close() then moves it to NOT_RUNNING, and that the
     * global stream thread reference has been cleared.
     */
    @Test
    public void testStateThreadClose() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
        try {
            Assert.assertEquals(NUM_THREADS, streams.threads.length);
            // JUnit's contract is assertEquals(expected, actual).
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            for (int i = 0; i < NUM_THREADS; i++) {
                final StreamThread thread = streams.threads[i];
                thread.shutdown();
                TestUtils.waitForCondition(() -> thread.state() == StreamThread.State.DEAD, "Thread never stopped.");
                thread.join();
            }
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.ERROR, "Streams never stopped.");

            streams.close();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped.");
            Assert.assertNull(streams.globalStreamThread);
        } finally {
            // Always release the instance; a repeated close() is expected to be a no-op
            // (see testCloseIsIdempotent).
            streams.close();
        }
    }

    /**
     * Shuts the global stream thread down directly and verifies that the client
     * moves to ERROR, and to NOT_RUNNING after close().
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
        try {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            final GlobalStreamThread globalThread = streams.globalStreamThread;
            globalThread.shutdown();
            TestUtils.waitForCondition(() -> globalThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped.");
            globalThread.join();

            // JUnit's contract is assertEquals(expected, actual).
            Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());

            streams.close();
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        } finally {
            // Always release the instance; a repeated close() is expected to be a no-op
            // (see testCloseIsIdempotent).
            streams.close();
        }
    }

    /**
     * Constructing the client must initialise at least one metrics reporter,
     * and close() must close exactly as many reporters as were initialised.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();

        try (KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
            // Number of reporters initialised by the constructor.
            final int initDiff = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
            Assert.assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);

            streams.start();
            final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            Assert.assertEquals(closeCountBefore + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /** A second close() must not close the metrics reporters again. */
    @Test
    public void testCloseIsIdempotent() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.close();

        final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();
        streams.close();
        final int closeCountAfterSecondClose = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals("subsequent close() calls should do nothing", closeCountAfterFirstClose, closeCountAfterSecondClose);
    }

    /** start() on an instance that has already been closed must throw IllegalStateException. */
    @Test
    public void testCannotStartOnceClosed() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        // Close first: without this the test would only cover "start twice", not "start after close".
        streams.close();
        try {
            streams.start();
            Assert.fail("Should have throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: a closed client cannot be restarted
        } finally {
            streams.close();
        }
    }

    /** Setting the global state restore listener after start() must be rejected. */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        try {
            final StateRestoreListener noListener = null;
            streams.setGlobalStateRestoreListener(noListener);
            Assert.fail("Should throw an IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the listener may only be set before start()
        } finally {
            streams.close();
        }
    }

    /** Setting the uncaught exception handler after start() must be rejected. */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        try {
            streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
            Assert.fail("Should throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the handler may only be set while the client is in CREATED
        } finally {
            // Release the started instance, as the sibling tests do.
            streams.close();
        }
    }

    /** Setting the state listener after start() must be rejected. */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        try {
            streams.setStateListener((KafkaStreams.StateListener) null);
            Assert.fail("Should throw IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: the listener may only be set while the client is in CREATED
        } finally {
            // Release the started instance, as the sibling tests do.
            streams.close();
        }
    }

    /** cleanUp() must succeed both before start() and after close(). */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        try {
            // Allowed: the client has not been started yet.
            streams.cleanUp();
            streams.start();
        } finally {
            streams.close();
            // Allowed again: the client has been closed.
            streams.cleanUp();
        }
    }

    /** cleanUp() on a running client must fail with a descriptive IllegalStateException. */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        try {
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");
            try {
                streams.cleanUp();
                Assert.fail("Should have thrown IllegalStateException");
            } catch (final IllegalStateException expected) {
                Assert.assertEquals("Cannot clean up while running.", expected.getMessage());
            }
        } finally {
            // Release the started instance so it does not outlive the test.
            streams.close();
        }
    }

    /** allMetadata() on a client that was never started must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        final KafkaStreams notStarted = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        notStarted.allMetadata();
    }

    /** allMetadataForStore() on a client that was never started must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        final KafkaStreams notStarted = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        notStarted.allMetadataForStore("store");
    }

    /** queryMetadataForKey(store, key, serializer) on a client that was never started must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() {
        final KafkaStreams notStarted = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        notStarted.queryMetadataForKey("store", "key", Serdes.String().serializer());
    }

    /**
     * Once started, queryMetadataForKey(store, key, serializer) must answer
     * (here with NOT_AVAILABLE) instead of throwing.
     */
    @Test
    public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        streams.start();
        try {
            Assert.assertEquals(
                KeyQueryMetadata.NOT_AVAILABLE,
                streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
        } finally {
            // Release the started instance so it does not outlive the test.
            streams.close();
        }
    }

    /** queryMetadataForKey(store, key, partitioner) on a client that was never started must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() {
        final KafkaStreams notStarted = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
        // The partitioner always picks partition 0.
        notStarted.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
    }

    /**
     * With an admin client whose listOffsets() result is an empty map, a started
     * client with an empty topology must report no local store partition lags.
     */
    @Test
    public void shouldReturnEmptyLocalStorePartitionLags() {
        // listOffsets(...).all() completes immediately with an empty map.
        ListOffsetsResult listOffsetsResult = (ListOffsetsResult) EasyMock.mock(ListOffsetsResult.class);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(Collections.emptyMap());
        EasyMock.expect(listOffsetsResult.all()).andReturn(kafkaFutureImpl);

        // Admin client that is real except for listOffsets(Map).
        MockAdminClient mockAdminClient = (MockAdminClient) EasyMock.partialMockBuilder(MockAdminClient.class).addMockedMethod("listOffsets", new Class[]{Map.class}).createMock();
        EasyMock.expect(mockAdminClient.listOffsets((Map) EasyMock.anyObject())).andStubReturn(listOffsetsResult);

        // Client supplier that is real except for getAdmin(...), which hands out the admin above.
        MockClientSupplier mockClientSupplier = (MockClientSupplier) EasyMock.partialMockBuilder(MockClientSupplier.class).addMockedMethod("getAdmin").createMock();
        EasyMock.expect(mockClientSupplier.getAdmin((Map) EasyMock.anyObject())).andReturn(mockAdminClient);
        EasyMock.replay(new Object[]{listOffsetsResult, mockAdminClient, mockClientSupplier});

        // Keep a reference to the instance so it can be queried after start().
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, mockClientSupplier, time);
        streams.start();
        Assert.assertEquals(0, streams.allLocalStorePartitionLags().size());
    }

    /**
     * close(timeout) must return false when the threads have not terminated
     * within the timeout.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
        // This instance is built with the three-argument constructor, i.e. without the MockTime used elsewhere.
        try (KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier)) {
            Assert.assertFalse(streams.close(Duration.ofMillis(10L)));
        }
    }

    /**
     * Calls {@code close(Duration)} with a timeout of -1 ms; the test annotation expects an
     * {@link IllegalArgumentException} to escape.
     */
    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowOnNegativeTimeoutForClose() {
        // The resource block still closes the instance (once) while the exception propagates.
        try (KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), this.props, this.supplier, this.time)) {
            streams.close(Duration.ofMillis(-1L));
        }
    }

    /**
     * Asserts that {@code close(Duration.ZERO)} returns {@code false} for an instance that was
     * constructed but never started.
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() {
        // The resource block guarantees a single final close() whether or not the assertion passes.
        try (KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), this.props, this.supplier, this.time)) {
            Assert.assertFalse(streams.close(Duration.ZERO));
        }
    }

    /**
     * With {@code metrics.recording.level} set to DEBUG, starting and closing a KafkaStreams
     * instance must schedule a {@code RocksDBMetricsRecordingTrigger} on the second executor
     * obtained from {@code Executors.newSingleThreadScheduledExecutor} and later shut that
     * executor down.
     */
    @Test
    public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
        PowerMock.mockStatic(Executors.class);
        // First executor handed out: a nice mock, so calls on it need no expectations.
        ScheduledExecutorService lenientExecutor = EasyMock.niceMock(ScheduledExecutorService.class);
        // Second executor handed out: every call on it must be expected explicitly.
        ScheduledExecutorService metricsTriggerExecutor = EasyMock.mock(ScheduledExecutorService.class);
        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(EasyMock.anyObject(ThreadFactory.class)))
            .andReturn(lenientExecutor);
        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(EasyMock.anyObject(ThreadFactory.class)))
            .andReturn(metricsTriggerExecutor);
        // The trigger is expected with initial delay 0 and period 1, both in minutes.
        EasyMock.expect(metricsTriggerExecutor.scheduleAtFixedRate(
            EasyMock.anyObject(RocksDBMetricsRecordingTrigger.class),
            EasyMock.eq(0L),
            EasyMock.eq(1L),
            EasyMock.eq(TimeUnit.MINUTES)
        )).andReturn(null);
        EasyMock.expect(metricsTriggerExecutor.shutdownNow()).andReturn(null);
        PowerMock.replay(Executors.class, metricsTriggerExecutor, lenientExecutor);

        StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.as("store"));
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name());

        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            streams.start();
        }

        // Verification runs after the single close(); a verification failure must not trigger
        // another close(), which could itself fail on the mock and hide the real error.
        PowerMock.verify(Executors.class, metricsTriggerExecutor);
    }

    /**
     * With {@code metrics.recording.level} set to INFO, starting and closing a KafkaStreams
     * instance must request only one executor from
     * {@code Executors.newSingleThreadScheduledExecutor} and must never touch the second mock
     * executor, which has no expectations recorded.
     */
    @Test
    public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
        PowerMock.mockStatic(Executors.class);
        // The only executor the factory is expected to hand out; a nice mock.
        ScheduledExecutorService lenientExecutor = EasyMock.niceMock(ScheduledExecutorService.class);
        // Never returned by the factory and carries no expectations: any call on it is unexpected.
        ScheduledExecutorService untouchedExecutor = EasyMock.mock(ScheduledExecutorService.class);
        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(EasyMock.anyObject(ThreadFactory.class)))
            .andReturn(lenientExecutor);
        PowerMock.replay(Executors.class, untouchedExecutor, lenientExecutor);

        StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.as("store"));
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.name());

        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            streams.start();
        }

        // Verified after the single close() so that a failure here cannot cause a second close().
        PowerMock.verify(Executors.class, untouchedExecutor);
    }

    /**
     * Configures {@code rocksdb.config.setter} and asserts that constructing a KafkaStreams
     * instance logs the RocksDB version-bump warning.
     */
    @Test
    public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
        this.props.setProperty("rocksdb.config.setter", TestRocksDbConfigSetter.class.getName());
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            // Only construction is exercised; the instance is neither kept nor started.
            new KafkaStreams(new StreamsBuilder().build(), this.props, this.supplier, this.time);
        } finally {
            // Unregister even when the constructor throws so the appender does not stay registered.
            LogCaptureAppender.unregister(appender);
        }
        MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem("stream-client [test-client] RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` (or `org.rocksdb.ColumnFamilyOptions`)."));
    }

    /**
     * With {@code state.cleanup.delay.ms} set to 1, starting and closing a KafkaStreams instance
     * must schedule a task at a fixed rate of 1 ms (initial delay 1 ms) on the mocked executor
     * and shut that executor down.
     *
     * @throws Exception declared because {@code PowerMock.expectNew} is declared to throw
     */
    @Test
    public void shouldCleanupOldStateDirs() throws Exception {
        PowerMock.mockStatic(Executors.class);
        ScheduledExecutorService cleanupExecutor = EasyMock.mock(ScheduledExecutorService.class);
        // Every executor requested through the static factory is this same mock.
        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(EasyMock.anyObject(ThreadFactory.class)))
            .andReturn(cleanupExecutor)
            .anyTimes();
        EasyMock.expect(cleanupExecutor.scheduleAtFixedRate(
            EasyMock.anyObject(Runnable.class),
            EasyMock.eq(1L),
            EasyMock.eq(1L),
            EasyMock.eq(TimeUnit.MILLISECONDS)
        )).andReturn(null);
        EasyMock.expect(cleanupExecutor.shutdownNow()).andReturn(null);
        // Intercept StateDirectory construction (third constructor argument must be true)
        // and substitute the test's stateDirectory.
        PowerMock.expectNew(
            StateDirectory.class,
            EasyMock.anyObject(StreamsConfig.class),
            EasyMock.anyObject(Time.class),
            EasyMock.eq(true)
        ).andReturn(this.stateDirectory);
        PowerMock.replayAll(Executors.class, cleanupExecutor, this.stateDirectory);

        this.props.setProperty("state.cleanup.delay.ms", "1");
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.as("store"));

        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            streams.start();
        }

        // Verified after the single close(); a second close() would call shutdownNow() again,
        // which the mock expects only once.
        PowerMock.verify(Executors.class, cleanupExecutor);
    }

    /**
     * Builds a source -> processor -> sink topology without any state store and hands it to
     * {@code startStreamsAndCheckDirExists} with {@code false}.
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
        String methodName = this.testName.getMethodName();
        String inputTopic = methodName + "-input";
        String outputTopic = methodName + "-output";

        Topology statelessTopology = new Topology();
        statelessTopology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
            .addProcessor("process", () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    // Forward key + value only when the value length is a multiple of NUM_THREADS.
                    boolean lengthDivisible = value.length() % KafkaStreamsTest.NUM_THREADS == 0;
                    if (lengthDivisible) {
                        context().forward(key, key + value);
                    }
                }
            }, "source")
            .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");

        startStreamsAndCheckDirExists(statelessTopology, false);
    }

    /**
     * Builds the stateful topology with its boolean flag set to {@code false} and hands it to
     * {@code startStreamsAndCheckDirExists} with {@code false}.
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
        // All topic and store names are derived from the running test's method name.
        String prefix = this.testName.getMethodName();
        Topology inMemoryTopology = getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            false);
        startStreamsAndCheckDirExists(inMemoryTopology, false);
    }

    /**
     * Builds the stateful topology with its boolean flag set to {@code true} and hands it to
     * {@code startStreamsAndCheckDirExists} with {@code true}.
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
        // All topic and store names are derived from the running test's method name.
        String prefix = this.testName.getMethodName();
        Topology persistentTopology = getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            true);
        startStreamsAndCheckDirExists(persistentTopology, true);
    }

    /**
     * Builds a topology with a source, a processor backed by a String-to-Long key-value store,
     * a sink, and an additional global String-to-String key-value store.
     *
     * @param str  topic read by the "source" node
     * @param str2 topic written by the "sink" node
     * @param str3 topic feeding the global store; also the prefix of its processor name
     * @param str4 name of the key-value store attached to the "process" node
     * @param str5 name of the global key-value store
     * @param z    {@code true} to create both stores via {@code Stores.persistentKeyValueStore},
     *             {@code false} to create them via {@code Stores.inMemoryKeyValueStore}
     * @return the assembled topology
     */
    private Topology getStatefulTopology(String str, String str2, String str3, String str4, String str5, boolean z) {
        StoreBuilder<?> processorStoreBuilder = Stores.keyValueStoreBuilder(
            z ? Stores.persistentKeyValueStore(str4) : Stores.inMemoryKeyValueStore(str4),
            Serdes.String(),
            Serdes.Long());

        Topology topology = new Topology();
        topology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), str)
            .addProcessor("process", () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    // getStateStore does not expose put(); cast to the key-value store type that
                    // was registered under str4 above (String keys, Long values).
                    @SuppressWarnings("unchecked")
                    org.apache.kafka.streams.state.KeyValueStore<String, Long> store =
                        (org.apache.kafka.streams.state.KeyValueStore<String, Long>) context().getStateStore(str4);
                    store.put(key, 5L);
                    context().forward(key, "5");
                    context().commit();
                }
            }, "source")
            .addStateStore(processorStoreBuilder, "process")
            .addSink("sink", str2, new StringSerializer(), new StringSerializer(), "process");

        // The global store is created with logging disabled and is fed from topic str3.
        topology.addGlobalStore(Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str5) : Stores.inMemoryKeyValueStore(str5), Serdes.String(), Serdes.String()).withLoggingDisabled(), "global", Serdes.String().deserializer(), Serdes.String().deserializer(), str3, str3 + "-processor", new MockProcessorSupplier());
        return topology;
    }

    /**
     * Constructs a KafkaStreams instance for {@code topology} and verifies, via PowerMock, that
     * a {@code StateDirectory} was constructed with {@code z} as its third argument.
     *
     * @param topology topology passed to the KafkaStreams constructor
     * @param z        boolean value the StateDirectory constructor is expected to receive
     * @throws Exception declared because {@code PowerMock.expectNew} is declared to throw
     */
    private void startStreamsAndCheckDirExists(Topology topology, boolean z) throws Exception {
        // Intercept construction of StateDirectory and substitute the test's stateDirectory.
        PowerMock.expectNew(
            StateDirectory.class,
            EasyMock.anyObject(StreamsConfig.class),
            EasyMock.anyObject(Time.class),
            EasyMock.eq(z)
        ).andReturn(this.stateDirectory);
        PowerMock.replayAll();

        // Only construction is exercised; the instance is neither kept nor started.
        new KafkaStreams(topology, this.props, this.supplier, this.time);

        PowerMock.verifyAll();
    }
}
