package org.apache.kafka.metadata.migration;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsImageTest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* Tests for KRaftMigrationDriver: state transitions, dual-write propagation and controller failover. */
public class KRaftMigrationDriverTest {
    // Controller nodes shared by every test: ids 4, 5 and 6.
    List<Node> controllerNodes = Arrays.asList(
        new Node(4, "host4", 0),
        new Node(5, "host5", 0),
        new Node(6, "host6", 0));

    // API versions registry; filled in by setup() and adjusted by individual tests.
    ApiVersions apiVersions = new ApiVersions();

    // Built from the two fields above with id 4 (presumably the local controller id; confirm against QuorumFeatures.create).
    QuorumFeatures quorumFeatures = QuorumFeatures.create(
        4,
        this.apiVersions,
        QuorumFeatures.defaultFeatureMap(),
        this.controllerNodes);

    // MockTime whose nanosecond clock is the real System.nanoTime() shifted back by 990 ms.
    Time mockTime = new MockTime(1) {
        public long nanoseconds() {
            long shiftNanos = TimeUnit.MILLISECONDS.toNanos(990L);
            return System.nanoTime() - shiftNanos;
        }
    };

    /**
     * {@link LegacyPropagator} test double that only counts how many times each
     * "send RPCs to brokers" entry point is invoked; every other callback is a no-op.
     */
    public static class CountingMetadataPropagator implements LegacyPropagator {
        /** Number of calls to {@code sendRPCsToBrokersFromMetadataDelta}. */
        public int deltas = 0;
        /** Number of calls to {@code sendRPCsToBrokersFromMetadataImage}. */
        public int images = 0;

        CountingMetadataPropagator() {
        }

        public void startup() {
            // nothing to start
        }

        public void shutdown() {
            // nothing to stop
        }

        public void publishMetadata(MetadataImage metadataImage) {
            // published images are ignored
        }

        public void sendRPCsToBrokersFromMetadataDelta(MetadataDelta metadataDelta, MetadataImage metadataImage, int i) {
            deltas += 1;
        }

        public void sendRPCsToBrokersFromMetadataImage(MetadataImage metadataImage, int i) {
            images += 1;
        }

        public void clear() {
            // counters are intentionally left untouched
        }
    }

    /**
     * {@link ZkRecordConsumer} test double that discards every batch and reports
     * the migration as complete at offset 100, epoch 1.
     */
    public static class NoOpRecordConsumer implements ZkRecordConsumer {
        NoOpRecordConsumer() {
        }

        public void beginMigration() {
            // no-op
        }

        /** Ignores the batch and returns an already-completed future. */
        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> list) {
            return CompletableFuture.completedFuture(null);
        }

        /** Returns an already-completed future carrying the fixed offset/epoch (100, 1). */
        public CompletableFuture<OffsetAndEpoch> completeMigration() {
            OffsetAndEpoch fixedOffsetAndEpoch = new OffsetAndEpoch(100L, 1);
            return CompletableFuture.completedFuture(fixedOffsetAndEpoch);
        }

