package org.apache.kafka.connect.mirror.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMaker;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.class */
public class DedicatedMirrorIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
    private static final int TOPIC_CREATION_TIMEOUT_MS = 30000;
    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30000;
    private static final long MM_START_UP_TIMEOUT_MS = 120000;
    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
    private Map<String, MirrorMaker> mirrorMakers;

    @BeforeEach
    public void setup() {
        this.kafkaClusters = new HashMap();
        this.mirrorMakers = new HashMap();
    }

    @AfterEach
    public void teardown() throws Throwable {
        AtomicReference atomicReference = new AtomicReference();
        this.mirrorMakers.forEach((str, mirrorMaker) -> {
            mirrorMaker.getClass();
            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + str + "'", atomicReference);
        });
        this.mirrorMakers.forEach((str2, mirrorMaker2) -> {
            mirrorMaker2.awaitStop();
        });
        this.kafkaClusters.forEach((str3, embeddedKafkaCluster) -> {
            embeddedKafkaCluster.getClass();
            Utils.closeQuietly(embeddedKafkaCluster::stop, "Embedded Kafka cluster '" + str3 + "'", atomicReference);
        });
        if (atomicReference.get() != null) {
            throw ((Throwable) atomicReference.get());
        }
    }

    private EmbeddedKafkaCluster startKafkaCluster(String str, int i, Properties properties) {
        if (this.kafkaClusters.containsKey(str)) {
            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
        }
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(i, properties);
        this.kafkaClusters.put(str, embeddedKafkaCluster);
        embeddedKafkaCluster.start();
        return embeddedKafkaCluster;
    }

    private MirrorMaker startMirrorMaker(String str, Map<String, String> map) {
        if (this.mirrorMakers.containsKey(str)) {
            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
        }
        MirrorMaker mirrorMaker = new MirrorMaker(map);
        this.mirrorMakers.put(str, mirrorMaker);
        mirrorMaker.start();
        return mirrorMaker;
    }

    private void stopMirrorMaker(String str) {
        MirrorMaker remove = this.mirrorMakers.remove(str);
        if (remove == null) {
            throw new IllegalStateException("No MirrorMaker named " + str + " has been started");
        }
        remove.stop();
    }

    @Test
    public void testSingleNodeCluster() throws Exception {
        Properties properties = new Properties();
        final EmbeddedKafkaCluster startKafkaCluster = startKafkaCluster("A", 1, properties);
        final EmbeddedKafkaCluster startKafkaCluster2 = startKafkaCluster("B", 1, properties);
        Admin createAdminClient = startKafkaCluster2.createAdminClient();
        Throwable th = null;
        try {
            try {
                MirrorMaker startMirrorMaker = startMirrorMaker("single node", new HashMap<String, String>() { // from class: org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.1
                    {
                        put("dedicated.mode.enable.internal.rest", "false");
                        put("listeners", "http://localhost:0");
                        put("refresh.topics.interval.seconds", "1");
                        put("clusters", String.join(", ", "A", "B"));
                        put("A.bootstrap.servers", startKafkaCluster.bootstrapServers());
                        put("B.bootstrap.servers", startKafkaCluster2.bootstrapServers());
                        put("A->B.enabled", "true");
                        put("A->B.topics", "^test-topic-.*");
                        put("B->A.enabled", "false");
                        put("B->A.emit.heartbeats.enabled", "false");
                        put("replication.factor", "1");
                        put("checkpoints.topic.replication.factor", "1");
                        put("heartbeats.topic.replication.factor", "1");
                        put("offset-syncs.topic.replication.factor", "1");
                        put("offset.storage.replication.factor", "1");
                        put("status.storage.replication.factor", "1");
                        put("config.storage.replication.factor", "1");
                    }
                });
                SourceAndTarget sourceAndTarget = new SourceAndTarget("A", "B");
                awaitMirrorMakerStart(startMirrorMaker, sourceAndTarget);
                awaitConnectorTasksStart(startMirrorMaker, MirrorHeartbeatConnector.class, sourceAndTarget);
                startKafkaCluster.createTopic("test-topic-1", 1);
                awaitTopicCreation("B", createAdminClient, "A.test-topic-1");
                awaitConnectorTasksStart(startMirrorMaker, MirrorSourceConnector.class, sourceAndTarget);
                writeToTopic(startKafkaCluster, "test-topic-1", 10);
                awaitTopicContent(startKafkaCluster2, "B", "A.test-topic-1", 10);
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testClusterWithEmitOffsetDisabled() throws Exception {
        Properties properties = new Properties();
        final EmbeddedKafkaCluster startKafkaCluster = startKafkaCluster("A", 1, properties);
        final EmbeddedKafkaCluster startKafkaCluster2 = startKafkaCluster("B", 1, properties);
        Admin createAdminClient = startKafkaCluster2.createAdminClient();
        Throwable th = null;
        try {
            try {
                MirrorMaker startMirrorMaker = startMirrorMaker("no-offset-syncing", new HashMap<String, String>() { // from class: org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.2
                    {
                        put("dedicated.mode.enable.internal.rest", "false");
                        put("listeners", "http://localhost:0");
                        put("refresh.topics.interval.seconds", "1");
                        put("clusters", String.join(", ", "A", "B"));
                        put("A.bootstrap.servers", startKafkaCluster.bootstrapServers());
                        put("B.bootstrap.servers", startKafkaCluster2.bootstrapServers());
                        put("A->B.enabled", "true");
                        put("A->B.topics", "^test-topic-.*");
                        put("replication.factor", "1");
                        put("checkpoints.topic.replication.factor", "1");
                        put("heartbeats.topic.replication.factor", "1");
                        put("emit.offset-syncs.enabled", "false");
                        put("status.storage.replication.factor", "1");
                        put("offset.storage.replication.factor", "1");
                        put("config.storage.replication.factor", "1");
                    }
                });
                SourceAndTarget sourceAndTarget = new SourceAndTarget("A", "B");
                awaitMirrorMakerStart(startMirrorMaker, sourceAndTarget, Arrays.asList(MirrorSourceConnector.class, MirrorHeartbeatConnector.class));
                awaitConnectorTasksStart(startMirrorMaker, MirrorHeartbeatConnector.class, sourceAndTarget);
                startKafkaCluster.createTopic("test-topic-1", 1);
                awaitTopicCreation("B", createAdminClient, "A.test-topic-1");
                awaitConnectorTasksStart(startMirrorMaker, MirrorSourceConnector.class, sourceAndTarget);
                writeToTopic(startKafkaCluster, "test-topic-1", 10);
                awaitTopicContent(startKafkaCluster2, "B", "A.test-topic-1", 10);
                Assertions.assertTrue(((List) startKafkaCluster.describeTopics(new String[]{"mm2-offset-syncs.B.internal"}).values().stream().filter((v0) -> {
                    return v0.isPresent();
                }).map((v0) -> {
                    return v0.get();
                }).collect(Collectors.toList())).isEmpty());
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testMultiNodeCluster() throws Exception {
        Properties properties = new Properties();
        properties.put("transaction.state.log.replication.factor", "1");
        properties.put("transaction.state.log.min.isr", "1");
        final EmbeddedKafkaCluster startKafkaCluster = startKafkaCluster("A", 1, properties);
        final EmbeddedKafkaCluster startKafkaCluster2 = startKafkaCluster("B", 1, properties);
        Admin createAdminClient = startKafkaCluster2.createAdminClient();
        Throwable th = null;
        try {
            try {
                HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.3
                    {
                        put("dedicated.mode.enable.internal.rest", "true");
                        put("listeners", "http://localhost:0");
                        put("refresh.topics.interval.seconds", "1");
                        put("clusters", String.join(", ", "A", "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618"));
                        put("A.bootstrap.servers", startKafkaCluster.bootstrapServers());
                        put("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.bootstrap.servers", startKafkaCluster2.bootstrapServers());
                        put("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.exactly.once.source.support", "enabled");
                        put("A.consumer.isolation.level", "read_committed");
                        put("A->B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.enabled", "true");
                        put("A->B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.topics", "^test-topic-.*");
                        put("A->B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.offset-syncs.topic.location", "target");
                        put("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618->A.enabled", "false");
                        put("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618->A.emit.heartbeats.enabled", "false");
                        put("replication.factor", "1");
                        put("checkpoints.topic.replication.factor", "1");
                        put("heartbeats.topic.replication.factor", "1");
                        put("offset-syncs.topic.replication.factor", "1");
                        put("offset.storage.replication.factor", "1");
                        put("status.storage.replication.factor", "1");
                        put("config.storage.replication.factor", "1");
                        put("A.scheduled.rebalance.max.delay.ms", "1000");
                        put("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618.scheduled.rebalance.max.delay.ms", "1000");
                    }
                };
                SourceAndTarget sourceAndTarget = new SourceAndTarget("A", "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618");
                for (int i = 0; i < 3; i++) {
                    startMirrorMaker("node " + i, hashMap);
                }
                awaitMirrorMakerStart(this.mirrorMakers.get("node 0"), sourceAndTarget);
                awaitConnectorTasksStart(this.mirrorMakers.get("node 0"), MirrorHeartbeatConnector.class, sourceAndTarget);
                for (int i2 = 0; i2 < 3; i2++) {
                    String str = "test-topic-" + i2;
                    startKafkaCluster.createTopic(str, 1);
                    awaitTopicCreation("B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618", createAdminClient, "A." + str);
                    awaitConnectorTasksStart(this.mirrorMakers.get("node " + i2), MirrorSourceConnector.class, sourceAndTarget);
                    writeToTopic(startKafkaCluster, str, 10);
                    awaitTopicContent(startKafkaCluster2, "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618", "A." + str, 10);
                }
                HashMap hashMap2 = new HashMap(hashMap);
                String str2 = "2";
                hashMap2.put("refresh.topics.interval.seconds", "2");
                for (int i3 = 0; i3 < 3; i3++) {
                    stopMirrorMaker("node " + i3);
                    MirrorMaker mirrorMaker = this.mirrorMakers.values().stream().findAny().get();
                    awaitConnectorTasksStart(mirrorMaker, MirrorHeartbeatConnector.class, sourceAndTarget);
                    awaitConnectorTasksStart(mirrorMaker, MirrorSourceConnector.class, sourceAndTarget);
                    startMirrorMaker("node " + i3, hashMap2);
                }
                awaitTaskConfigurations(this.mirrorMakers.get("node 0"), MirrorSourceConnector.class, sourceAndTarget, map -> {
                    return str2.equals(map.get("refresh.topics.interval.seconds"));
                });
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    private void awaitTopicCreation(String str, Admin admin, String str2) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                return ((Set) admin.listTopics().names().get()).contains(str2);
            } catch (Exception e) {
                log.debug("Failed to check for existence of topic {} on cluster {}", new Object[]{str2, str, e});
                return false;
            }
        }, 30000L, "topic " + str2 + " was not created on cluster " + str + " in time");
    }

    private void writeToTopic(EmbeddedKafkaCluster embeddedKafkaCluster, String str, int i) {
        for (int i2 = 0; i2 <= i; i2++) {
            embeddedKafkaCluster.produce(str, Integer.toString(i2));
        }
    }

    private void awaitMirrorMakerStart(MirrorMaker mirrorMaker, SourceAndTarget sourceAndTarget) throws InterruptedException {
        awaitMirrorMakerStart(mirrorMaker, sourceAndTarget, MirrorMaker.CONNECTOR_CLASSES);
    }

    private void awaitMirrorMakerStart(MirrorMaker mirrorMaker, SourceAndTarget sourceAndTarget, List<Class<?>> list) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                return list.stream().allMatch(cls -> {
                    return isConnectorRunningForMirrorMaker(cls, mirrorMaker, sourceAndTarget);
                });
            } catch (Exception e) {
                log.error("Something unexpected occurred. Unable to check for startup status for mirror maker {}", mirrorMaker, e);
                throw new NoRetryException(e);
            }
        }, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition to running in time");
    }

    private <T extends SourceConnector> void awaitConnectorTasksStart(MirrorMaker mirrorMaker, Class<T> cls, SourceAndTarget sourceAndTarget) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                return isTaskRunningForMirrorMakerConnector(cls, mirrorMaker, sourceAndTarget);
            } catch (Exception e) {
                log.error("Something unexpected occurred. Unable to check for startup status of connector {} for mirror maker with source->target={}", new Object[]{cls.getSimpleName(), sourceAndTarget, e});
                throw new NoRetryException(e);
            }
        }, MM_START_UP_TIMEOUT_MS, "Tasks for connector " + cls.getSimpleName() + " for MirrorMaker instances did not transition to running in time");
    }

    private <T extends SourceConnector> void awaitTaskConfigurations(MirrorMaker mirrorMaker, Class<T> cls, SourceAndTarget sourceAndTarget, Predicate<Map<String, String>> predicate) throws InterruptedException {
        String simpleName = cls.getSimpleName();
        TestUtils.waitForCondition(() -> {
            try {
                FutureCallback futureCallback = new FutureCallback();
                mirrorMaker.taskConfigs(sourceAndTarget, simpleName, futureCallback);
                return ((List) futureCallback.get(MM_START_UP_TIMEOUT_MS, TimeUnit.MILLISECONDS)).stream().map((v0) -> {
                    return v0.config();
                }).allMatch(predicate);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RebalanceNeededException) {
                    throw e;
                }
                log.error("Something unexpected occurred. Unable to get configuration of connector {} for mirror maker with source->target={}", new Object[]{simpleName, sourceAndTarget, e});
                throw new NoRetryException(e);
            }
        }, MM_START_UP_TIMEOUT_MS, "Connector configuration for " + simpleName + " for MirrorMaker instances is incorrect");
    }

    private void awaitTopicContent(EmbeddedKafkaCluster embeddedKafkaCluster, String str, String str2, int i) throws Exception {
        KafkaConsumer createConsumer = embeddedKafkaCluster.createConsumer(Collections.singletonMap("auto.offset.reset", "earliest"));
        Throwable th = null;
        try {
            try {
                createConsumer.subscribe(Collections.singleton(str2));
                AtomicInteger atomicInteger = new AtomicInteger(0);
                TestUtils.waitForCondition(() -> {
                    return atomicInteger.addAndGet(createConsumer.poll(Duration.ofSeconds(1L)).count()) >= i;
                }, 30000L, () -> {
                    return "could not read " + i + " from topic " + str2 + " on cluster " + str + " in time; only read " + atomicInteger.get();
                });
                if (createConsumer != null) {
                    if (0 == 0) {
                        createConsumer.close();
                        return;
                    }
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th4;
        }
    }

    private boolean isConnectorRunningForMirrorMaker(Class<?> cls, MirrorMaker mirrorMaker, SourceAndTarget sourceAndTarget) {
        try {
            ConnectorStateInfo connectorStatus = mirrorMaker.connectorStatus(sourceAndTarget, cls.getSimpleName());
            if (connectorStatus.connector().state().equals(AbstractStatus.State.FAILED.toString())) {
                throw new NoRetryException(new AssertionError(String.format("Connector %s is in FAILED state for MirrorMaker %s and source->target=%s", cls, mirrorMaker, sourceAndTarget)));
            }
            return connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
        } catch (NotFoundException e) {
            return false;
        }
    }

    private <T extends SourceConnector> boolean isTaskRunningForMirrorMakerConnector(Class<T> cls, MirrorMaker mirrorMaker, SourceAndTarget sourceAndTarget) {
        ConnectorStateInfo connectorStatus = mirrorMaker.connectorStatus(sourceAndTarget, cls.getSimpleName());
        return isConnectorRunningForMirrorMaker(cls, mirrorMaker, sourceAndTarget) && !connectorStatus.tasks().isEmpty() && connectorStatus.tasks().stream().allMatch(taskState -> {
            if (taskState.state().equals(AbstractStatus.State.FAILED.toString())) {
                throw new NoRetryException(new AssertionError(String.format("Task %s is in FAILED state", taskState)));
            }
            return taskState.state().equals(AbstractStatus.State.RUNNING.toString());
        });
    }
}
