package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.class */
public class RackAwarenessIntegrationTest {
    /** Number of brokers in the embedded Kafka cluster used by this test class. */
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster shared by all tests; started in {@link #createTopics()}. */
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    // Values used for the "cluster" client tag.
    private static final String TAG_VALUE_K8_CLUSTER_1 = "k8s-cluster-1";
    private static final String TAG_VALUE_K8_CLUSTER_2 = "k8s-cluster-2";
    private static final String TAG_VALUE_K8_CLUSTER_3 = "k8s-cluster-3";

    // Values used for the "zone" client tag.
    private static final String TAG_VALUE_EU_CENTRAL_1A = "eu-central-1a";
    private static final String TAG_VALUE_EU_CENTRAL_1B = "eu-central-1b";
    private static final String TAG_VALUE_EU_CENTRAL_1C = "eu-central-1c";

    // Defaults for the topology built by the no-argument initTopology().
    private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1;
    private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2;

    /** Topic the test topology reads from; created in {@link #createTopics()}. */
    private static final String INPUT_TOPIC = "input-topic";

    // Client tag keys populated by buildClientTags(zone, cluster).
    private static final String TAG_ZONE = "zone";
    private static final String TAG_CLUSTER = "cluster";

    /** Every Kafka Streams instance started by the current test, paired with its configuration. */
    private List<KafkaStreamsWithConfiguration> kafkaStreamsInstances;

    /** Configuration shared by all instances of the current test; rebuilt in {@link #setup()}. */
    private Properties baseConfiguration;

    /** Topology handed to every instance; assigned by the initTopology overloads. */
    private Topology topology;

    /** Fails any single test that runs longer than 600 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    /** Exposes the running test's name, used to derive a unique application id in {@link #setup()}. */
    @Rule
    public TestName testName = new TestName();

    /**
     * Couples a {@link KafkaStreams} instance with the {@link Properties} it was created from,
     * so that tests can later read the instance's client tags and purge its local state.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static final class KafkaStreamsWithConfiguration {
        private final Properties configuration;
        private final KafkaStreams kafkaStreams;

        KafkaStreamsWithConfiguration(Properties configuration, KafkaStreams streams) {
            this.kafkaStreams = streams;
            this.configuration = configuration;
        }
    }

    /**
     * Client tags of the instance running a task as active, together with the client tags of
     * every instance holding that same task as a standby.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static final class TaskClientTagDistribution {
        private final TaskClientTags activeTaskClientTags;
        private final List<TaskClientTags> standbyTasksClientTags;

        TaskClientTagDistribution(TaskClientTags activeTask, List<TaskClientTags> standbyTasks) {
            this.activeTaskClientTags = activeTask;
            this.standbyTasksClientTags = standbyTasks;
        }

        @Override
        public String toString() {
            return new StringBuilder("TaskDistribution{activeTaskClientTagsView=")
                .append(this.activeTaskClientTags)
                .append(", standbyTasks=")
                .append(this.standbyTasksClientTags)
                .append('}')
                .toString();
        }
    }

    /**
     * A task id paired with the client tags of the Kafka Streams instance hosting that task.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static final class TaskClientTags {
        private final TaskId taskId;
        private final Map<String, String> clientTags;

        TaskClientTags(TaskId id, Map<String, String> tags) {
            this.taskId = id;
            this.clientTags = tags;
        }

        @Override
        public String toString() {
            return new StringBuilder("TaskClientTags{taskId=")
                .append(this.taskId)
                .append(", clientTags=")
                .append(this.clientTags)
                .append('}')
                .toString();
        }
    }

    /**
     * Starts the embedded cluster once for the whole class and creates the input topic.
     *
     * <p>NOTE(review): no matching {@code @AfterClass} hook that stops {@code CLUSTER} is
     * visible in this file - confirm whether the cluster is shut down elsewhere.
     *
     * @throws Exception if the cluster cannot be started or the topic cannot be created
     */
    @BeforeClass
    public static void createTopics() throws Exception {
        final int partitions = 6;
        final int replicationFactor = 1;

        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC, partitions, replicationFactor);
    }

