package org.apache.kafka.raft;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
import org.apache.kafka.snapshot.Snapshots;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClientReconfigTest.class */
public class KafkaRaftClientReconfigTest {
    @Test
    public void testLeaderWritesBootstrapRecords() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}));
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(voterSet)).withUnknownLeader(0).build();
        List asList = Arrays.asList(Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord().setVersion((short) 0).setLastContainedLogTimestamp(0L)), new ControlRecord(ControlRecordType.KRAFT_VERSION, new KRaftVersionRecord().setVersion((short) 0).setKRaftVersion((short) 1)), new ControlRecord(ControlRecordType.KRAFT_VOTERS, voterSet.toVotersRecord((short) 0))), Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord().setVersion((short) 0))));
        Assertions.assertEquals(Snapshots.BOOTSTRAP_SNAPSHOT_ID, build.log.latestSnapshotId().get());
        RecordsSnapshotReader of = RecordsSnapshotReader.of(build.log.latestSnapshot().get(), build.serde, BufferSupplier.NO_CACHING, 8388608, false);
        Throwable th = null;
        try {
            try {
                SnapshotWriterReaderTest.assertControlSnapshot(asList, of);
                if (of != null) {
                    if (0 != 0) {
                        try {
                            of.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        of.close();
                    }
                }
                build.becomeLeader();
                RecordBatch recordBatch = (RecordBatch) build.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
                Assertions.assertTrue(recordBatch.isControlBatch());
                Iterator it = recordBatch.iterator();
                Record record = (Record) it.next();
                RaftClientTestContext.verifyLeaderChangeMessage(replicaKey.id(), Arrays.asList(Integer.valueOf(replicaKey.id()), Integer.valueOf(replicaKey2.id())), Arrays.asList(Integer.valueOf(replicaKey.id()), Integer.valueOf(replicaKey2.id())), record.key(), record.value());
                Record record2 = (Record) it.next();
                verifyKRaftVersionRecord((short) 1, record2.key(), record2.value());
                Record record3 = (Record) it.next();
                verifyVotersRecord(voterSet, record3.key(), record3.value());
            } finally {
            }
        } catch (Throwable th3) {
            if (of != null) {
                if (th != null) {
                    try {
                        of.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    of.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testBootstrapCheckpointIsNotReturnedOnFetch() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(0).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, 0L, 0, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
    }

    @Test
    public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStaticVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}))).withUnknownLeader(0).build();
        Arrays.asList(Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord().setVersion((short) 0).setLastContainedLogTimestamp(0L))), Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord().setVersion((short) 0))));
        build.becomeLeader();
        RecordBatch recordBatch = (RecordBatch) build.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(recordBatch.isControlBatch());
        Iterator it = recordBatch.iterator();
        Record record = (Record) it.next();
        RaftClientTestContext.verifyLeaderChangeMessage(replicaKey.id(), Arrays.asList(Integer.valueOf(replicaKey.id()), Integer.valueOf(replicaKey2.id())), Arrays.asList(Integer.valueOf(replicaKey.id()), Integer.valueOf(replicaKey2.id())), record.key(), record.value());
        Assertions.assertFalse(it.hasNext());
    }

    @Test
    public void testFollowerDoesNotRequestLeaderBootstrapSnapshot() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withElectedLeader(1, replicaKey2.id()).build();
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
        build.assertFetchRequestData(assertSentFetchRequest, 1, 0L, 0);
        build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.snapshotFetchResponse(1, replicaKey2.id(), Snapshots.BOOTSTRAP_SNAPSHOT_ID, 0L));
        build.pollUntilRequest();
        build.assertFetchRequestData(build.assertSentFetchRequest(), 1, 0L, 0);
    }

    @Test
    public void testFollowerReadsKRaftBootstrapRecords() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}));
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(voterSet)).withElectedLeader(5, replicaKey2.id()).build();
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
        build.assertFetchRequestData(assertSentFetchRequest, 5, 0L, 0);
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey3));
        VoterSet voterSet2 = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.concat(voterSet.voterKeys().stream(), Stream.of(replicaKey3)));
        ByteBuffer allocate = ByteBuffer.allocate(128);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocate, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, true, 5, allocate.capacity());
        Throwable th = null;
        try {
            memoryRecordsBuilder.appendLeaderChangeMessage(0L, new LeaderChangeMessage());
            memoryRecordsBuilder.appendKRaftVersionMessage(0L, new KRaftVersionRecord().setVersion((short) 0).setKRaftVersion((short) 1));
            memoryRecordsBuilder.appendVotersMessage(0L, voterSet2.toVotersRecord((short) 0));
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(5, replicaKey2.id(), memoryRecordsBuilder.build(), 0L, Errors.NONE));
            if (memoryRecordsBuilder != null) {
                if (0 != 0) {
                    try {
                        memoryRecordsBuilder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    memoryRecordsBuilder.close();
                }
            }
            build.client.poll();
            Assertions.assertTrue(build.client.quorum().isVoter(replicaKey3));
        } catch (Throwable th3) {
            if (memoryRecordsBuilder != null) {
                if (0 != 0) {
                    try {
                        memoryRecordsBuilder.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    memoryRecordsBuilder.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAddVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey3));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentApiVersionsRequest = build.assertSentApiVersionsRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), assertSentApiVersionsRequest.destination());
        build.deliverResponse(assertSentApiVersionsRequest.correlationId(), assertSentApiVersionsRequest.destination(), apiVersionsResponse(Errors.NONE));
        build.client.poll();
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey3));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.NONE);
    }

    @Test
    void testAddVoterInvalidClusterId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true)})))).withUnknownLeader(3).build();
        build.becomeLeader();
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id())));
        build.deliverRequest(build.addVoterRequest("", Integer.MAX_VALUE, replicaKey2, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
        build.deliverRequest(build.addVoterRequest("invalid-uuid", Integer.MAX_VALUE, replicaKey2, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey2));
    }

    @Test
    void testAddVoterToNotLeader() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true)})))).withUnknownLeader(3).build();
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey2, Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id())))));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
    }

    @Test
    void testAddVoterWithMissingDefaultListener() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true)})))).withUnknownLeader(3).build();
        build.becomeLeader();
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey2, Endpoints.fromInetSocketAddresses(Collections.singletonMap(ListenerName.normalised("not_the_default_listener"), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id())))));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.INVALID_REQUEST);
    }

    @Test
    void testAddVoterWithPendingAddVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 3, true);
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey4, Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey4.id())))));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testAddVoterWithoutFencedPreviousLeaders() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true)})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey2, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testAddVoterWithKraftVersion0() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withStaticVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.UNSUPPORTED_VERSION);
    }

    @Test
    void testAddVoterWithExistingVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey2.id(), true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.DUPLICATE_VOTER);
    }

    @Test
    void testAddVoterTimeout() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentApiVersionsRequest = build.assertSentApiVersionsRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), assertSentApiVersionsRequest.destination());
        build.deliverResponse(assertSentApiVersionsRequest.correlationId(), assertSentApiVersionsRequest.destination(), apiVersionsResponse(Errors.NONE));
        build.client.poll();
        build.time.sleep(build.requestTimeoutMs());
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey3));
    }

    @Test
    void testAddVoterWithApiVersionsFromIncorrectNode() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentApiVersionsRequest = build.assertSentApiVersionsRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), assertSentApiVersionsRequest.destination());
        build.deliverResponse(assertSentApiVersionsRequest.correlationId(), assertSentApiVersionsRequest.destination(), apiVersionsResponse(Errors.INVALID_REQUEST));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testAddVoterInvalidFeatureVersion() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentApiVersionsRequest = build.assertSentApiVersionsRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), assertSentApiVersionsRequest.destination());
        build.deliverResponse(assertSentApiVersionsRequest.correlationId(), assertSentApiVersionsRequest.destination(), apiVersionsResponse(Errors.NONE, new SupportedVersionRange((short) 0)));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.INVALID_REQUEST);
    }

    @Test
    void testAddVoterWithLaggingNewVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentApiVersionsRequest = build.assertSentApiVersionsRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), assertSentApiVersionsRequest.destination());
        build.deliverResponse(assertSentApiVersionsRequest.correlationId(), assertSentApiVersionsRequest.destination(), apiVersionsResponse(Errors.NONE));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testAddVoterFailsWhenLosingLeadership() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id());
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), createUnresolved));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilRequest();
        Assertions.assertEquals(new Node(replicaKey3.id(), createUnresolved.getHostString(), createUnresolved.getPort()), build.assertSentApiVersionsRequest().destination());
        build.client.resign(currentEpoch);
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
    }

    @Test
    void testAddVoterWithMissingDirectoryId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, false);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.INVALID_REQUEST);
    }

    @Test
    public void testRemoveVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey3));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.client.poll();
        build.client.poll();
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey3));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.NONE);
    }

    @Test
    public void testRemoveVoterIsLeader() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey));
        build.client.poll();
        build.client.poll();
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.NONE);
        build.pollUntilRequest();
        build.collectEndQuorumRequests(currentEpoch, new HashSet(Arrays.asList(Integer.valueOf(replicaKey2.id()), Integer.valueOf(replicaKey3.id()))), Optional.empty());
        build.client.resign(currentEpoch);
        build.time.sleep(2 * build.electionTimeoutMs());
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isObserver());
        Assertions.assertTrue(build.client.quorum().isUnattached());
    }

    @Test
    public void testRemoveVoterInvalidClusterId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true)})))).withUnknownLeader(3).build();
        build.becomeLeader();
        build.deliverRequest(build.removeVoterRequest("", replicaKey2));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
        build.deliverRequest(build.removeVoterRequest("invalid-uuid", replicaKey2));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey2));
    }

    @Test
    void testRemoveVoterToNotLeader() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true)})))).withUnknownLeader(3).build();
        build.deliverRequest(build.removeVoterRequest(replicaKey2));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
    }

    @Test
    void testRemoveVoterWithPendingRemoveVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.client.poll();
        build.client.poll();
        build.deliverRequest(build.removeVoterRequest(replicaKey2));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testRemoveVoterWithoutFencedPreviousLeaders() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testRemoveVoterWithKraftVersion0() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withStaticVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3}))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.UNSUPPORTED_VERSION);
    }

    @Test
    void testRemoveVoterWithNoneVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(KafkaRaftClientTest.replicaKey(replicaKey3.id(), true)));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.VOTER_NOT_FOUND);
    }

    @Test
    void testRemoveVoterWithNoneVoterId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(ReplicaKey.of(replicaKey3.id() + 1, (Uuid) replicaKey3.directoryId().get())));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.VOTER_NOT_FOUND);
    }

    @Test
    void testRemoveVoterToEmptyVoterSet() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of(replicaKey)))).build();
        Assertions.assertEquals(OptionalInt.of(replicaKey.id()), build.currentLeader());
        build.deliverRequest(build.removeVoterRequest(replicaKey));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.VOTER_NOT_FOUND);
    }

    @Test
    void testRemoveVoterTimedOut() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.client.poll();
        build.client.poll();
        build.time.sleep(build.requestTimeoutMs());
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey3));
    }

    @Test
    void testRemoveVoterFailsWhenLosingLeadership() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.client.poll();
        build.client.poll();
        build.client.resign(currentEpoch);
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
        Assertions.assertFalse(build.client.quorum().isVoter(replicaKey3));
    }

    @Test
    void testAddVoterWithPendingRemoveVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.removeVoterRequest(replicaKey3));
        build.client.poll();
        build.client.poll();
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey4, Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey4.id())))));
        build.pollUntilResponse();
        build.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testRemoveVoterWithPendingAddVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        build.deliverRequest(build.removeVoterRequest(replicaKey2));
        build.pollUntilResponse();
        build.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
    }

    @Test
    void testUpdateVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey2));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id());
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id());
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(replicaKey2, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.NONE, OptionalInt.of(replicaKey.id()), currentEpoch);
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey2));
    }

    @Test
    void testLeaderUpdatesVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}));
        HashMap hashMap = new HashMap(2);
        hashMap.put(VoterSetTest.DEFAULT_LISTENER_NAME, InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey.id()));
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey.id()));
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(hashMap);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(voterSet)).withUnknownLeader(3).withLocalListeners(fromInetSocketAddresses).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        Assertions.assertTrue(build.client.quorum().isVoter(replicaKey2));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        Assertions.assertEquals(voterSet.updateVoter(VoterSet.VoterNode.of(replicaKey, fromInetSocketAddresses, Features.KRAFT_VERSION.supportedVersionRange())), build.listener.lastCommittedVoterSet());
    }

    @Test
    public void testUpdateVoterInvalidClusterId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.updateVoterRequest("", replicaKey2, currentEpoch, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty()));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID, OptionalInt.of(replicaKey.id()), currentEpoch);
        build.deliverRequest(build.updateVoterRequest("invalid-uuid", replicaKey2, currentEpoch, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty()));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterOldEpoch() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.updateVoterRequest(build.clusterId, replicaKey2, currentEpoch - 1, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty()));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.FENCED_LEADER_EPOCH, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterNewEpoch() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.updateVoterRequest(build.clusterId, replicaKey2, currentEpoch + 1, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty()));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.UNKNOWN_LEADER_EPOCH, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterToNotLeader() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.deliverRequest(build.updateVoterRequest(replicaKey2, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty()));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, OptionalInt.empty(), build.currentEpoch());
    }

    @Test
    void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id());
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id());
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(replicaKey2, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterWithKraftVersion0() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withStaticVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2}))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id());
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id());
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(replicaKey2, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.UNSUPPORTED_VERSION, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterWithNoneVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id());
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id());
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(KafkaRaftClientTest.replicaKey(replicaKey2.id(), true), Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterWithNoneVoterId() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id() + 1);
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id() + 1);
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(ReplicaKey.of(replicaKey2.id() + 1, (Uuid) replicaKey2.directoryId().get()), Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testUpdateVoterWithPendingAddVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withUnknownLeader(3).build();
        build.becomeLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(Collections.singletonMap(build.channel.listenerName(), InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey3.id())));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey2, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.fetchRequest(currentEpoch, replicaKey3, build.log.endOffset().offset(), currentEpoch, 0));
        build.pollUntilResponse();
        build.assertSentFetchPartitionResponse(Errors.NONE, currentEpoch, OptionalInt.of(replicaKey.id()));
        build.deliverRequest(build.addVoterRequest(Integer.MAX_VALUE, replicaKey3, fromInetSocketAddresses));
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id());
        InetSocketAddress createUnresolved2 = InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id());
        HashMap hashMap = new HashMap(2);
        hashMap.put(build.channel.listenerName(), createUnresolved);
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), createUnresolved2);
        build.deliverRequest(build.updateVoterRequest(replicaKey2, Features.KRAFT_VERSION.supportedVersionRange(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT, OptionalInt.of(replicaKey.id()), currentEpoch);
    }

    @Test
    void testFollowerSendsUpdateVoter() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true)}));
        HashMap hashMap = new HashMap(2);
        hashMap.put(VoterSetTest.DEFAULT_LISTENER_NAME, InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey.id()));
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey.id()));
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(hashMap);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(voterSet)).withElectedLeader(4, replicaKey2.id()).withLocalListeners(fromInetSocketAddresses).build();
        for (int i = 0; i < 3; i++) {
            MockTime mockTime = build.time;
            build.getClass();
            mockTime.sleep(50000 - 1);
            build.pollUntilRequest();
            RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
            build.assertFetchRequestData(assertSentFetchRequest, 4, 0L, 0);
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(4, replicaKey2.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
            build.client.poll();
        }
        MockTime mockTime2 = build.time;
        build.getClass();
        mockTime2.sleep(50000 - 1);
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentUpdateVoterRequest = build.assertSentUpdateVoterRequest(replicaKey, 4, Features.KRAFT_VERSION.supportedVersionRange(), fromInetSocketAddresses);
        build.deliverResponse(assertSentUpdateVoterRequest.correlationId(), assertSentUpdateVoterRequest.destination(), build.updateVoterResponse(Errors.NONE, new LeaderAndEpoch(OptionalInt.of(replicaKey2.id()), 4)));
        build.pollUntilRequest();
        build.assertFetchRequestData(build.assertSentFetchRequest(), 4, 0L, 0);
    }

    @Test
    void testFollowerSendsUpdateVoterWhenDifferent() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true)})))).withElectedLeader(4, replicaKey2.id()).build();
        for (int i = 0; i < 3; i++) {
            MockTime mockTime = build.time;
            build.getClass();
            mockTime.sleep(50000 - 1);
            build.pollUntilRequest();
            RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
            build.assertFetchRequestData(assertSentFetchRequest, 4, 0L, 0);
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(4, replicaKey2.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
            build.client.poll();
        }
        MockTime mockTime2 = build.time;
        build.getClass();
        mockTime2.sleep(50000 - 1);
        build.pollUntilRequest();
        build.assertFetchRequestData(build.assertSentFetchRequest(), 4, 0L, 0);
        Assertions.assertNotEquals(OptionalLong.of(0L), build.messageQueue.lastPollTimeoutMs());
    }

    @Test
    void testUpdateVoterResponseCausesEpochChange() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3}));
        HashMap hashMap = new HashMap(2);
        hashMap.put(VoterSetTest.DEFAULT_LISTENER_NAME, InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey.id()));
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey.id()));
        Endpoints fromInetSocketAddresses = Endpoints.fromInetSocketAddresses(hashMap);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.of(voterSet)).withElectedLeader(4, replicaKey2.id()).withLocalListeners(fromInetSocketAddresses).build();
        for (int i = 0; i < 3; i++) {
            MockTime mockTime = build.time;
            build.getClass();
            mockTime.sleep(50000 - 1);
            build.pollUntilRequest();
            RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
            build.assertFetchRequestData(assertSentFetchRequest, 4, 0L, 0);
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(4, replicaKey2.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
            build.client.poll();
        }
        MockTime mockTime2 = build.time;
        build.getClass();
        mockTime2.sleep(50000 - 1);
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentUpdateVoterRequest = build.assertSentUpdateVoterRequest(replicaKey, 4, Features.KRAFT_VERSION.supportedVersionRange(), fromInetSocketAddresses);
        build.deliverResponse(assertSentUpdateVoterRequest.correlationId(), assertSentUpdateVoterRequest.destination(), build.updateVoterResponse(Errors.NONE, new LeaderAndEpoch(OptionalInt.of(replicaKey3.id()), 4 + 1)));
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest2 = build.assertSentFetchRequest();
        build.assertFetchRequestData(assertSentFetchRequest2, 4 + 1, 0L, 0);
        Assertions.assertEquals(replicaKey3.id(), assertSentFetchRequest2.destination().id());
    }

    @Test
    void testObserverDiscoversLeaderWithUnknownVoters() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withKip853Rpc(true).withBootstrapSnapshot(Optional.empty()).withUnknownLeader(3).withBootstrapServers(Optional.of(Collections.singletonList(InetSocketAddress.createUnresolved("localhost", 1234)))).build();
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
        build.assertFetchRequestData(assertSentFetchRequest, 3, 0L, 0);
        Assertions.assertEquals(-2, assertSentFetchRequest.destination().id());
    }

    @Test
    public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})))).withElectedLeader(3, replicaKey2.id()).withKip853Rpc(true).build();
        build.client.poll();
        HashMap hashMap = new HashMap(2);
        hashMap.put(VoterSetTest.DEFAULT_LISTENER_NAME, InetSocketAddress.createUnresolved("localhost", 9990 + replicaKey2.id()));
        hashMap.put(ListenerName.normalised("ANOTHER_LISTENER"), InetSocketAddress.createUnresolved("localhost", 8990 + replicaKey2.id()));
        build.deliverRequest(build.beginEpochRequest(3, replicaKey2.id(), Endpoints.fromInetSocketAddresses(hashMap)));
        build.pollUntilResponse();
        build.assertElectedLeader(3, replicaKey2.id());
        build.assertSentBeginQuorumEpochResponse(Errors.NONE, 3, OptionalInt.of(replicaKey2.id()));
    }

    private static void verifyVotersRecord(VoterSet voterSet, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        Assertions.assertEquals(ControlRecordType.KRAFT_VOTERS, ControlRecordType.parse(byteBuffer));
        Assertions.assertEquals(voterSet, VoterSet.fromVotersRecord(ControlRecordUtils.deserializeVotersRecord(byteBuffer2)));
    }

    private static void verifyKRaftVersionRecord(short s, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        Assertions.assertEquals(ControlRecordType.KRAFT_VERSION, ControlRecordType.parse(byteBuffer));
        Assertions.assertEquals(s, ControlRecordUtils.deserializeKRaftVersionRecord(byteBuffer2).kRaftVersion());
    }

    private int randomReplicaId() {
        return ThreadLocalRandom.current().nextInt(1025);
    }

    private static ApiVersionsResponseData apiVersionsResponse(Errors errors) {
        return apiVersionsResponse(errors, Features.KRAFT_VERSION.supportedVersionRange());
    }

    private static ApiVersionsResponseData apiVersionsResponse(Errors errors, SupportedVersionRange supportedVersionRange) {
        ApiVersionsResponseData.SupportedFeatureKeyCollection supportedFeatureKeyCollection = new ApiVersionsResponseData.SupportedFeatureKeyCollection(1);
        if (supportedVersionRange.max() > 0) {
            supportedFeatureKeyCollection.add(new ApiVersionsResponseData.SupportedFeatureKey().setName("kraft.version").setMinVersion(supportedVersionRange.min()).setMaxVersion(supportedVersionRange.max()));
        }
        return new ApiVersionsResponseData().setErrorCode(errors.code()).setSupportedFeatures(supportedFeatureKeyCollection);
    }
}
