package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.class */
public class StandbyTaskEOSMultiRebalanceIntegrationTest {
    private String appId;
    private String inputTopic;
    private String storeName;
    private String counterName;
    private String outputTopic;
    private KafkaStreams streamInstanceOne;
    private KafkaStreams streamInstanceTwo;
    private KafkaStreams streamInstanceThree;
    private final int partitionCount = 12;
    private static final Logger LOG = LoggerFactory.getLogger(StandbyTaskEOSMultiRebalanceIntegrationTest.class);
    private static final long TWO_MINUTE_TIMEOUT = Duration.ofMinutes(2).toMillis();
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics() throws Exception {
        String uuid = UUID.randomUUID().toString();
        this.appId = "app-" + uuid;
        this.inputTopic = "input-" + uuid;
        this.outputTopic = "output-" + uuid;
        this.storeName = "store-" + uuid;
        this.counterName = "counter-" + uuid;
        CLUSTER.deleteTopicsAndWait(this.inputTopic, this.outputTopic);
        CLUSTER.createTopic(this.inputTopic, 12, 3);
        CLUSTER.createTopic(this.outputTopic, 12, 3);
    }

    @AfterEach
    public void cleanUp() {
        if (this.streamInstanceOne != null) {
            this.streamInstanceOne.close();
        }
        if (this.streamInstanceTwo != null) {
            this.streamInstanceTwo.close();
        }
        if (this.streamInstanceThree != null) {
            this.streamInstanceThree.close();
        }
    }

    @Test
    public void shouldHonorEOSWhenUsingCachingAndStandbyReplicas() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("isolation.level", "read_committed");
        long currentTimeMillis = System.currentTimeMillis();
        String path = TestUtils.tempDirectory(this.appId).getPath();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, (Collection) IntStream.range(0, 3000).boxed().map(num -> {
            return new KeyValue(num, num);
        }).collect(Collectors.toList()), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), Long.valueOf(10 + currentTimeMillis));
        this.streamInstanceOne = buildWithUniqueIdAssignmentTopology(path + "-1");
        this.streamInstanceTwo = buildWithUniqueIdAssignmentTopology(path + "-2");
        this.streamInstanceThree = buildWithUniqueIdAssignmentTopology(path + "-3");
        LOG.info("start first instance and wait for completed processing");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streamInstanceOne), Duration.ofSeconds(30L));
        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), UUID.randomUUID().toString(), IntegerDeserializer.class, IntegerDeserializer.class, properties), this.outputTopic, 3000);
        LOG.info("Finished reading the initial bulk");
        LOG.info("start second instance and wait for standby replication");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streamInstanceTwo), Duration.ofSeconds(30L));
        TestUtils.waitForCondition(() -> {
            return ((ReadOnlyKeyValueStore) this.streamInstanceTwo.store(StoreQueryParameters.fromNameAndType(this.storeName, QueryableStoreTypes.keyValueStore()).enableStaleStores())).get(0) != null;
        }, TWO_MINUTE_TIMEOUT, "Could not get key from standby store");
        LOG.info("Second stream have some data in the state store");
        LOG.info("Produce the second bulk");
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, (Collection) IntStream.range(3000, 63000).boxed().map(num2 -> {
            return new KeyValue(num2, num2);
        }).collect(Collectors.toList()), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), Long.valueOf(1000 + currentTimeMillis));
        LOG.info("Start stream three which will introduce a re-balancing event and hopefully some redistribution of tasks.");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streamInstanceThree), Duration.ofSeconds(90L));
        LOG.info("Wait for the processing to be completed");
        List waitUntilMinRecordsReceived = IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), UUID.randomUUID().toString(), IntegerDeserializer.class, IntegerDeserializer.class, properties), this.outputTopic, 63000, Duration.ofMinutes(10L).toMillis());
        LOG.info("Processing completed");
        ((Map) waitUntilMinRecordsReceived.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.value();
        }))).forEach(this::logIfDuplicate);
        MatcherAssert.assertThat("Each output should correspond to one distinct value", Long.valueOf(waitUntilMinRecordsReceived.stream().map((v0) -> {
            return v0.value();
        }).distinct().count()), Matchers.is(Matchers.equalTo(Long.valueOf(waitUntilMinRecordsReceived.size()))));
    }

    private void logIfDuplicate(Integer num, List<ConsumerRecord<Integer, Integer>> list) {
        MatcherAssert.assertThat("The id and the value in the records must match", list.stream().allMatch(consumerRecord -> {
            return num.equals(consumerRecord.value());
        }));
        if (list.size() > 1) {
            LOG.warn("Id : " + num + " is assigned to the following " + list.stream().map((v0) -> {
                return v0.key();
            }).collect(Collectors.toList()));
        }
    }

    private KafkaStreams buildWithUniqueIdAssignmentTopology(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(this.storeName), Serdes.Integer(), Serdes.Integer()));
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(this.counterName), Serdes.Integer(), Serdes.Integer()).withCachingEnabled());
        streamsBuilder.stream(this.inputTopic).process(() -> {
            return new Processor<Integer, Integer, Integer, Integer>() { // from class: org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest.1
                private KeyValueStore store;
                private KeyValueStore counter;
                private ProcessorContext context;

                public void init(ProcessorContext<Integer, Integer> processorContext) {
                    this.context = processorContext;
                    this.store = processorContext.getStateStore(StandbyTaskEOSMultiRebalanceIntegrationTest.this.storeName);
                    this.counter = processorContext.getStateStore(StandbyTaskEOSMultiRebalanceIntegrationTest.this.counterName);
                }

                public void process(Record<Integer, Integer> record) {
                    Integer num = (Integer) record.key();
                    MatcherAssert.assertThat("Key and value mus be equal", num.equals((Integer) record.value()));
                    Integer num2 = (Integer) this.store.get(num);
                    if (num2 == null) {
                        Integer num3 = (Integer) this.counter.get(0);
                        int intValue = num3 == null ? 0 : num3.intValue() + 1;
                        this.counter.put(0, Integer.valueOf(intValue));
                        num2 = Integer.valueOf((intValue * 12) + ((RecordMetadata) this.context.recordMetadata().get()).partition());
                        this.store.put(num, num2);
                    }
                    this.context.forward(record.withKey(num2));
                }

                public void close() {
                }
            };
        }, new String[]{this.storeName, this.counterName}).to(this.outputTopic);
        return new KafkaStreams(streamsBuilder.build(), props(str));
    }

    private Properties props(String str) {
        Properties properties = new Properties();
        properties.put("application.id", this.appId);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("state.dir", str);
        properties.put("num.standby.replicas", 1);
        properties.put("num.stream.threads", 4);
        properties.put("max.warmup.replicas", 1);
        properties.put("processing.guarantee", "exactly_once_v2");
        properties.put("default.key.serde", Serdes.Integer().getClass());
        properties.put("default.value.serde", Serdes.Integer().getClass());
        properties.put("commit.interval.ms", 100L);
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }
}
