package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/LagFetchIntegrationTest.class */
public class LagFetchIntegrationTest {
    private static final long WAIT_TIMEOUT_MS = 120000;
    private Properties streamsConfiguration;
    private Properties consumerConfiguration;
    private String inputTopicName;
    private String outputTopicName;
    private String stateStoreName;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Logger LOG = LoggerFactory.getLogger(LagFetchIntegrationTest.class);
    private final MockTime mockTime = CLUSTER.time;

    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void before() {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.inputTopicName = "input-topic-" + safeUniqueTestName;
        this.outputTopicName = "output-topic-" + safeUniqueTestName;
        this.stateStoreName = "lagfetch-test-store" + safeUniqueTestName;
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "app-" + safeUniqueTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("commit.interval.ms", 100L);
        this.consumerConfiguration = new Properties();
        this.consumerConfiguration.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        this.consumerConfiguration.setProperty("group.id", "group-" + safeUniqueTestName);
        this.consumerConfiguration.setProperty("auto.offset.reset", "earliest");
        this.consumerConfiguration.setProperty("key.deserializer", StringDeserializer.class.getName());
        this.consumerConfiguration.setProperty("value.deserializer", LongDeserializer.class.getName());
    }

    @After
    public void shutdown() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, Map<Integer, LagInfo>> getFirstNonEmptyLagMap(KafkaStreams kafkaStreams) throws InterruptedException {
        HashMap hashMap = new HashMap();
        TestUtils.waitForCondition(() -> {
            Map allLocalStorePartitionLags = kafkaStreams.allLocalStorePartitionLags();
            if (allLocalStorePartitionLags.size() > 0) {
                hashMap.putAll(allLocalStorePartitionLags);
            }
            return allLocalStorePartitionLags.size() > 0;
        }, WAIT_TIMEOUT_MS, "Should obtain non-empty lag information eventually");
        return hashMap;
    }

