package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/RebalanceIntegrationTest.class */
public class RebalanceIntegrationTest {
    private static final int MAX_POLL_INTERVAL_MS = 30000;
    private String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 2;
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final Logger LOG = LoggerFactory.getLogger(RebalanceIntegrationTest.class);
    private static final int NUM_BROKERS = 3;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("auto.create.topics.enable", "true"), Utils.mkEntry("transaction.max.timeout.ms", "2147483647")})));
    private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics() throws Exception {
        this.applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
        CLUSTER.deleteTopics(MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopics(SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> list, List<KeyValue<Long, Long>> list2) {
        HashSet hashSet = new HashSet();
        addAllKeys(hashSet, list);
        addAllKeys(hashSet, list2);
        for (Long l : hashSet) {
            MatcherAssert.assertThat("The records do not match what expected", getAllRecordPerKey(l, list), CoreMatchers.equalTo(getAllRecordPerKey(l, list2)));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void addAllKeys(Set<Long> set, List<KeyValue<Long, Long>> list) {
        Iterator<KeyValue<Long, Long>> it = list.iterator();
        while (it.hasNext()) {
            set.add(it.next().key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long l, List<KeyValue<Long, Long>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (KeyValue<Long, Long> keyValue : list) {
            if (((Long) keyValue.key).equals(l)) {
                arrayList.add(keyValue);
            }
        }
        return arrayList;
    }

    @Test
    public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(MULTI_PARTITION_INPUT_TOPIC).process(() -> {
            return new Processor<Long, Long, Long, Long>() { // from class: org.apache.kafka.streams.integration.RebalanceIntegrationTest.1
                ProcessorContext context;

                public void init(ProcessorContext<Long, Long> processorContext) {
                    this.context = processorContext;
                    AtomicReference atomicReference = new AtomicReference();
                    atomicReference.set(processorContext.schedule(Duration.ofSeconds(1L), PunctuationType.WALL_CLOCK_TIME, j -> {
                        processorContext.forward(new Record(Long.valueOf((processorContext.taskId().partition() + 1) * 100), Long.valueOf(-(processorContext.taskId().partition() + 1)), processorContext.currentSystemTimeMs()));
                        ((Cancellable) atomicReference.get()).cancel();
                    }));
                }

                public void process(Record<Long, Long> record) {
                    this.context.forward(record.withValue(Long.valueOf(((RecordMetadata) this.context.recordMetadata().get()).offset())));
                    if (atomicBoolean.get()) {
                        this.context.commit();
                    }
                }
            };
        }, new String[0]).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("commit.interval.ms", Integer.MAX_VALUE);
        properties.put(StreamsConfig.consumerPrefix("max.poll.records"), 1);
        properties.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 29999);
        properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), Integer.valueOf(MAX_POLL_INTERVAL_MS));
        properties.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), Integer.MAX_VALUE);
        properties.put("task.assignor.class", TestTaskAssignor.class.getName());
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties));
        Throwable th = null;
        try {
            try {
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                List<KeyValue<Long, Long>> asList = Arrays.asList(KeyValue.pair(100L, -1L), KeyValue.pair(200L, -2L));
                checkResultPerKey(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), SINGLE_PARTITION_OUTPUT_TOPIC, asList.size()), asList);
                kafkaStreams.addStreamThread();
                List<KeyValue<Long, Long>> asList2 = Arrays.asList(KeyValue.pair(100L, -1L), KeyValue.pair(200L, -2L));
                checkResultPerKey(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), SINGLE_PARTITION_OUTPUT_TOPIC, asList2.size()), asList2);
                if (kafkaStreams != null) {
                    if (0 == 0) {
                        kafkaStreams.close();
                        return;
                    }
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaStreams != null) {
                if (th != null) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th4;
        }
    }
}