        public void abortMigration() {
            // no-op
        }
    }

    /**
     * Body of a topic dual-write test. {@code setupTopicDualWrite} builds the driver and
     * its capturing clients, passes them to {@link #verify}, and closes the driver afterwards.
     * The method may throw any exception; it propagates out of {@code setupTopicDualWrite}.
     */
    @FunctionalInterface
    public interface TopicDualWriteVerifier {
        void verify(KRaftMigrationDriver kRaftMigrationDriver, CapturingMigrationClient capturingMigrationClient, CapturingTopicMigrationClient capturingTopicMigrationClient, CapturingConfigMigrationClient capturingConfigMigrationClient) throws Exception;
    }

    /**
     * Registers API versions for controller ids "4", "5" and "6" before each test.
     * Each entry is a fresh {@link NodeApiVersions} built from two empty lists and {@code true}
     * (presumably the "ZK migration ready" flag; confirm against NodeApiVersions).
     */
    @BeforeEach
    public void setup() {
        for (String controllerId : Arrays.asList("4", "5", "6")) {
            NodeApiVersions versions =
                new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true);
            this.apiVersions.update(controllerId, versions);
        }
    }

    /**
     * Builds a registration record for an unfenced broker flagged as a migrating ZK broker.
     *
     * @param i the broker id to put in the record
     * @return the populated record
     */
    RegisterBrokerRecord zkBrokerRecord(int i) {
        return new RegisterBrokerRecord()
            .setBrokerId(i)
            .setIsMigratingZkBroker(true)
            .setFenced(false);
    }

    /**
     * Enqueues a metadata change event on the driver and exposes its completion callback as a future.
     *
     * @return a future that completes normally when the callback receives {@code null},
     *         or exceptionally with the throwable the callback receives
     */
    CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(KRaftMigrationDriver kRaftMigrationDriver, MetadataDelta metadataDelta, MetadataImage metadataImage, MetadataProvenance metadataProvenance) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        kRaftMigrationDriver.enqueueMetadataChangeEvent(metadataDelta, metadataImage, metadataProvenance, false, failure -> {
            if (failure != null) {
                done.completeExceptionally(failure);
                return;
            }
            done.complete(null);
        });
        return done;
    }

    /**
     * Checks which metadata changes cause RPCs to be sent to brokers once the driver is in DUAL_WRITE:
     * <ul>
     *   <li>reaching DUAL_WRITE yields exactly one image-based send and no delta-based sends;</li>
     *   <li>a broker config record is written through the config client but triggers no further send;</li>
     *   <li>a broker registration change record triggers exactly one delta-based send.</li>
     * </ul>
     */
    @Test
    public void testOnlySendNeededRPCsToBrokers() throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1, 2, 3)
            .setConfigMigrationClient(configClient)
            .build();

        // try-with-resources closes the driver on every exit path and attaches any
        // close() failure to the primary exception instead of replacing it.
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                3000,
                new NoOpRecordConsumer(),
                migrationClient,
                metadataPropagator,
                metadataPublisher -> { },
                new MockFaultHandler("test"),
                this.quorumFeatures,
                this.mockTime)) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            // Announce node 3000 (the id given to the driver) as leader, then publish the first image.
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, leader, 1, 100L, 42L));

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(0, metadataPropagator.deltas);

            // A broker config change is written through the config client but sends nothing to brokers.
            MetadataDelta configDelta = new MetadataDelta(image);
            configDelta.replay(new ConfigRecord()
                .setResourceType(ConfigResource.Type.BROKER.id())
                .setResourceName("1")
                .setName("foo")
                .setValue("bar"));
            MetadataProvenance configProvenance = new MetadataProvenance(120L, 1, 2L);
            MetadataImage configImage = configDelta.apply(configProvenance);
            enqueueMetadataChangeEventWithFuture(driver, configDelta, configImage, configProvenance)
                .get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, configClient.writtenConfigs.size());
            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(0, metadataPropagator.deltas);

            // A broker registration change results in exactly one delta-based send.
            MetadataDelta brokerDelta = new MetadataDelta(configImage);
            brokerDelta.replay(new BrokerRegistrationChangeRecord()
                .setBrokerId(1)
                .setBrokerEpoch(0L)
                .setFenced(BrokerRegistrationFencingChange.NONE.value())
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
            MetadataProvenance brokerProvenance = new MetadataProvenance(130L, 1, 3L);
            MetadataImage brokerImage = brokerDelta.apply(brokerProvenance);
            enqueueMetadataChangeEventWithFuture(driver, brokerDelta, brokerImage, brokerProvenance)
                .get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(1, metadataPropagator.deltas);
        }
    }

    /**
     * Makes {@code claimControllerLeadership} throw on its first three invocations and checks that the
     * driver still reaches DUAL_WRITE afterwards.
     *
     * @param z when {@code true} the client throws {@link MigrationClientAuthException} and the fault
     *          handler is expected to have recorded it as the cause of its first exception; when
     *          {@code false} the client throws {@link MigrationClientException} and the fault handler
     *          is expected to have recorded nothing
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testMigrationWithClientException(final boolean z) throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        // Counts down once per failed leadership claim; at zero the client starts delegating to super.
        final CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
        CapturingMigrationClient migrationClient = new CapturingMigrationClient(
                new HashSet<>(Arrays.asList(1, 2, 3)),
                new CapturingTopicMigrationClient(),
                new CapturingConfigMigrationClient(),
                new CapturingAclMigrationClient()) {
            @Override
            public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState zkMigrationLeadershipState) {
                if (claimLeaderAttempts.getCount() == 0) {
                    return super.claimControllerLeadership(zkMigrationLeadershipState);
                }
                claimLeaderAttempts.countDown();
                if (z) {
                    throw new MigrationClientAuthException(new RuntimeException("Some kind of ZK auth error!"));
                }
                throw new MigrationClientException("Some kind of ZK error!");
            }
        };
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");

        // try-with-resources closes the driver on every exit path; a failure in close()
        // is added as suppressed rather than masking an assertion failure.
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                3000,
                new NoOpRecordConsumer(),
                migrationClient,
                metadataPropagator,
                metadataPublisher -> { },
                faultHandler,
                this.quorumFeatures,
                this.mockTime)) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(delta, image,
                new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100L, 42L));

            // All three injected failures must have been consumed before checking the final state.
            Assertions.assertTrue(claimLeaderAttempts.await(1L, TimeUnit.MINUTES));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            if (z) {
                Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
            } else {
                Assertions.assertNull(faultHandler.firstException());
            }
        }
    }

    /**
     * Checks that the driver stays in WAIT_FOR_CONTROLLER_QUORUM until all controller nodes have a
     * suitable API versions entry:
     * <ol>
     *   <li>with the entry for "6" removed, the driver reaches WAIT_FOR_CONTROLLER_QUORUM;</li>
     *   <li>after registering {@code NodeApiVersions.create()} for "6" it is still in that state;</li>
     *   <li>after registering the same three-argument {@link NodeApiVersions} that {@code setup()}
     *       uses, it proceeds to DUAL_WRITE.</li>
     * </ol>
     */
    @Test
    public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build();
        // Drop the entry setup() registered for controller "6".
        this.apiVersions.remove("6");

        // try-with-resources closes the driver on every exit path and keeps the primary exception.
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                3000,
                new NoOpRecordConsumer(),
                migrationClient,
                metadataPropagator,
                metadataPublisher -> { },
                new MockFaultHandler("test"),
                this.quorumFeatures,
                this.mockTime)) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(1));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, leader, 1, 100L, 42L));

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");

            // A default NodeApiVersions entry for "6" must not unblock the driver.
            this.apiVersions.update("6", NodeApiVersions.create());
            Assertions.assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1L, TimeUnit.MINUTES));

            // The same entry setup() uses lets the driver move on.
            this.apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Checks that the driver reaches DUAL_WRITE when the migration client reports no brokers in ZK,
     * the client's recovery state already carries a KRaft metadata offset/epoch of (100, 1), and the
     * published metadata contains a MIGRATION state record.
     */
    @Test
    public void testSkipWaitForBrokersInDualWrite() throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingMigrationClient migrationClient = new CapturingMigrationClient(
            Collections.emptySet(),
            new CapturingTopicMigrationClient(),
            new CapturingConfigMigrationClient(),
            new CapturingAclMigrationClient());

        // try-with-resources closes the driver on every exit path; a failure in close()
        // is added as suppressed rather than masking an assertion failure.
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                3000,
                new NoOpRecordConsumer(),
                migrationClient,
                metadataPropagator,
                metadataPublisher -> { },
                new MockFaultHandler("testMigrationClientExpiration"),
                this.quorumFeatures,
                this.mockTime)) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            // Seed the recovery state before the driver starts.
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            delta.replay(ZkMigrationState.MIGRATION.toRecord().message());
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(delta, image,
                new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100L, 42L));

            // The message now names the state that is actually awaited.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Shared fixture for the topic dual-write tests. Builds a driver whose topic client replays the
     * topics and partitions of {@code TopicsImageTest.IMAGE1} to any visitor, with brokers 0-5
     * registered in ZK, hands the driver and its capturing clients to the verifier, and closes the
     * driver when the verifier returns or throws.
     *
     * @param topicDualWriteVerifier the test body to run against the fixture
     * @throws Exception whatever the verifier or {@code close()} throws
     */
    public void setupTopicDualWrite(TopicDualWriteVerifier topicDualWriteVerifier) throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();

        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
            @Override
            public void iterateTopics(EnumSet<TopicMigrationClient.TopicVisitorInterest> enumSet, TopicMigrationClient.TopicVisitor topicVisitor) {
                TopicsImageTest.IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
                    // NOTE(review): the assignments map is passed to the visitor empty. The previous
                    // code iterated the partitions with an empty callback right here, which looks
                    // like a lost "populate assignments" step -- confirm against the upstream test.
                    HashMap<Integer, List<Integer>> assignments = new HashMap<>();
                    topicVisitor.visitTopic(topicName, topicImage.id(), assignments);
                    topicImage.partitions().forEach((partitionId, registration) ->
                        topicVisitor.visitPartition(
                            new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId.intValue())),
                            registration));
                });
            }
        };

        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(0, 1, 2, 3, 4, 5)
            .setTopicMigrationClient(topicClient)
            .setConfigMigrationClient(configClient)
            .build();

        // try-with-resources closes the driver on every exit path and keeps the verifier's
        // exception as the primary one if close() also fails.
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                3000,
                new NoOpRecordConsumer(),
                migrationClient,
                metadataPropagator,
                metadataPublisher -> { },
                new MockFaultHandler("test"),
                this.quorumFeatures,
                this.mockTime)) {
            topicDualWriteVerifier.verify(driver, migrationClient, topicClient, configClient);
        }
    }

    /**
     * Drives the fixture to DUAL_WRITE starting from {@code TopicsImageTest.IMAGE1}, then publishes
     * {@code TopicsImageTest.DELTA1_RECORDS} with a {@link SnapshotManifest} and checks what the
     * capturing clients recorded: topic "foo" deleted, topic "baz" created, partition 0 of "bar"
     * updated, and the config resource for topic "foo" deleted.
     */
    @Test
    public void testTopicDualWriteSnapshot() throws Exception {
        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage initialImage = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(initialImage);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(0));
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            delta.replay(zkBrokerRecord(4));
            delta.replay(zkBrokerRecord(5));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, leader, 1, 100L, 42L));

            // The message now names the state that is actually awaited.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Publish the topic changes as a snapshot.
            MetadataProvenance snapshotProvenance = new MetadataProvenance(200L, 1, 1L);
            MetadataDelta topicDelta = new MetadataDelta(image);
            RecordTestUtils.replayAll(topicDelta, TopicsImageTest.DELTA1_RECORDS);
            MetadataImage snapshotImage = topicDelta.apply(snapshotProvenance);
            driver.onMetadataUpdate(topicDelta, snapshotImage, new SnapshotManifest(snapshotProvenance, 100L));
            // Presumably resolves only after the update above has been handled -- confirm against the driver.
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Drives the fixture to DUAL_WRITE starting from {@code TopicsImageTest.IMAGE1}, then publishes
     * {@code TopicsImageTest.DELTA1_RECORDS} with a {@link LogDeltaManifest} and checks what the
     * capturing clients recorded: topic "foo" deleted, topic "baz" created, partition 0 of "bar"
     * updated, and the config resource for topic "foo" deleted.
     */
    @Test
    public void testTopicDualWriteDelta() throws Exception {
        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage initialImage = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(initialImage);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(0));
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            delta.replay(zkBrokerRecord(4));
            delta.replay(zkBrokerRecord(5));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, leader, 1, 100L, 42L));

            // The message now names the state that is actually awaited.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Publish the topic changes as a log delta.
            MetadataProvenance deltaProvenance = new MetadataProvenance(200L, 1, 1L);
            MetadataDelta topicDelta = new MetadataDelta(image);
            RecordTestUtils.replayAll(topicDelta, TopicsImageTest.DELTA1_RECORDS);
            MetadataImage deltaImage = topicDelta.apply(deltaProvenance);
            driver.onMetadataUpdate(topicDelta, deltaImage, new LogDeltaManifest(deltaProvenance, leader, 1, 100L, 42L));
            // Presumably resolves only after the update above has been handled -- confirm against the driver.
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Feeds the driver two metadata updates while node 3001 is announced as leader, seeds the
     * migration client's recovery state with a KRaft offset/epoch of (100, 1), then announces node
     * 3000 (the id given to the driver) as leader. Once the driver reaches DUAL_WRITE the capturing
     * clients must show topic "foo" deleted, topic "baz" created, partition 0 of "bar" updated, and
     * the config resource for topic "foo" deleted.
     */
    @Test
    public void testControllerFailover() throws Exception {
        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage initialImage = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(initialImage);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(zkBrokerRecord(0));
            delta.replay(zkBrokerRecord(1));
            delta.replay(zkBrokerRecord(2));
            delta.replay(zkBrokerRecord(3));
            delta.replay(zkBrokerRecord(4));
            delta.replay(zkBrokerRecord(5));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            // A different node (3001) is leader while the first updates arrive.
            LeaderAndEpoch otherLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1);
            driver.onControllerChange(otherLeader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, otherLeader, 1, 100L, 42L));

            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));

            MetadataProvenance topicProvenance = new MetadataProvenance(200L, 1, 1L);
            MetadataDelta topicDelta = new MetadataDelta(image);
            RecordTestUtils.replayAll(topicDelta, TopicsImageTest.DELTA1_RECORDS);
            MetadataImage topicImage = topicDelta.apply(topicProvenance);
            driver.onMetadataUpdate(topicDelta, topicImage, new LogDeltaManifest(topicProvenance, otherLeader, 1, 100L, 42L));

            // Leadership moves to node 3000.
            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));

            // A descriptive message replaces the empty one, so a timeout says what was awaited.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }
}