    /**
     * Resets per-test state: an empty instance list and a fresh base configuration with a
     * test-specific application id, the embedded cluster's bootstrap servers, a temporary
     * state directory and Integer default serdes.
     */
    @Before
    public void setup() {
        this.kafkaStreamsInstances = new ArrayList<>();

        final Properties base = new Properties();
        base.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        base.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        // Each instance overrides this with its own directory in createStreamsConfiguration().
        base.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        base.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        base.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        this.baseConfiguration = base;
    }

    /**
     * Closes every Kafka Streams instance started by the test and then purges their local state.
     *
     * <p>All instances are closed before any state is purged, so a purge failure cannot leave
     * later instances running. The instance list is cleared even when closing or purging throws.
     *
     * @throws IOException if purging the local state of an instance fails
     */
    @After
    public void cleanup() throws IOException {
        final Duration closeTimeout = Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT);
        try {
            for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
                instance.kafkaStreams.close(closeTimeout);
            }
            for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
                IntegrationTestUtils.purgeLocalStreamsState(instance.configuration);
            }
        } finally {
            this.kafkaStreamsInstances.clear();
        }
    }

    /**
     * Starts three instances configured with the maximum allowed number of rack-aware
     * assignment tags and checks that the ideal standby distribution is reached, both
     * initially and after one instance has been stopped.
     */
    @Test
    public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception {
        initTopology(3, 3);
        final int numberOfStandbyReplicas = 1;
        final int maxTagCount = StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE;

        final List<String> clientTagKeys = new ArrayList<>();
        final Map<String, String> clientTags1 = new HashMap<>();
        final Map<String, String> clientTags2 = new HashMap<>();
        for (int i = 0; i < maxTagCount; i++) {
            final String key = "key-" + i;
            clientTagKeys.add(key);
            // The two tag sets share keys but never values, so they can always be told apart.
            clientTags1.put(key, "value-1-" + i);
            clientTags2.put(key, "value-2-" + i);
        }

        // Sanity-check the fixture before starting any instance.
        Assert.assertEquals(maxTagCount, clientTagKeys.size());
        Stream.of(clientTags1, clientTags2).forEach(clientTags ->
            Assert.assertEquals(
                String.format("clientsTags with content '%s' did not match expected size", clientTags),
                maxTagCount,
                clientTags.size()));

        createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
        createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
        createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);

        waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));

        // Stop one of the two instances that share clientTags1.
        stopKafkaStreamsInstanceWithIndex(0);

        waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
    }

    /**
     * Six instances share one cluster tag value and are split evenly over two zones; with a
     * single standby replica, the distribution must be ideal with respect to the zone tag.
     */
    @Test
    public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameClusterTag() throws Exception {
        initTopology();
        final int numberOfStandbyReplicas = 1;
        final int instancesPerZone = 3;
        final String[] zones = {TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_EU_CENTRAL_1B};

        for (final String zone : zones) {
            for (int n = 0; n < instancesPerZone; n++) {
                createAndStart(buildClientTags(zone, TAG_VALUE_K8_CLUSTER_1), Arrays.asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
            }
        }

        waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(isIdealTaskDistributionReachedForTags(Collections.singletonList(TAG_ZONE)));
    }

    /**
     * Starts one instance for each of the nine (cluster, zone) combinations with two standby
     * replicas and checks that the distribution is ideal for both the zone and cluster tags.
     */
    @Test
    public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception {
        initTopology();
        // Kept as its own value: it only coincidentally equals the default partition count.
        final int numberOfStandbyReplicas = 2;
        final List<String> rackAwareTags = Arrays.asList(TAG_ZONE, TAG_CLUSTER);
        final String[] clusters = {TAG_VALUE_K8_CLUSTER_1, TAG_VALUE_K8_CLUSTER_2, TAG_VALUE_K8_CLUSTER_3};
        final String[] zones = {TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_EU_CENTRAL_1C};

        // Same start order as before: all zones of cluster 1, then cluster 2, then cluster 3.
        for (final String cluster : clusters) {
            for (final String zone : zones) {
                createAndStart(buildClientTags(zone, cluster), rackAwareTags, numberOfStandbyReplicas);
            }
        }

        waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(isIdealTaskDistributionReachedForTags(rackAwareTags));
    }

    /**
     * Starts six instances over three zones but only two clusters, with two standby replicas.
     * The zone tag must reach the ideal distribution, while the cluster tag is only required
     * to reach the partial one (at least one standby differs from the active task).
     */
    @Test
    public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved() throws Exception {
        initTopology();
        // Kept as its own value: it only coincidentally equals the default partition count.
        final int numberOfStandbyReplicas = 2;
        final List<String> rackAwareTags = Arrays.asList(TAG_ZONE, TAG_CLUSTER);
        final String[] clusters = {TAG_VALUE_K8_CLUSTER_1, TAG_VALUE_K8_CLUSTER_2};
        final String[] zones = {TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_EU_CENTRAL_1C};

        // Same start order as before: all zones of cluster 1, then all zones of cluster 2.
        for (final String cluster : clusters) {
            for (final String zone : zones) {
                createAndStart(buildClientTags(zone, cluster), rackAwareTags, numberOfStandbyReplicas);
            }
        }

        waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(isIdealTaskDistributionReachedForTags(Collections.singletonList(TAG_ZONE)));
        Assert.assertTrue(isPartialTaskDistributionReachedForTags(Collections.singletonList(TAG_CLUSTER)));
    }

    /**
     * Closes the instance at the given position of the instance list and then removes it
     * from the list, so later waits and checks no longer consider it.
     *
     * @param i index into the list of started instances
     */
    private void stopKafkaStreamsInstanceWithIndex(int i) {
        final KafkaStreamsWithConfiguration instance = this.kafkaStreamsInstances.get(i);
        final Duration closeTimeout = Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT);
        instance.kafkaStreams.close(closeTimeout);
        this.kafkaStreamsInstances.remove(i);
    }

    /**
     * Waits, using the default integration-test timeout, until every started instance is RUNNING.
     *
     * @throws Exception propagated from the wait helper
     */
    private void waitUntilAllKafkaStreamsClientsAreRunning() throws Exception {
        final Duration defaultTimeout = Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT);
        waitUntilAllKafkaStreamsClientsAreRunning(defaultTimeout);
    }

    /**
     * Waits until every started instance reports {@code KafkaStreams.State.RUNNING}.
     *
     * @param duration timeout passed to the wait helper
     * @throws Exception propagated from the wait helper
     */
    private void waitUntilAllKafkaStreamsClientsAreRunning(Duration duration) throws Exception {
        final List<KafkaStreams> clients = new ArrayList<>();
        for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
            clients.add(instance.kafkaStreams);
        }
        IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.RUNNING, duration);
    }

    /**
     * Checks the "partial" distribution: for every active task that has standbys, at least one
     * standby is hosted by an instance whose values differ from the active one for all given tags.
     *
     * @param collection tag keys to compare
     * @return {@code true} if at least one task distribution exists and all of them qualify
     */
    private boolean isPartialTaskDistributionReachedForTags(Collection<String> collection) {
        final Predicate<TaskClientTagDistribution> atLeastOneStandbyDiffers = distribution ->
            tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(
                distribution.standbyTasksClientTags,
                distribution.activeTaskClientTags.clientTags,
                collection);
        return isTaskDistributionTestSuccessful(atLeastOneStandbyDiffers);
    }

    /**
     * Checks the "ideal" distribution: for every active task that has standbys, no tag value is
     * repeated amongst the standbys, and every standby differs from the active task on every
     * given tag.
     *
     * @param collection tag keys to compare
     * @return {@code true} if at least one task distribution exists and all of them qualify
     */
    private boolean isIdealTaskDistributionReachedForTags(Collection<String> collection) {
        final Predicate<TaskClientTagDistribution> fullySpread = distribution -> {
            if (!tagsAmongstStandbyTasksAreDifferent(distribution.standbyTasksClientTags, collection)) {
                return false;
            }
            return tagsAmongstActiveAndAllStandbyTasksAreDifferent(
                distribution.standbyTasksClientTags,
                distribution.activeTaskClientTags.clientTags,
                collection);
        };
        return isTaskDistributionTestSuccessful(fullySpread);
    }

    /**
     * Applies the predicate to every collected task distribution.
     *
     * @param predicate condition each task distribution has to satisfy
     * @return {@code false} when no distribution was collected, otherwise whether all of them match
     */
    private boolean isTaskDistributionTestSuccessful(Predicate<TaskClientTagDistribution> predicate) {
        final List<TaskClientTagDistribution> distributions = getTasksClientTagDistributions();
        // An empty result must not count as success, unlike allMatch on an empty stream.
        return !distributions.isEmpty() && distributions.stream().allMatch(predicate);
    }

    /**
     * Returns {@code true} when no standby task shares the value of any checked tag with the
     * active task.
     *
     * @param collection  client tags of the standby tasks
     * @param map         client tags of the instance running the active task; must hold a
     *                    value for every checked tag, otherwise a NullPointerException is thrown
     * @param collection2 tag keys to compare
     */
    private static boolean tagsAmongstActiveAndAllStandbyTasksAreDifferent(Collection<TaskClientTags> collection, Map<String, String> map, Collection<String> collection2) {
        for (final TaskClientTags standbyTask : collection) {
            for (final String tag : collection2) {
                if (map.get(tag).equals(standbyTask.clientTags.get(tag))) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Returns {@code true} when at least one standby task differs from the active task on every
     * checked tag.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code private}.
     *
     * @param collection  client tags of the standby tasks
     * @param map         client tags of the instance running the active task; must hold a
     *                    value for every checked tag, otherwise a NullPointerException is thrown
     * @param collection2 tag keys to compare
     */
    public static boolean tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(Collection<TaskClientTags> collection, Map<String, String> map, Collection<String> collection2) {
        for (final TaskClientTags standbyTask : collection) {
            boolean sharesATagValue = false;
            for (final String tag : collection2) {
                if (map.get(tag).equals(standbyTask.clientTags.get(tag))) {
                    sharesATagValue = true;
                    break;
                }
            }
            if (!sharesATagValue) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns {@code true} when no tag value occurs more than once across the given standby tasks.
     *
     * <p>Occurrences are counted per tag <em>value</em> only, regardless of which tag key the
     * value belongs to. A missing tag is counted under the {@code null} value.
     *
     * @param standbyTasks client tags of the standby tasks of one active task
     * @param tagsToCheck  tag keys whose values are counted
     */
    private static boolean tagsAmongstStandbyTasksAreDifferent(Collection<TaskClientTags> standbyTasks, Collection<String> tagsToCheck) {
        final Map<String, Integer> occurrencesByTagValue = new HashMap<>();
        for (final TaskClientTags standbyTask : standbyTasks) {
            for (final String tag : tagsToCheck) {
                occurrencesByTagValue.merge(standbyTask.clientTags.get(tag), 1, Integer::sum);
            }
        }
        return occurrencesByTagValue.values().stream().noneMatch(occurrences -> occurrences > 1);
    }

    /**
     * Builds the test topology with the default number of partitions per sub-topology and the
     * default number of stateful sub-topologies.
     */
    private void initTopology() {
        initTopology(DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES, DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES);
    }

    /**
     * Builds the test topology and stores it in {@code topology}.
     *
     * <p>The topology reads {@code INPUT_TOPIC}, registers a persistent key-value store, adds
     * one repartition-and-filter branch, and adds the requested number of
     * repartition-group-reduce branches.
     *
     * @param numberOfPartitionsOfSubTopologies partition count used for every repartition step
     * @param numberOfStatefulSubTopologies     number of repartition-group-reduce branches to add
     */
    private void initTopology(int numberOfPartitionsOfSubTopologies, int numberOfStatefulSubTopologies) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"), Serdes.Integer(), Serdes.Integer()));

        // Typed stream: a raw KStream would erase the key/value types the reducer relies on.
        final KStream<Integer, Integer> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()));

        // Branch without aggregation: repartition and pass every record through.
        stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies)).filter((key, value) -> true);

        // Aggregating branches: each repartitions the input and sums values per key.
        for (int n = 0; n < numberOfStatefulSubTopologies; n++) {
            stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies))
                .groupByKey()
                .reduce(Integer::sum);
        }

        this.topology = builder.build();
    }

    /**
     * Collects, for every active task on every started instance, the client tags of the hosting
     * instance together with the client tags of the instances holding the task as a standby.
     * Active tasks for which no standby is found are skipped.
     *
     * @return one entry per active task that has at least one standby; possibly empty
     */
    private List<TaskClientTagDistribution> getTasksClientTagDistributions() {
        final List<TaskClientTagDistribution> distributions = new ArrayList<>();
        for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
            // The tags depend only on the instance configuration, so read them once per instance.
            final Map<String, String> activeClientTags = new StreamsConfig(instance.configuration).getClientTags();
            for (final ThreadMetadata threadMetadata : instance.kafkaStreams.metadataForLocalThreads()) {
                threadMetadata.activeTasks().forEach(activeTask -> {
                    final TaskId activeTaskId = activeTask.taskId();
                    final List<TaskClientTags> standbys = findStandbysForActiveTask(activeTaskId);
                    if (!standbys.isEmpty()) {
                        distributions.add(new TaskClientTagDistribution(new TaskClientTags(activeTaskId, activeClientTags), standbys));
                    }
                });
            }
        }
        return distributions;
    }

    /**
     * Finds every standby copy of the given task across all started instances.
     *
     * @param taskId id of the active task
     * @return the task id paired with the client tags of each instance holding it as a standby;
     *         empty when no instance does
     */
    private List<TaskClientTags> findStandbysForActiveTask(TaskId taskId) {
        final List<TaskClientTags> standbys = new ArrayList<>();
        for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
            // Parse the configuration once per instance instead of once per matching standby task.
            final Map<String, String> instanceClientTags = new StreamsConfig(instance.configuration).getClientTags();
            for (final ThreadMetadata threadMetadata : instance.kafkaStreams.metadataForLocalThreads()) {
                threadMetadata.standbyTasks().forEach(standbyTask -> {
                    final TaskId standbyTaskId = standbyTask.taskId();
                    if (taskId.equals(standbyTaskId)) {
                        standbys.add(new TaskClientTags(standbyTaskId, instanceClientTags));
                    }
                });
            }
        }
        return standbys;
    }

    /**
     * Builds a mutable client-tag map holding the two tags used by these tests.
     *
     * @param str  value stored under the {@code TAG_ZONE} key
     * @param str2 value stored under the {@code TAG_CLUSTER} key
     * @return a new map with exactly those two entries
     */
    private static Map<String, String> buildClientTags(String str, String str2) {
        final Map<String, String> clientTags = new HashMap<>();
        clientTags.put(TAG_ZONE, str);
        clientTags.put(TAG_CLUSTER, str2);
        return clientTags;
    }

    /**
     * Creates a Kafka Streams instance for the current topology, registers it in the instance
     * list and starts it.
     *
     * @param map        client tags (key to value) of the new instance
     * @param collection tag keys to use for rack-aware standby assignment
     * @param i          number of standby replicas
     */
    private void createAndStart(Map<String, String> map, Collection<String> collection, int i) {
        final Properties instanceConfig = createStreamsConfiguration(map, collection, i);
        final KafkaStreams streams = new KafkaStreams(this.topology, instanceConfig);
        // Register before starting so the instance is tracked even if start() throws.
        final KafkaStreamsWithConfiguration registered = new KafkaStreamsWithConfiguration(instanceConfig, streams);
        this.kafkaStreamsInstances.add(registered);
        streams.start();
    }

    /**
     * Derives the configuration of a single instance from the base configuration.
     *
     * @param clientTags              client tags (key to value); each is stored under its
     *                                prefixed client-tag config key
     * @param rackAwareAssignmentTags tag keys, joined with commas into the rack-aware assignment
     *                                tags setting
     * @param numberOfStandbyReplicas number of standby replicas
     * @return a new {@link Properties} object; the base configuration is not modified
     */
    private Properties createStreamsConfiguration(Map<String, String> clientTags, Collection<String> rackAwareAssignmentTags, int numberOfStandbyReplicas) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.putAll(this.baseConfiguration);
        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numberOfStandbyReplicas);
        streamsConfiguration.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags));
        clientTags.forEach((tagKey, tagValue) -> streamsConfiguration.put(StreamsConfig.clientTagPrefix(tagKey), tagValue));
        // Replace the base state directory with a per-instance one, prefixed with the tag values.
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(String.join("-", clientTags.values())).getPath());
        return streamsConfiguration;
    }
}