    private void shouldFetchLagsDuringRebalancing(String str) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        ArrayList arrayList = new ArrayList();
        IntegrationTestUtils.produceKeyValuesSynchronously(this.inputTopicName, Utils.mkSet(new KeyValue[]{new KeyValue("k1", 1L), new KeyValue("k2", 2L), new KeyValue("k3", 3L), new KeyValue("k4", 4L), new KeyValue("k5", 5L)}), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);
        for (int i = 0; i < 2; i++) {
            Properties properties = (Properties) this.streamsConfiguration.clone();
            properties.put("internal.task.assignor.class", FallbackPriorTaskAssignor.class.getName());
            properties.put("application.server", "localhost:" + i);
            properties.put("client.id", "instance-" + i);
            properties.put("topology.optimization", str);
            properties.put("num.standby.replicas", 1);
            properties.put("state.dir", TestUtils.tempDirectory(this.stateStoreName + i).getAbsolutePath());
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.table(this.inputTopicName, Materialized.as(this.stateStoreName)).toStream().to(this.outputTopicName);
            arrayList.add(new KafkaStreamsWrapper(streamsBuilder.build(properties), properties));
        }
        KafkaStreamsWrapper kafkaStreamsWrapper = (KafkaStreamsWrapper) arrayList.get(0);
        KafkaStreamsWrapper kafkaStreamsWrapper2 = (KafkaStreamsWrapper) arrayList.get(1);
        kafkaStreamsWrapper.setStreamThreadStateListener((thread, threadStateTransitionValidator, threadStateTransitionValidator2) -> {
            if (threadStateTransitionValidator == StreamThread.State.RUNNING) {
                countDownLatch.countDown();
            }
        });
        kafkaStreamsWrapper2.setStreamThreadStateListener((thread2, threadStateTransitionValidator3, threadStateTransitionValidator4) -> {
            if (threadStateTransitionValidator4 != StreamThread.State.PARTITIONS_ASSIGNED || threadStateTransitionValidator3 != StreamThread.State.RUNNING) {
                if (threadStateTransitionValidator3 == StreamThread.State.RUNNING) {
                    countDownLatch2.countDown();
                }
            } else {
                countDownLatch3.countDown();
                try {
                    cyclicBarrier.await(60L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        try {
            TestUtils.waitForCondition(() -> {
                return kafkaStreamsWrapper.allLocalStorePartitionLags().size() == 0;
            }, WAIT_TIMEOUT_MS, "Should see empty lag map before streams is started.");
            kafkaStreamsWrapper.start();
            countDownLatch.await(60L, TimeUnit.SECONDS);
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(this.consumerConfiguration, this.outputTopicName, 5, WAIT_TIMEOUT_MS);
            Map<String, Map<Integer, LagInfo>> firstNonEmptyLagMap = getFirstNonEmptyLagMap(kafkaStreamsWrapper);
            MatcherAssert.assertThat(Integer.valueOf(firstNonEmptyLagMap.size()), IsEqual.equalTo(1));
            MatcherAssert.assertThat(firstNonEmptyLagMap.keySet(), IsEqual.equalTo(Utils.mkSet(new String[]{this.stateStoreName})));
            MatcherAssert.assertThat(Integer.valueOf(firstNonEmptyLagMap.get(this.stateStoreName).size()), IsEqual.equalTo(1));
            LagInfo lagInfo = firstNonEmptyLagMap.get(this.stateStoreName).get(0);
            MatcherAssert.assertThat(Long.valueOf(lagInfo.currentOffsetPosition()), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo.endOffsetPosition()), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo.offsetLag()), IsEqual.equalTo(0L));
            kafkaStreamsWrapper2.start();
            countDownLatch3.await(60L, TimeUnit.SECONDS);
            Map<String, Map<Integer, LagInfo>> firstNonEmptyLagMap2 = getFirstNonEmptyLagMap(kafkaStreamsWrapper2);
            MatcherAssert.assertThat(Integer.valueOf(firstNonEmptyLagMap2.size()), IsEqual.equalTo(1));
            MatcherAssert.assertThat(firstNonEmptyLagMap2.keySet(), IsEqual.equalTo(Utils.mkSet(new String[]{this.stateStoreName})));
            MatcherAssert.assertThat(Integer.valueOf(firstNonEmptyLagMap2.get(this.stateStoreName).size()), IsEqual.equalTo(1));
            LagInfo lagInfo2 = firstNonEmptyLagMap2.get(this.stateStoreName).get(0);
            MatcherAssert.assertThat(Long.valueOf(lagInfo2.currentOffsetPosition()), IsEqual.equalTo(0L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo2.endOffsetPosition()), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo2.offsetLag()), IsEqual.equalTo(5L));
            cyclicBarrier.await(60L, TimeUnit.SECONDS);
            TestUtils.waitForCondition(() -> {
                return ((LagInfo) ((Map) kafkaStreamsWrapper2.allLocalStorePartitionLags().get(this.stateStoreName)).get(0)).offsetLag() == 0;
            }, WAIT_TIMEOUT_MS, "Standby should eventually catchup and have zero lag.");
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((KafkaStreams) it.next()).close();
            }
        } catch (Throwable th) {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((KafkaStreams) it2.next()).close();
            }
            throw th;
        }
    }

    @Test
    public void shouldFetchLagsDuringRebalancingWithOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("all");
    }

    @Test
    public void shouldFetchLagsDuringRebalancingWithNoOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("none");
    }

    @Test
    public void shouldFetchLagsDuringRestoration() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.inputTopicName, Utils.mkSet(new KeyValue[]{new KeyValue("k1", 1L), new KeyValue("k2", 2L), new KeyValue("k3", 3L), new KeyValue("k4", 4L), new KeyValue("k5", 5L)}), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);
        Properties properties = (Properties) this.streamsConfiguration.clone();
        File tempDirectory = TestUtils.tempDirectory(this.stateStoreName + "0");
        properties.put("application.server", "localhost:0");
        properties.put("client.id", "instance-0");
        properties.put("state.dir", tempDirectory.getAbsolutePath());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(this.inputTopicName, Materialized.as(this.stateStoreName)).toStream().to(this.outputTopicName);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        try {
            TestUtils.waitForCondition(() -> {
                return kafkaStreams.allLocalStorePartitionLags().size() == 0;
            }, WAIT_TIMEOUT_MS, "Should see empty lag map before streams is started.");
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60L));
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(this.consumerConfiguration, this.outputTopicName, 5, WAIT_TIMEOUT_MS);
            AtomicReference atomicReference = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                Map allLocalStorePartitionLags = kafkaStreams.allLocalStorePartitionLags();
                MatcherAssert.assertThat(Integer.valueOf(allLocalStorePartitionLags.size()), IsEqual.equalTo(1));
                MatcherAssert.assertThat(allLocalStorePartitionLags.keySet(), IsEqual.equalTo(Utils.mkSet(new String[]{this.stateStoreName})));
                MatcherAssert.assertThat(Integer.valueOf(((Map) allLocalStorePartitionLags.get(this.stateStoreName)).size()), IsEqual.equalTo(1));
                LagInfo lagInfo = (LagInfo) ((Map) allLocalStorePartitionLags.get(this.stateStoreName)).get(0);
                MatcherAssert.assertThat(Long.valueOf(lagInfo.currentOffsetPosition()), IsEqual.equalTo(5L));
                MatcherAssert.assertThat(Long.valueOf(lagInfo.endOffsetPosition()), IsEqual.equalTo(5L));
                MatcherAssert.assertThat(Long.valueOf(lagInfo.offsetLag()), IsEqual.equalTo(0L));
                atomicReference.set(lagInfo);
                return true;
            }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
            MatcherAssert.assertThat("Streams instance did not close within timeout", kafkaStreams.close(Duration.ofSeconds(60L)));
            IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
            Files.walk(tempDirectory.toPath(), new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                return v0.toFile();
            }).forEach(file -> {
                Assert.assertTrue("Some state " + file + " could not be deleted", file.delete());
            });
            final KafkaStreams kafkaStreams2 = new KafkaStreams(streamsBuilder.build(), properties);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final HashMap hashMap = new HashMap();
            final HashMap hashMap2 = new HashMap();
            kafkaStreams2.setGlobalStateRestoreListener(new StateRestoreListener() { // from class: org.apache.kafka.streams.integration.LagFetchIntegrationTest.1
                public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
                    try {
                        hashMap.putAll(LagFetchIntegrationTest.this.getFirstNonEmptyLagMap(kafkaStreams2));
                    } catch (Exception e) {
                        LagFetchIntegrationTest.LOG.error("Exception while trying to obtain lag map", e);
                    }
                }

                public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
                }

                public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
                    try {
                        hashMap2.putAll(LagFetchIntegrationTest.this.getFirstNonEmptyLagMap(kafkaStreams2));
                    } catch (Exception e) {
                        LagFetchIntegrationTest.LOG.error("Exception while trying to obtain lag map", e);
                    }
                    countDownLatch.countDown();
                }
            });
            kafkaStreams2.start();
            countDownLatch.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            TestUtils.waitForCondition(() -> {
                return ((LagInfo) ((Map) kafkaStreams2.allLocalStorePartitionLags().get(this.stateStoreName)).get(0)).offsetLag() == 0;
            }, WAIT_TIMEOUT_MS, "Standby should eventually catchup and have zero lag.");
            LagInfo lagInfo = (LagInfo) ((Map) hashMap.get(this.stateStoreName)).get(0);
            MatcherAssert.assertThat(Long.valueOf(lagInfo.currentOffsetPosition()), IsEqual.equalTo(0L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo.endOffsetPosition()), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(Long.valueOf(lagInfo.offsetLag()), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(((Map) hashMap2.get(this.stateStoreName)).get(0), IsEqual.equalTo(atomicReference.get()));
            kafkaStreams.close();
            kafkaStreams.cleanUp();
        } catch (Throwable th) {
            kafkaStreams.close();
            kafkaStreams.cleanUp();
            throw th;
        }
    }
}
