package org.apache.kafka.raft;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClientTest.class */
public class KafkaRaftClientTest {
    /**
     * Verifies that building a test context from a random replica id paired with
     * {@code Uuid.ZERO_UUID} is rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void testNodeDirectoryId() {
        RaftClientTestContext.Builder builder =
            new RaftClientTestContext.Builder(randomReplicaId(), Uuid.ZERO_UUID);
        // The bound method reference already null-checks its receiver, so no separate probe is needed.
        Assertions.assertThrows(IllegalArgumentException.class, builder::build);
    }

    /**
     * A quorum whose only voter is the local replica must report the local replica as the
     * elected leader at epoch 1 immediately after the context is built, and the client's log end
     * offset must agree with the log's own end offset.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeSingleMemberQuorum(boolean z) throws IOException {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Collections.singleton(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.assertElectedLeader(1, localId);
        Assertions.assertEquals(context.log.endOffset().offset(), context.client.logEndOffset());
    }

    /**
     * Starts a single-voter quorum whose stored state names the local replica as leader of
     * epoch 2, polls until one record has been written to the log, and then expects the local
     * replica to be leader of epoch 3.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int initialEpoch = 2;
        final int expectedEpoch = initialEpoch + 1;
        final Set<Integer> voters = Collections.singleton(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .withElectedLeader(initialEpoch, localId)
            .build();

        // Wait for the first record to land in the log.
        context.pollUntil(() -> context.log.endOffset().offset() == 1);

        Assertions.assertEquals(1L, context.log.endOffset().offset());
        Assertions.assertEquals(expectedEpoch, context.log.lastFetchedEpoch());
        Assertions.assertEquals(
            new LeaderAndEpoch(OptionalInt.of(localId), expectedEpoch),
            context.currentLeaderAndEpoch());
        context.assertElectedLeader(expectedEpoch, localId);
    }

    /**
     * The local replica starts from stored state that names it leader of epoch 2. A vote
     * request from the other voter for that same epoch must be answered with
     * {@code Errors.NONE}, the local replica as leader, and a final {@code false} flag
     * (presumably "vote granted" - confirm against {@code assertSentVoteResponse}).
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testRejectVotesFromSameEpochAfterResigningLeadership(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final ReplicaKey remoteKey = replicaKey(localId + 1, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .withKip853Rpc(z)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertElectedLeader(epoch, localId);

        int lastEpoch = context.log.lastFetchedEpoch();
        long endOffset = context.log.endOffset().offset();
        context.deliverRequest(context.voteRequest(epoch, remoteKey, lastEpoch, endOffset));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);
    }

    /**
     * The local replica starts from stored state in which it voted for itself in epoch 2. A
     * vote request from the other voter for that same epoch must be answered with
     * {@code Errors.NONE}, no leader, and a final {@code false} flag (presumably
     * "vote granted" - confirm against {@code assertSentVoteResponse}).
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testRejectVotesFromSameEpochAfterResigningCandidacy(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final ReplicaKey remoteKey = replicaKey(localId + 1, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withVotedCandidate(epoch, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
            .withKip853Rpc(z)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertVotedCandidate(epoch, localId);

        int lastEpoch = context.log.lastFetchedEpoch();
        long endOffset = context.log.endOffset().offset();
        context.deliverRequest(context.voteRequest(epoch, remoteKey, lastEpoch, endOffset));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
    }

    /**
     * The local replica starts in the resigned state as stored leader of epoch 2. A vote
     * request from the other voter for epoch 3 must move the local replica to
     * "unattached and voted" for that candidate, and the response must carry
     * {@code Errors.NONE}, no leader, and a final {@code true} flag.
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testGrantVotesFromHigherEpochAfterResigningLeadership(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final int nextEpoch = epoch + 1;
        final ReplicaKey remoteKey = replicaKey(localId + 1, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .withKip853Rpc(z)
            .build();

        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertElectedLeader(epoch, localId);

        int lastEpoch = context.log.lastFetchedEpoch();
        long endOffset = context.log.endOffset().offset();
        context.deliverRequest(context.voteRequest(nextEpoch, remoteKey, lastEpoch, endOffset));
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isUnattachedAndVoted());
        context.assertVotedCandidate(nextEpoch, remoteKey.id());
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * The local replica starts as a candidate that voted for itself in epoch 2. A vote request
     * from the other voter for epoch 3 must move the local replica to "unattached and voted"
     * for that candidate, and the response must carry {@code Errors.NONE}, no leader, and a
     * final {@code true} flag.
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testGrantVotesFromHigherEpochAfterResigningCandidacy(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final int nextEpoch = epoch + 1;
        final ReplicaKey remoteKey = replicaKey(localId + 1, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withVotedCandidate(epoch, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
            .withKip853Rpc(z)
            .build();

        Assertions.assertTrue(context.client.quorum().isCandidate());
        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertVotedCandidate(epoch, localId);

        int lastEpoch = context.log.lastFetchedEpoch();
        long endOffset = context.log.endOffset().offset();
        context.deliverRequest(context.voteRequest(nextEpoch, remoteKey, lastEpoch, endOffset));
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isUnattachedAndVoted());
        context.assertVotedCandidate(nextEpoch, remoteKey.id());
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * After the local replica becomes leader and a shutdown with a 1000 ms timeout has been
     * requested, a vote request from the other voter for the next epoch must still move the
     * local replica to "unattached and voted" for that candidate.
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testGrantVotesWhenShuttingDown(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId = localId + 1;
        final ReplicaKey remoteKey = replicaKey(remoteId, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        final int epoch = context.currentEpoch();
        final int nextEpoch = epoch + 1;

        context.client.shutdown(1000);
        Assertions.assertTrue(context.client.isShuttingDown());

        int lastEpoch = context.log.lastFetchedEpoch();
        long endOffset = context.log.endOffset().offset();
        context.deliverRequest(context.voteRequest(nextEpoch, remoteKey, lastEpoch, endOffset));
        context.client.poll();

        Assertions.assertTrue(
            context.client.quorum().isUnattachedAndVoted(),
            "Local Id: " + localId
                + " Remote Id: " + remoteId
                + " Quorum local Id: " + context.client.quorum().localIdOrSentinel()
                + " Quorum leader Id: " + context.client.quorum().leaderIdOrSentinel());
        context.assertVotedCandidate(nextEpoch, remoteKey.id());
        context.assertSentVoteResponse(Errors.NONE, nextEpoch, OptionalInt.empty(), true);
    }

    /**
     * The local replica starts in the resigned state as stored leader of epoch 2. Once the
     * election timeout has elapsed and the client is polled, it must be a candidate that voted
     * for itself in epoch 3.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeAsResignedAndBecomeCandidate(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withElectedLeader(epoch, localId)
            .withKip853Rpc(z)
            .build();

        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertElectedLeader(epoch, localId);

        context.time.sleep(context.electionTimeoutMs());
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isCandidate());
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * The local replica starts from stored state that names it leader of epoch 2. The test
     * checks that:
     * <ul>
     *   <li>appends at that epoch are refused with {@link NotLeaderException};</li>
     *   <li>an EndQuorumEpoch request for epoch 2 is sent to the other voter;</li>
     *   <li>after that request is answered and the election timeout elapses, the local replica
     *       has voted for itself in epoch 3 and sent a vote request.</li>
     * </ul>
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeAsResignedLeaderFromStateStore(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId = localId + 1;
        final int epoch = 2;
        // Typed set: avoids the raw Set and unchecked cast at the builder call.
        final Set<Integer> voters = Utils.mkSet(localId, remoteId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // Pin the mocked random draw for bound 10000 to 0.
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withKip853Rpc(z)
            .withElectedLeader(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertElectedLeader(epoch, localId);
        context.client.poll();

        Assertions.assertThrows(
            NotLeaderException.class,
            () -> context.client.prepareAppend(epoch, Arrays.asList("a", "b")));

        context.pollUntilRequest();
        RaftRequest.Outbound endQuorumRequest = context.assertSentEndQuorumEpochRequest(epoch, remoteId);
        context.deliverResponse(
            endQuorumRequest.correlationId(),
            endQuorumRequest.destination(),
            context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();

        context.time.sleep(context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * With the quorum state initialised to "unknown leader" at epoch 2, an append attempted at
     * that epoch must fail with {@link NotLeaderException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAppendFailedWithNotLeaderException(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        // Typed set: avoids the raw Set and unchecked cast at the builder call.
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        Assertions.assertThrows(
            NotLeaderException.class,
            () -> context.client.prepareAppend(epoch, Arrays.asList("a", "b")));
    }

    /**
     * Uses a mocked {@link MemoryPool} that hands out one 8388608-byte buffer and then returns
     * {@code null}. After the local replica becomes leader, an append must fail with
     * {@link BufferAllocationException}, and the buffer that was handed out must have been
     * released back to the pool.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAppendFailedWithBufferAllocationException(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int bufferSize = 8388608;
        // Typed set: avoids the raw Set and unchecked cast at the builder call.
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        // First allocation succeeds, every later one fails. The null must be typed as ByteBuffer
        // to match the stubbed return type; an Object-typed null does not type-check here.
        Mockito.when(memoryPool.tryAllocate(bufferSize))
            .thenReturn(buffer)
            .thenReturn((ByteBuffer) null);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withMemoryPool(memoryPool)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int epoch = context.currentEpoch();

        Assertions.assertThrows(
            BufferAllocationException.class,
            () -> context.client.prepareAppend(epoch, Collections.singletonList("a")));
        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * Once the local replica is leader, an append that names a later epoch than the current
     * one must fail with {@link IllegalArgumentException}, and an append that names an earlier
     * epoch must fail with {@link NotLeaderException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAppendFailedWithFencedEpoch(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int epoch = context.currentEpoch();

        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.prepareAppend(epoch + 1, Collections.singletonList("a")));
        Assertions.assertThrows(
            NotLeaderException.class,
            () -> context.client.prepareAppend(epoch - 1, Collections.singletonList("a")));
    }

    /**
     * Once the local replica is leader, appending 1048577 one-character records in a single
     * call must fail with {@link RecordBatchTooLargeException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAppendFailedWithRecordBatchTooLargeException(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int epoch = context.currentEpoch();

        // Typed list in place of the raw ArrayList; contents are the same 1048577 copies of "a".
        final int recordCount = 1048577;
        final List<String> oversizedBatch = new ArrayList<>(Collections.nCopies(recordCount, "a"));

        Assertions.assertThrows(
            RecordBatchTooLargeException.class,
            () -> context.client.prepareAppend(epoch, oversizedBatch));
    }

    /**
     * The local replica starts from stored state that names it leader of epoch 19 in a
     * three-voter quorum (election timeout 10000 ms, request timeout 5000 ms). It must send an
     * EndQuorumEpoch request to both other voters. After only the first is answered and
     * 6000 ms pass, exactly one EndQuorumEpoch request must go out again, addressed to the
     * voter that never answered.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testEndQuorumEpochRetriesWhileResigned(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int voter1 = localId + 1;
        final int voter2 = localId + 2;
        final int epoch = 19;
        final Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectionTimeoutMs(10000)
            .withRequestTimeoutMs(5000)
            .withElectedLeader(epoch, localId)
            .withKip853Rpc(z)
            .build();

        context.pollUntilRequest();
        List<RaftRequest.Outbound> requests =
            context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2), Optional.empty());
        Assertions.assertEquals(2, requests.size());

        // Answer only the first request.
        RaftRequest.Outbound answered = requests.get(0);
        context.deliverResponse(
            answered.correlationId(),
            answered.destination(),
            context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();
        Assertions.assertEquals(Collections.emptyList(), context.channel.drainSendQueue());

        // The second request is left unanswered past the request timeout.
        int unansweredVoter = requests.get(1).destination().id();
        context.time.sleep(6000L);
        context.pollUntilRequest();

        List<RaftRequest.Outbound> retries =
            context.collectEndQuorumRequests(epoch, Utils.mkSet(unansweredVoter), Optional.empty());
        Assertions.assertEquals(1, retries.size());
    }

    /**
     * While leader, the local replica receives a fetch at the log end offset with a 1000 ms
     * max wait. After a batch is appended directly to the log and shutdown is requested, the
     * next poll must answer the fetch with {@code NOT_LEADER_OR_FOLLOWER} and leave the local
     * replica a resigned leader. After a further 1000 ms and one more poll the client must be
     * neither running nor shutting down.
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testResignWillCompleteFetchPurgatory(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey remoteKey = replicaKey(localId + 1, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int epoch = context.currentEpoch();

        // Fetch at the current end offset.
        long fetchOffset = context.log.endOffset().offset();
        context.deliverRequest(context.fetchRequest(epoch, remoteKey, fetchOffset, epoch, 1000));
        context.client.poll();

        // Append one batch straight to the log.
        long appendOffset = context.log.endOffset().offset();
        context.log.appendAsLeader(
            context.buildBatch(appendOffset, epoch, Collections.singletonList("raft")),
            epoch);

        context.client.shutdown(1000);
        context.client.poll();
        context.assertSentFetchPartitionResponse(
            Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId));
        context.assertResignedLeader(epoch, localId);

        context.time.sleep(1000L);
        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertFalse(context.client.isShuttingDown());
    }

    /**
     * A resign request that names the epoch before the current one must have no effect: after
     * twice the election timeout has passed, the local replica is still the elected leader of
     * the current epoch.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testResignInOlderEpochIgnored(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int epoch = context.currentEpoch();

        context.client.resign(epoch - 1);
        context.client.poll();

        context.time.sleep(context.electionTimeoutMs() * 2);
        context.client.poll();

        context.assertElectedLeader(epoch, localId);
    }

    /**
     * After the local leader resigns at its current epoch and reaches the resigned state, a
     * BeginQuorumEpoch request from another voter for the next epoch must be answered with
     * {@code Errors.NONE}. That voter must then be the elected leader of the next epoch, and
     * the listener must report the same leader and epoch.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleBeginQuorumEpochAfterUserInitiatedResign(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId1 = localId + 1;
        final int remoteId2 = localId + 2;
        final Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int resignedEpoch = context.currentEpoch();

        context.client.resign(resignedEpoch);
        // The bound method reference captures (and null-checks) the quorum state once, here.
        context.pollUntil(context.client.quorum()::isResigned);

        final int newEpoch = resignedEpoch + 1;
        context.deliverRequest(context.beginEpochRequest(newEpoch, remoteId1));
        context.pollUntilResponse();

        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
        context.assertElectedLeader(newEpoch, remoteId1);
        Assertions.assertEquals(
            new LeaderAndEpoch(OptionalInt.of(remoteId1), newEpoch),
            context.listener.currentLeaderAndEpoch());
    }

    /**
     * Checks the leader's periodic BeginQuorumEpoch requests in a three-voter quorum:
     * <ol>
     *   <li>after 25000 ms, both other voters are sent a request;</li>
     *   <li>after a further half interval, nothing is sent;</li>
     *   <li>after the remainder of the interval, both voters are sent a request again.</li>
     * </ol>
     * NOTE(review): 25000 ms is presumably the test context's begin-quorum-epoch timeout -
     * confirm against {@code RaftClientTestContext}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testBeginQuorumEpochHeartbeat(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId1 = localId + 1;
        final int remoteId2 = localId + 2;
        final Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        final int epoch = context.currentEpoch();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        final int beginQuorumEpochIntervalMs = 25000;

        // A full interval elapses: both remote voters get a request.
        context.time.sleep(beginQuorumEpochIntervalMs);
        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(epoch, Utils.mkSet(remoteId1, remoteId2));

        // Only half an interval elapses: nothing is sent.
        final int partialDelayMs = beginQuorumEpochIntervalMs / 2;
        context.time.sleep(partialDelayMs);
        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(epoch, Utils.<Integer>mkSet());

        // The rest of the interval elapses: both remote voters get a request again.
        // (This sleep previously referenced an undefined identifier and did not compile.)
        context.time.sleep(beginQuorumEpochIntervalMs - partialDelayMs);
        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(epoch, Utils.mkSet(remoteId1, remoteId2));
    }

    /**
     * Advances the clock in steps of 37500 ms (75000 / 2) while the leader of a three-voter
     * quorum receives fetches. The leader must stay un-resigned after the first three steps,
     * which are interleaved with fetches from the two other voters. The last fetch comes from
     * a replica that is not in the voter set, and after the fourth step the leader must be
     * resigned.
     * NOTE(review): 75000 ms is presumably the leader's check-quorum timeout - confirm against
     * {@code RaftClientTestContext}.
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId1 = localId + 1;
        final int remoteId2 = localId + 2;
        final int observerId = localId + 3;
        final ReplicaKey remoteKey1 = replicaKey(remoteId1, z);
        final ReplicaKey remoteKey2 = replicaKey(remoteId2, z);
        // Deliberately left out of the voter set below.
        final ReplicaKey observerKey = replicaKey(observerId, z);
        final Set<Integer> voters = Utils.mkSet(localId, remoteKey1.id(), remoteKey2.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        final int epoch = context.currentEpoch();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        final int halfTimeoutMs = 75000 / 2;

        context.time.sleep(halfTimeoutMs);
        context.client.poll();
        Assertions.assertFalse(context.client.quorum().isResigned());

        // Fetch from the first voter.
        context.deliverRequest(context.fetchRequest(epoch, remoteKey1, 0L, 0, 0));
        context.pollUntilRequest();
        context.time.sleep(halfTimeoutMs);
        context.client.poll();
        Assertions.assertFalse(context.client.quorum().isResigned());

        // Fetch from the second voter.
        context.deliverRequest(context.fetchRequest(epoch, remoteKey2, 0L, 0, 0));
        context.pollUntilRequest();
        context.time.sleep(halfTimeoutMs);
        context.client.poll();
        Assertions.assertFalse(context.client.quorum().isResigned());

        // Fetch from the non-voter.
        context.deliverRequest(context.fetchRequest(epoch, observerKey, 0L, 0, 0));
        context.pollUntilRequest();
        context.time.sleep(halfTimeoutMs);
        context.client.poll();
        Assertions.assertTrue(context.client.quorum().isResigned());
        context.assertResignedLeader(epoch, localId);
    }

    /**
     * In a quorum whose only voter is the local replica, the leader must not be resigned after
     * 75000 ms have elapsed and the client has been polled.
     * NOTE(review): 75000 ms is presumably the leader's check-quorum timeout - confirm against
     * {@code RaftClientTestContext}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLeaderShouldNotResignLeadershipIfOnlyOneVoters(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Utils.mkSet(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        context.time.sleep(75000L);
        context.client.poll();

        Assertions.assertFalse(context.client.quorum().isResigned());
    }

    /**
     * After a user-initiated resign the local replica must:
     * <ul>
     *   <li>send an EndQuorumEpoch request to the other voter;</li>
     *   <li>send no further request once that is answered and 50 ms have passed;</li>
     *   <li>answer an incoming fetch with {@code NOT_LEADER_OR_FOLLOWER};</li>
     *   <li>become a candidate in the next epoch after twice the election timeout, with the
     *       listener reporting that epoch without a leader.</li>
     * </ul>
     *
     * @param z forwarded to {@code withKip853Rpc} and to the {@code replicaKey} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testElectionTimeoutAfterUserInitiatedResign(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId = localId + 1;
        final Set<Integer> voters = Utils.mkSet(localId, remoteId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        final int resignedEpoch = context.currentEpoch();

        context.client.resign(resignedEpoch);
        // The bound method reference captures (and null-checks) the quorum state once, here.
        context.pollUntil(context.client.quorum()::isResigned);

        context.pollUntilRequest();
        RaftRequest.Outbound endQuorumRequest =
            context.assertSentEndQuorumEpochRequest(resignedEpoch, remoteId);
        context.deliverResponse(
            endQuorumRequest.correlationId(),
            endQuorumRequest.destination(),
            context.endEpochResponse(resignedEpoch, OptionalInt.of(localId)));
        context.client.poll();

        // NOTE(review): 50 ms presumably matches the context's retry backoff - confirm.
        context.time.sleep(50L);
        context.client.poll();
        Assertions.assertFalse(context.channel.hasSentRequests());

        // A fetch at epoch 1 from replica id -1 is answered with NOT_LEADER_OR_FOLLOWER.
        context.deliverRequest(context.fetchRequest(1, replicaKey(-1, z), 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(
            Errors.NOT_LEADER_OR_FOLLOWER, resignedEpoch, OptionalInt.of(localId));

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntil(context.client.quorum()::isCandidate);

        Assertions.assertEquals(resignedEpoch + 1, context.currentEpoch());
        Assertions.assertEquals(
            new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
            context.listener.currentLeaderAndEpoch());
    }

    /**
     * A leader that is asked to resign at an epoch one greater than its current epoch must
     * throw {@link IllegalArgumentException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCannotResignWithLargerEpochThanCurrentEpoch(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();

        // The current epoch is read inside the lambda, as part of the call under test.
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.resign(context.currentEpoch() + 1));
    }

    /**
     * When the stored state names the other voter as leader of epoch 2, asking the local
     * replica to resign at that epoch must throw {@link IllegalArgumentException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCannotResignIfNotLeader(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        final int epoch = 2;
        final Set<Integer> voters = Utils.mkSet(localId, leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .withKip853Rpc(z)
            .build();

        Assertions.assertEquals(OptionalInt.of(leaderId), context.currentLeader());
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> context.client.resign(epoch));
    }

    /**
     * Builds a context with no local id ({@code OptionalInt.empty()}) and two voters. After
     * its first fetch is answered with {@code FENCED_LEADER_EPOCH} naming a leader at epoch 5,
     * the client must report that leader, and asking it to resign at epoch 5 must throw
     * {@link IllegalStateException}.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCannotResignIfObserver(boolean z) throws Exception {
        final int leaderId = randomReplicaId();
        // Derived from leaderId so the two voter ids are always distinct; a second independent
        // random draw could coincide with leaderId and collapse the voter set to one member.
        final int otherNodeId = leaderId + 1;
        final int epoch = 5;
        final Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters)
            .withKip853Rpc(z)
            .build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        // The first fetch may go to either voter.
        MatcherAssert.assertThat(fetchRequest.destination().id(), Matchers.is(Matchers.in(voters)));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        Assertions.assertThrows(
            IllegalStateException.class,
            () -> context.client.resign(epoch));
    }

    /**
     * The local replica starts from stored state in which it voted for itself in epoch 2 in a
     * three-voter quorum. After polling for a request, two vote requests for epoch 2 must have
     * been sent.
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeAsCandidateFromStateStore(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int epoch = 2;
        final Set<Integer> voters = Utils.mkSet(localId, localId + 1, localId + 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(epoch, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
            .withKip853Rpc(z)
            .build();

        context.assertVotedCandidate(epoch, localId);
        Assertions.assertEquals(0L, context.log.endOffset().offset());

        context.pollUntilRequest();
        int voteRequestCount = context.collectVoteRequests(epoch, 0, 0L).size();
        Assertions.assertEquals(2, voteRequestCount);
    }

    /**
     * Walks a two-voter quorum from an unknown leader at epoch 0 to the local replica being
     * leader of epoch 1:
     * <ol>
     *   <li>the local replica starts as an unattached voter and sends a fetch;</li>
     *   <li>it stays an unattached voter after a {@code NOT_LEADER_OR_FOLLOWER} fetch
     *       response;</li>
     *   <li>after twice the election timeout it is a candidate that voted for itself in
     *       epoch 1;</li>
     *   <li>a granted vote makes it leader and one record is written to the log;</li>
     *   <li>a BeginQuorumEpoch request goes to the other voter, and the first batch in the log
     *       is a control batch whose record timestamp equals the clock reading taken after the
     *       election.</li>
     * </ol>
     *
     * @param z forwarded to {@code withKip853Rpc}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInitializeAsCandidateAndBecomeLeader(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int remoteId = localId + 1;
        final Set<Integer> voters = Utils.mkSet(localId, remoteId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.assertUnknownLeader(0);
        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(0, 0L, 0);
        Assertions.assertTrue(context.client.quorum().isUnattached());
        Assertions.assertTrue(context.client.quorum().isVoter());

        // Half an election timeout later the fetch is answered with NOT_LEADER_OR_FOLLOWER.
        context.time.sleep(context.electionTimeoutMs() / 2);
        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(0, -1, MemoryRecords.EMPTY, -1L, Errors.NOT_LEADER_OR_FOLLOWER));
        context.client.poll();
        Assertions.assertTrue(context.client.quorum().isUnattached());
        Assertions.assertTrue(context.client.quorum().isVoter());

        // Let the election timeout elapse.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.quorum().isCandidate());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        // The other voter grants its vote.
        RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(1, 0, 0L, 1);
        context.deliverResponse(
            voteRequest.correlationId(),
            voteRequest.destination(),
            context.voteResponse(true, OptionalInt.empty(), 1));

        // Wait for the first record to land in the log.
        context.pollUntil(() -> context.log.endOffset().offset() == 1);
        context.assertElectedLeader(1, localId);
        final long electionTimestamp = context.time.milliseconds();
        Assertions.assertEquals(1L, context.log.endOffset().offset());
        Assertions.assertEquals(1L, context.log.firstUnflushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(remoteId));

        // Inspect the first batch in the log.
        RecordBatch firstBatch =
            (RecordBatch) context.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(firstBatch.isControlBatch());

        Record firstRecord = (Record) firstBatch.iterator().next();
        Assertions.assertEquals(electionTimestamp, firstRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Arrays.asList(localId, remoteId),
            Arrays.asList(remoteId, localId),
            firstRecord.key(),
            firstRecord.value());
    }

    /**
     * Same election scenario as the two-node variant, but with three voters: a single
     * granted vote is delivered, after which the replica is expected to be the elected
     * leader and to have written the leader-change control record.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean z) throws Exception {
        int localId = randomReplicaId();
        int firstNodeId = localId + 1;
        int secondNodeId = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, firstNodeId, secondNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.assertUnknownLeader(0);

        // Advance the clock by twice the election timeout and wait for the vote requests.
        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        // Answer one of the two vote requests with a granted vote.
        RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(1, 0, 0L, 2);
        context.deliverResponse(
            voteRequest.correlationId(),
            voteRequest.destination(),
            context.voteResponse(true, OptionalInt.empty(), 1)
        );
        int grantingVoterId = voteRequest.data().voterId();
        Assertions.assertNotEquals(localId, grantingVoterId);

        context.pollUntil(() -> context.log.endOffset().offset() == 1);
        context.assertElectedLeader(1, localId);

        long electionTimestamp = context.time.milliseconds();
        Assertions.assertEquals(1L, context.log.endOffset().offset());
        Assertions.assertEquals(1L, context.log.firstUnflushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(firstNodeId, secondNodeId));

        // Inspect the control record at the start of the log.
        RecordBatch firstBatch = context.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(firstBatch.isControlBatch());
        Record firstRecord = firstBatch.iterator().next();
        Assertions.assertEquals(electionTimestamp, firstRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Arrays.asList(localId, firstNodeId, secondNodeId),
            Arrays.asList(grantingVoterId, localId),
            firstRecord.key(),
            firstRecord.value()
        );
    }

    /**
     * A replica that has voted for its peer in epoch 2 receives a BeginQuorumEpoch
     * request from that peer for the same epoch; it is expected to accept the peer as
     * elected leader and answer without error.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleBeginQuorumRequest(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int votedCandidateEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(votedCandidateEpoch, otherNodeKey)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.beginEpochRequest(votedCandidateEpoch, otherNodeKey.id()));
        context.pollUntilResponse();

        context.assertElectedLeader(votedCandidateEpoch, otherNodeKey.id());
        context.assertSentBeginQuorumEpochResponse(
            Errors.NONE,
            votedCandidateEpoch,
            OptionalInt.of(otherNodeKey.id())
        );
    }

    /**
     * Delivers a BeginQuorumEpoch request from the already elected leader that carries
     * two listener endpoints (the default listener plus {@code ANOTHER_LISTENER}) and
     * expects the request to be accepted without error.
     */
    @Test
    public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
        ReplicaKey local = replicaKey(randomReplicaId(), true);
        ReplicaKey leader = replicaKey(local.id() + 1, true);
        int leaderEpoch = 3;
        RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
            .withStaticVoters(VoterSetTest.voterSet(Stream.of(local, leader)))
            .withElectedLeader(leaderEpoch, leader.id())
            .withKip853Rpc(true)
            .build();
        context.client.poll();

        // Two endpoints for the leader, on different listeners and ports.
        HashMap<ListenerName, InetSocketAddress> leaderListeners = new HashMap<>(2);
        leaderListeners.put(
            VoterSetTest.DEFAULT_LISTENER_NAME,
            InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
        );
        leaderListeners.put(
            ListenerName.normalised("ANOTHER_LISTENER"),
            InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
        );

        context.deliverRequest(
            context.beginEpochRequest(leaderEpoch, leader.id(), Endpoints.fromInetSocketAddresses(leaderListeners))
        );
        context.pollUntilResponse();

        context.assertElectedLeader(leaderEpoch, leader.id());
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(leader.id()));
    }

    /**
     * The local replica starts as elected leader of epoch 2 and receives a
     * BeginQuorumEpoch request from its peer for epoch 3; afterwards the peer is
     * expected to be the elected leader of epoch 3.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleBeginQuorumResponse(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, localId)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.beginEpochRequest(leaderEpoch + 1, otherNodeId));
        context.pollUntilResponse();

        context.assertElectedLeader(leaderEpoch + 1, otherNodeId);
    }

    /**
     * A candidate in epoch 5 receives an EndQuorumEpoch request for epoch 3. The request
     * must be fenced and the replica must remain candidate of epoch 5 until later
     * sleeps let it start a new election in epoch 6.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testEndQuorumIgnoredAsCandidateIfOlderEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 5;
        int jitterMs = 85;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(random -> random.mockNextInt(jitterMs))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(z)
            .build();

        // Sleep for the election timeout plus the mocked random value to become a candidate.
        context.time.sleep(context.electionTimeoutMs() + jitterMs);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The EndQuorumEpoch request is two epochs behind and must be fenced.
        context.deliverRequest(
            context.endEpochRequest(epoch - 2, otherNodeId, Collections.singletonList(context.localReplicaKey()))
        );
        context.client.poll();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty());

        // One millisecond short of the same interval: still candidate of the same epoch.
        context.time.sleep((context.electionTimeoutMs() + jitterMs) - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // One more millisecond: the epoch is still unchanged.
        context.time.sleep(1L);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // NOTE(review): 100 ms looks like an inlined backoff constant of the test context - confirm.
        context.time.sleep(100L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A leader receives an EndQuorumEpoch request that is two epochs behind its own
     * epoch. The request must be answered with {@code FENCED_LEADER_EPOCH} and the
     * replica must still be leader of the same epoch afterwards.
     *
     * <p>The flag is forwarded to {@code Builder.withKip853Rpc} as well as to
     * {@code replicaKey(...)}, so the context's RPC setting follows the same flag as
     * the replica key, as in the other parameterized tests of this class.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testEndQuorumIgnoredAsLeaderIfOlderEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        int voter2 = localId + 1;
        ReplicaKey voter3 = replicaKey(localId + 2, z);
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(6)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // A voter sends EndQuorumEpoch for an epoch older than the current one.
        context.deliverRequest(
            context.endEpochRequest(epoch - 2, voter2, Arrays.asList(context.localReplicaKey(), voter3))
        );
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        // NOTE(review): 50000 ms looks like an inlined timeout constant of the test context - confirm.
        // Sleeping one millisecond less than that must not cost the replica its leadership.
        context.time.sleep(50000 - 1);
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * A replica with unknown leader in epoch 2 receives an EndQuorumEpoch request for
     * that epoch which lists it first among the preferred successors. It must answer
     * without error and be candidate of epoch 3 after the next poll.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached(boolean z) throws Exception {
        int localId = randomReplicaId();
        int voter2 = localId + 1;
        ReplicaKey voter3 = replicaKey(localId + 2, z);
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(
            context.endEpochRequest(epoch, voter2, Arrays.asList(context.localReplicaKey(), voter3))
        );
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2));

        // No sleep in between: a single further poll must already start the election.
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * A leader with a pending append receives a BeginQuorumEpoch request for the next
     * epoch. After it has accepted the new leader, the buffer handed out by the mocked
     * memory pool must have been released exactly twice.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAccumulatorClearedAfterBecomingFollower(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int lingerMs = 50;
        int bufferSize = 8388608; // 8 MiB
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // The mocked pool always hands out the same buffer for this allocation size.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Schedule one record so that an append is pending when leadership is lost.
        Assertions.assertEquals(1L, context.client.prepareAppend(epoch, Collections.singletonList("a")));
        context.client.schedulePreparedAppend();

        context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId));
        context.pollUntilResponse();
        context.assertElectedLeader(epoch + 1, otherNodeId);

        Mockito.verify(memoryPool, Mockito.times(2)).release(buffer);
    }

    /**
     * A leader with a pending append receives a vote request for the next epoch whose
     * last offset equals the local log end offset. After it has voted for the peer, the
     * buffer handed out by the mocked memory pool must have been released exactly twice.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAccumulatorClearedAfterBecomingVoted(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int lingerMs = 50;
        int bufferSize = 8388608; // 8 MiB
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        // The mocked pool always hands out the same buffer for this allocation size.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Schedule one record so that an append is pending when leadership is lost.
        Assertions.assertEquals(1L, context.client.prepareAppend(epoch, Collections.singletonList("a")));
        context.client.schedulePreparedAppend();

        context.deliverRequest(
            context.voteRequest(epoch + 1, otherNodeKey, epoch, context.log.endOffset().offset())
        );
        context.pollUntilResponse();
        context.assertVotedCandidate(epoch + 1, otherNodeKey.id());

        Mockito.verify(memoryPool, Mockito.times(2)).release(buffer);
    }

    /**
     * A leader with a pending append receives a vote request for the next epoch with
     * last offset 0. It is expected to end up with unknown leader in that epoch, and
     * the buffer handed out by the mocked memory pool must have been released exactly
     * twice.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAccumulatorClearedAfterBecomingUnattached(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int lingerMs = 50;
        int bufferSize = 8388608; // 8 MiB
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        // The mocked pool always hands out the same buffer for this allocation size.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        int epoch = context.currentEpoch();

        // Schedule one record so that an append is pending when leadership is lost.
        Assertions.assertEquals(1L, context.client.prepareAppend(epoch, Collections.singletonList("a")));
        context.client.schedulePreparedAppend();

        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeKey, epoch, 0L));
        context.pollUntilResponse();
        context.assertUnknownLeader(epoch + 1);

        Mockito.verify(memoryPool, Mockito.times(2)).release(buffer);
    }

    /**
     * With an append linger of 50 ms, checks that the poll timeout reported by the
     * message queue counts down (50, then 30 after 20 ms) and that the record is in the
     * log once the full 50 ms have elapsed.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset());

        // Scheduling the append must request a wakeup of the message queue.
        Assertions.assertEquals(
            1L,
            context.client.prepareAppend(context.currentEpoch(), Collections.singletonList("a"))
        );
        context.client.schedulePreparedAppend();
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // 20 ms later, 30 ms of linger remain.
        context.time.sleep(20L);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(30L), context.messageQueue.lastPollTimeoutMs());

        // After the remaining 30 ms the record must have been appended.
        context.time.sleep(30L);
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset());
    }

    /**
     * With an append linger of 50 ms, schedules a first record, lets the full linger
     * time pass and then schedules a second record. The second scheduling must request
     * a wakeup, and the following poll must leave both records in the log.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int lingerMs = 50;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset());
        int epoch = context.currentEpoch();

        // First record: wakeup requested, then cleared by the poll.
        Assertions.assertEquals(1L, context.client.prepareAppend(epoch, Collections.singletonList("a")));
        context.client.schedulePreparedAppend();
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertFalse(context.messageQueue.wakeupRequested());
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // Second record is scheduled after the whole linger time has passed.
        context.time.sleep(lingerMs);
        Assertions.assertEquals(2L, context.client.prepareAppend(epoch, Collections.singletonList("b")));
        context.client.schedulePreparedAppend();
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertEquals(3L, context.log.endOffset().offset());
    }

    /**
     * A follower receives an EndQuorumEpoch request from the current leader of epoch 2
     * that names the follower as the only preferred successor. It must answer without
     * error and be candidate of epoch 3 after the next poll.
     *
     * <p>The old leader id is derived from the local id ({@code localId + 1}) rather
     * than from a second random draw, so the two voter ids are always distinct.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleEndQuorumRequest(boolean z) throws Exception {
        int localId = randomReplicaId();
        // Must be based on localId: an independent random id could collide with it.
        int oldLeaderId = localId + 1;
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, oldLeaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, oldLeaderId)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(
            context.endEpochRequest(leaderEpoch, oldLeaderId, Collections.singletonList(context.localReplicaKey()))
        );
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));

        // A single further poll must already start the election for the next epoch.
        context.client.poll();
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * A follower receives an EndQuorumEpoch request in which it is listed second among
     * the preferred successors. One millisecond later it must still be fetching in the
     * old epoch; only after a further sleep must it send vote requests for the next
     * epoch.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey oldLeaderKey = replicaKey(localId + 1, z);
        ReplicaKey preferredNextLeader = replicaKey(localId + 2, z);
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, oldLeaderKey.id(), preferredNextLeader.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(leaderEpoch, oldLeaderKey.id())
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(
            context.endEpochRequest(
                leaderEpoch,
                oldLeaderKey.id(),
                Arrays.asList(preferredNextLeader, context.localReplicaKey())
            )
        );
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderKey.id()));

        // After 1 ms the replica still sends a fetch in the old epoch.
        context.time.sleep(1L);
        context.pollUntilRequest();
        context.assertSentFetchRequest(leaderEpoch, 0L, 0);

        // NOTE(review): 50 ms looks like an inlined backoff constant of the test context - confirm.
        context.time.sleep(50L);
        context.pollUntilRequest();
        Assertions.assertEquals(2, context.collectVoteRequests(leaderEpoch + 1, 0, 0L).size());
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * A candidate's vote request times out and is sent again. A late response to the
     * first request must not change the state; the response to the retried request
     * must make the replica leader.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testVoteRequestTimeout(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);
        RaftRequest.Outbound firstRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // Let the request time out; the next poll sends a new vote request.
        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        RaftRequest.Outbound retryRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // The response to the timed-out request arrives late and has no effect.
        context.deliverResponse(
            firstRequest.correlationId(),
            firstRequest.destination(),
            context.voteResponse(true, OptionalInt.empty(), 1)
        );
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The response to the retried request completes the election.
        context.deliverResponse(
            retryRequest.correlationId(),
            retryRequest.destination(),
            context.voteResponse(true, OptionalInt.empty(), 1)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * A replica with unknown leader in epoch 2 receives a vote request for the same
     * epoch; the vote must be granted and the peer recorded as voted candidate.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleValidVoteRequestAsFollower(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeKey, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
        context.assertVotedCandidate(epoch, otherNodeKey.id());
    }

    /**
     * A follower that already knows the elected leader of epoch 2 receives a vote
     * request from another voter for the same epoch; the vote must be refused and the
     * elected leader must stay unchanged.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleVoteRequestAsFollowerWithElectedLeader(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int electedLeaderId = localId + 2;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), electedLeaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, electedLeaderId)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeKey, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(electedLeaderId), false);
        context.assertElectedLeader(epoch, electedLeaderId);
    }

    /**
     * A replica that has already voted for one candidate in epoch 2 receives a vote
     * request from a different voter for the same epoch; the vote must be refused and
     * the recorded voted candidate must stay unchanged.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleVoteRequestAsFollowerWithVotedCandidate(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        ReplicaKey votedCandidateKey = replicaKey(localId + 2, z);
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), votedCandidateKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(epoch, votedCandidateKey)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeKey, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(epoch, votedCandidateKey.id());
    }

    /**
     * A replica in epoch 2 receives a vote request for epoch 1; the request must be
     * answered with {@code FENCED_LEADER_EPOCH} and the local state must not change.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleInvalidVoteRequestWithOlderEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.voteRequest(epoch - 1, otherNodeKey, epoch - 2, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty(), false);
        context.assertUnknownLeader(epoch);
    }

    /**
     * The local replica is not part of the voter set. It receives a vote request for
     * epoch 3 while in epoch 2; the vote must be granted and the sender recorded as
     * voted candidate.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleVoteRequestAsObserver(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int otherNodeId2 = localId + 2;
        int epoch = 2;
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(otherNodeKey.id(), otherNodeId2);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeKey, epoch, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
        context.assertVotedCandidate(epoch + 1, otherNodeKey.id());
    }

    /**
     * A leader receives a vote request for its own epoch; the vote must be refused,
     * the response must name the local replica as leader, and leadership must be kept.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeaderIgnoreVoteRequestOnSameEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(2)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int leaderEpoch = context.currentEpoch();

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeKey, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
        context.assertElectedLeader(leaderEpoch, localId);
    }

    /**
     * Follows the high watermark and the listener's last commit offset on a leader
     * while the follower fetches: first from offset 1 (twice), then from offset 4 after
     * three records have been appended. The listener must eventually see exactly the
     * appended records.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testListenerCommitCallbackAfterLeaderWrite(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Right after the election there is no high watermark yet.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertEquals(1L, context.log.endOffset().offset());

        // A follower fetch at offset 1 establishes the high watermark at 1.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        // Append three records; the listener's last commit offset is still 0.
        List<String> records = Arrays.asList("a", "b", "c");
        long lastAppendedOffset = context.client.prepareAppend(epoch, records);
        context.client.schedulePreparedAppend();
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // Another fetch at offset 1 leaves the high watermark and the commit offset unchanged.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 500));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // A fetch at offset 4 must move the high watermark to 4 and commit the records.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 4L, epoch, 500));
        context.pollUntil(() -> context.client.highWatermark().equals(OptionalLong.of(4L)));
        Assertions.assertEquals(records, context.listener.commitWithLastOffset(lastAppendedOffset));
    }

    /**
     * The leader's log holds three records each for epochs 1, 3 and 5. A follower
     * fetches from offset 6 claiming last fetched epoch 2; the response from a single
     * poll must carry a diverging epoch of 1 with end offset 3.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeaderImmediatelySendsDivergingEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(5)
            .withKip853Rpc(z)
            .appendToLog(1, Arrays.asList("a", "b", "c"))
            .appendToLog(3, Arrays.asList("d", "e", "f"))
            .appendToLog(5, Arrays.asList("g", "h", "i"))
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Epoch 2 does not exist in the leader's log.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 6L, 2, 500));
        context.client.poll();

        FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
        Assertions.assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(localId, partitionResponse.currentLeader().leaderId());
        Assertions.assertEquals(1, partitionResponse.divergingEpoch().epoch());
        Assertions.assertEquals(3L, partitionResponse.divergingEpoch().endOffset());
    }

    /**
     * A replica that has voted for itself in epoch 2 receives a vote request from its
     * peer for the same epoch; the vote must be refused and the replica must still
     * have itself recorded as voted candidate.
     *
     * @param z passed to {@code replicaKey(...)} and {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCandidateIgnoreVoteRequestOnSameEpoch(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int leaderEpoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
            .withKip853Rpc(z)
            .build();
        context.pollUntilRequest();

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeKey, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(leaderEpoch, localId);
    }

    /**
     * The only other voter rejects the candidate's vote request. The replica must stay
     * candidate of epoch 1 for 99 ms and start a new election in epoch 2 one
     * millisecond later.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testRetryElection(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 1;
        int mockedRandomValue = 85;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(random -> random.mockNextInt(mockedRandomValue))
            .withKip853Rpc(z)
            .build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        // The peer rejects the vote.
        RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1);
        context.deliverResponse(
            voteRequest.correlationId(),
            voteRequest.destination(),
            context.voteResponse(false, OptionalInt.empty(), 1)
        );
        context.client.poll();

        // The rejection does not change the epoch or the recorded vote.
        context.assertVotedCandidate(epoch, localId);

        // NOTE(review): 100 ms looks like an inlined backoff constant of the test context - confirm.
        // One millisecond short of it nothing has changed yet.
        context.time.sleep(100 - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The final millisecond triggers a new election in the next epoch.
        context.time.sleep(1L);
        context.client.poll();
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * A replica that starts with an empty log and a known leader in epoch 5 must send
     * a fetch request for offset 0 with last fetched epoch 0.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInitializeAsFollowerEmptyLog(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 0L, 0);
    }

    /**
     * A replica that starts with one record from epoch 3 in its log and a known leader
     * in epoch 5 must send a fetch request for offset 1 with last fetched epoch 3.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInitializeAsFollowerNonEmptyLog(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
    }

    /**
     * A voting follower sends its fetch request, then 50000 ms pass. Afterwards it
     * must send a vote request for epoch 6 and have voted for itself.
     *
     * @param z forwarded to {@code Builder.withKip853Rpc}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testVoterBecomeCandidateAfterFetchTimeout(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);

        // NOTE(review): 50000 ms looks like an inlined timeout constant of the test context - confirm.
        context.time.sleep(50000L);
        context.pollUntilRequest();
        context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * The local replica is not in the voter set. After the mock clock is advanced by
     * 50 seconds it must still report itself as a follower. A vote request for the next
     * epoch is then delivered; the test expects the vote to be granted and the local
     * replica to end up unattached.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFollowerAsObserverDoesNotBecomeCandidateAfterFetchTimeout(boolean z) throws Exception {
        final int epoch = 5;
        final int lastEpoch = 3;
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        // Only the other node is a voter; the local id is deliberately left out.
        Set<Integer> voters = Utils.mkSet(otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);

        // NOTE(review): 50000 ms is presumably chosen to exceed the configured fetch
        // timeout -- confirm against the test context's settings.
        context.time.sleep(50000L);
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.quorum().isFollower());

        // A vote request from the other node for the next epoch.
        context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, z), epoch, 1L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
        Assertions.assertTrue(context.client.quorum().isUnattached());
    }

    /**
     * The local replica is not in the voter set and starts with an unknown leader.
     * Sleeping for twice the election timeout must leave it unattached and still
     * fetching, both before and after it answers a vote request for the next epoch.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testUnattachedAsObserverDoesNotBecomeCandidateAfterElectionTimeout(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        // The local id is not part of the voter set.
        Set<Integer> voters = Utils.mkSet(otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 0L, 0);
        Assertions.assertTrue(context.client.quorum().isUnattached());

        // First wait: two election timeouts elapse, yet only a fetch is sent.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.quorum().isUnattached());
        context.assertSentFetchRequest(epoch, 0L, 0);
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        // A vote request for the next epoch arrives from the other node.
        context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, z), epoch, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);

        // Second wait: same expectation, now at the bumped epoch.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.quorum().isUnattached());
        context.assertSentFetchRequest(epoch + 1, 0L, 0);
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());
    }

    /**
     * An unattached voter sends a fetch and receives a response naming
     * {@code localId + 2} as leader of the current epoch. After one more poll the local
     * replica must be a follower.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        final int leaderId = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);
        Assertions.assertTrue(context.client.quorum().isUnattached());
        Assertions.assertTrue(context.client.quorum().isVoter());

        // The error code depends on which node the fetch happened to be addressed to.
        Errors responseError;
        if (fetchRequest.destination().id() == otherNodeId) {
            responseError = Errors.NOT_LEADER_OR_FOLLOWER;
        } else {
            responseError = Errors.NONE;
        }

        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, responseError)
        );
        context.client.poll();
        Assertions.assertTrue(context.client.quorum().isFollower());
    }

    /**
     * The local replica is not a voter and has no persisted state. Its first fetch must
     * be addressed to one of the voters and is checked with {@code (0, 0L, 0)}. A
     * {@code FENCED_LEADER_EPOCH} response naming a leader for epoch 5 must make the
     * local replica record that leader.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInitializeObserverNoPreviousState(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        final int otherNodeId = localId + 2;
        // Typed set, so neither the builder call nor the matcher needs an unchecked cast.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        MatcherAssert.assertThat(
            Integer.valueOf(fetchRequest.destination().id()),
            Matchers.is(Matchers.in(voters))
        );
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A non-voter configured with bootstrap servers sends its first fetch to a bootstrap
     * id. That fetch is answered with {@code UNKNOWN_SERVER_ERROR}; after a 50 ms sleep
     * the replica must send another fetch to a bootstrap id. The second response names a
     * leader for epoch 5, which the replica must then record.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverQuorumDiscoveryFailure(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        Set<Integer> voters = Utils.mkSet(leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(
                voters.stream().map(RaftClientTestContext::mockAddress).collect(Collectors.toList())
            ))
            .withKip853Rpc(z)
            .build();

        // First discovery attempt fails with a server-side error.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(context.bootstrapIds.contains(firstFetch.destination().id()));
        context.assertFetchRequestData(firstFetch, 0, 0L, 0);
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(-1, -1, MemoryRecords.EMPTY, -1L, Errors.UNKNOWN_SERVER_ERROR)
        );
        context.client.poll();

        // NOTE(review): 50 ms is presumably the retry backoff -- confirm against the
        // test context's configuration.
        context.time.sleep(50L);

        // Second discovery attempt learns the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound secondFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(context.bootstrapIds.contains(secondFetch.destination().id()));
        context.assertFetchRequestData(secondFetch, 0, 0L, 0);
        context.deliverResponse(
            secondFetch.correlationId(),
            secondFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A non-voter learns the leader through a bootstrap server, then the mock clock is
     * advanced by 50 seconds. The next fetch must go to a bootstrap id that is not the
     * leader, and must carry the epoch learned earlier.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        final int otherNodeId = localId + 2;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(
                voters.stream().map(RaftClientTestContext::mockAddress).collect(Collectors.toList())
            ))
            .withKip853Rpc(z)
            .build();

        // Discovery: the response reveals the leader and epoch.
        context.pollUntilRequest();
        RaftRequest.Outbound discoveryFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(context.bootstrapIds.contains(discoveryFetch.destination().id()));
        context.assertFetchRequestData(discoveryFetch, 0, 0L, 0);
        context.deliverResponse(
            discoveryFetch.correlationId(),
            discoveryFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        // NOTE(review): 50000 ms is presumably chosen to exceed the configured fetch
        // timeout -- confirm against the test context's settings.
        context.time.sleep(50000L);

        context.pollUntilRequest();
        RaftRequest.Outbound rediscoveryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, rediscoveryFetch.destination().id());
        Assertions.assertTrue(context.bootstrapIds.contains(rediscoveryFetch.destination().id()));
        context.assertFetchRequestData(rediscoveryFetch, epoch, 0L, 0);
    }

    /**
     * A non-voter discovers the leader via a bootstrap id and sends a fetch to that
     * leader. After sleeping for the request timeout it sends another fetch to a
     * bootstrap id. The test then delivers a two-record batch as the response to the
     * leader fetch and again as the response to the bootstrap fetch, polling after each;
     * both polls must complete without throwing.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverHandleRetryFetchtToBootstrapServer(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        final int otherNodeId = localId + 2;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(
                voters.stream().map(RaftClientTestContext::mockAddress).collect(Collectors.toList())
            ))
            .withKip853Rpc(z)
            .build();

        // Discovery fetch: addressed to a bootstrap id, which is not a voter id.
        context.pollUntilRequest();
        RaftRequest.Outbound discoveryFetch = context.assertSentFetchRequest();
        Assertions.assertFalse(voters.contains(discoveryFetch.destination().id()));
        Assertions.assertTrue(context.bootstrapIds.contains(discoveryFetch.destination().id()));
        context.assertFetchRequestData(discoveryFetch, 0, 0L, 0);
        context.deliverResponse(
            discoveryFetch.correlationId(),
            discoveryFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        // Fetch addressed to the discovered leader; it is left unanswered for now.
        context.pollUntilRequest();
        RaftRequest.Outbound leaderFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, leaderFetch.destination().id());
        context.assertFetchRequestData(leaderFetch, epoch, 0L, 0);

        // Once the request timeout has elapsed, the next fetch goes to a bootstrap id.
        context.time.sleep(context.requestTimeoutMs());
        context.pollUntilRequest();
        RaftRequest.Outbound bootstrapRetryFetch = context.assertSentFetchRequest();
        Assertions.assertFalse(voters.contains(bootstrapRetryFetch.destination().id()));
        Assertions.assertTrue(context.bootstrapIds.contains(bootstrapRetryFetch.destination().id()));
        context.assertFetchRequestData(bootstrapRetryFetch, epoch, 0L, 0);

        // Late response to the leader fetch.
        context.deliverResponse(
            leaderFetch.correlationId(),
            leaderFetch.destination(),
            context.fetchResponse(epoch, leaderId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE)
        );
        context.client.poll();

        // The same batch again, now as the response to the bootstrap fetch.
        context.deliverResponse(
            bootstrapRetryFetch.correlationId(),
            bootstrapRetryFetch.destination(),
            context.fetchResponse(epoch, leaderId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE)
        );
        context.client.poll();
    }

    /**
     * A non-voter discovers the leader via a bootstrap id and sends a fetch to that
     * leader. After sleeping for the request timeout it sends another fetch to a
     * bootstrap id. One further poll with nothing delivered must not put any new request
     * on the channel.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverHandleRetryFetchToLeader(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int leaderId = localId + 1;
        final int otherNodeId = localId + 2;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(
                voters.stream().map(RaftClientTestContext::mockAddress).collect(Collectors.toList())
            ))
            .withKip853Rpc(z)
            .build();

        // Discovery fetch: addressed to a bootstrap id, which is not a voter id.
        context.pollUntilRequest();
        RaftRequest.Outbound discoveryFetch = context.assertSentFetchRequest();
        Assertions.assertFalse(voters.contains(discoveryFetch.destination().id()));
        Assertions.assertTrue(context.bootstrapIds.contains(discoveryFetch.destination().id()));
        context.assertFetchRequestData(discoveryFetch, 0, 0L, 0);
        context.deliverResponse(
            discoveryFetch.correlationId(),
            discoveryFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        // Fetch addressed to the discovered leader; it is never answered.
        context.pollUntilRequest();
        RaftRequest.Outbound leaderFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, leaderFetch.destination().id());
        context.assertFetchRequestData(leaderFetch, epoch, 0L, 0);

        // Once the request timeout has elapsed, the next fetch goes to a bootstrap id.
        context.time.sleep(context.requestTimeoutMs());
        context.pollUntilRequest();
        RaftRequest.Outbound bootstrapRetryFetch = context.assertSentFetchRequest();
        Assertions.assertFalse(voters.contains(bootstrapRetryFetch.destination().id()));
        Assertions.assertTrue(context.bootstrapIds.contains(bootstrapRetryFetch.destination().id()));
        context.assertFetchRequestData(bootstrapRetryFetch, epoch, 0L, 0);

        // With the bootstrap fetch still outstanding, another poll must send nothing.
        context.client.poll();
        Assertions.assertFalse(context.channel.hasSentRequests());
    }

    /**
     * Delivers five fetch requests to a leader and checks the error code of each
     * response. Four are expected to produce {@code INVALID_REQUEST}; the one carrying
     * an epoch above the current epoch is expected to produce
     * {@code UNKNOWN_LEADER_EPOCH}.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInvalidFetchRequest(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Third argument is -5L.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Fourth argument is -1.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, -1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Fourth argument is one above the current epoch.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, epoch + 1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // The request epoch itself is one above the current epoch.
        context.deliverRequest(context.fetchRequest(epoch + 1, otherNodeKey, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        // Last argument is -1.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, 0, -1));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }

    /**
     * Argument source for the parameterized fetch-version test.
     *
     * @return the values 13 through 17 inclusive, in ascending order, as {@code Short}s
     */
    private static Stream<Short> validFetchVersions() {
        final int firstVersion = 13;
        final int lastVersion = 17;
        final long versionCount = lastVersion - firstVersion + 1;
        return Stream.iterate(firstVersion, version -> version + 1)
            .limit(versionCount)
            .map(Integer::shortValue);
    }

    /**
     * For each supplied fetch version, builds a follower fetch request at that version,
     * delivers it to a leader and checks that the high watermark moves from empty to 1.
     * Before delivery it also checks which of the two replica-id fields is populated:
     * the top-level one for versions below 15, the one under {@code replicaState()}
     * otherwise.
     */
    @MethodSource({"validFetchVersions"})
    @ParameterizedTest
    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short s) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int followerId = localId + 1;
        final ReplicaKey followerKey = replicaKey(followerId, false);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(
            localId,
            Utils.mkSet(localId, followerKey.id()),
            epoch
        );

        // Nothing has been fetched yet, so no high watermark is reported.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertEquals(1L, context.log.endOffset().offset());

        FetchRequestData fetchRequestData = context.fetchRequest(epoch, followerKey, 1L, epoch, 0);
        FetchRequestData versionedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(s).data();

        // NOTE(review): the checks below read the object handed to the builder, not the
        // object returned by build(); this presumably relies on the builder updating its
        // argument in place -- confirm.
        final boolean topLevelReplicaId = s < 15;
        final int expectedReplicaId = topLevelReplicaId ? followerId : -1;
        final int expectedReplicaStateId = topLevelReplicaId ? -1 : followerId;
        Assertions.assertEquals(expectedReplicaId, fetchRequestData.replicaId());
        Assertions.assertEquals(expectedReplicaStateId, fetchRequestData.replicaState().replicaId());

        context.deliverRequest(versionedRequest, s);
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());
    }

    /**
     * Sends fetch requests to a leader with different cluster-id arguments. The overload
     * without a cluster-id argument and an explicit {@code null} both get as far as
     * request validation ({@code INVALID_REQUEST}, caused by the -5L argument). An empty
     * string and "invalid-uuid" are rejected with {@code INCONSISTENT_CLUSTER_ID}.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchRequestClusterIdValidation(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Overload that takes no cluster-id argument.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Explicit null cluster id.
        context.deliverRequest(context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Cluster ids expected to be rejected.
        for (String rejectedClusterId : Arrays.asList("", "invalid-uuid")) {
            context.deliverRequest(context.fetchRequest(epoch, rejectedClusterId, otherNodeKey, -5L, 0, 0));
            context.pollUntilResponse();
            context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
        }
    }

    /**
     * Sends vote requests to a leader with different cluster-id arguments. The overload
     * without a cluster-id argument and an explicit {@code null} are answered with
     * {@code NONE} and the vote not granted. An empty string and "invalid-uuid" are
     * rejected with {@code INCONSISTENT_CLUSTER_ID}.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testVoteRequestClusterIdValidation(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Overload that takes no cluster-id argument.
        context.deliverRequest(context.voteRequest(epoch, otherNodeKey, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

        // Explicit null cluster id.
        context.deliverRequest(context.voteRequest(null, epoch, otherNodeKey, 0, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

        // Cluster ids expected to be rejected.
        for (String rejectedClusterId : Arrays.asList("", "invalid-uuid")) {
            context.deliverRequest(context.voteRequest(rejectedClusterId, epoch, otherNodeKey, 0, 0L));
            context.pollUntilResponse();
            context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
        }
    }

    /**
     * Sends two vote requests whose voter key does not match the local replica and
     * expects {@code INVALID_VOTER_KEY} for both.
     *
     * <p>The first uses a voter id (10) that is not the local id. The second uses the
     * local id with a freshly generated directory id, the same construction the
     * BeginQuorumEpoch variant of this test uses. This replaces the literal id 0, which
     * equalled the local id only when {@code randomReplicaId()} returned 0, so the
     * directory-id mismatch was not exercised reliably.
     */
    @Test
    public void testInvalidVoterReplicaVoteRequest() throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(true)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Voter id that is not the local id.
        context.deliverRequest(
            context.voteRequest(
                context.clusterId.toString(),
                epoch + 1,
                otherNodeKey,
                ReplicaKey.of(10, Uuid.randomUuid()),
                epoch,
                100L
            )
        );
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, epoch + 1, OptionalInt.empty(), false);

        // Local voter id paired with a random directory id.
        context.deliverRequest(
            context.voteRequest(
                context.clusterId.toString(),
                epoch + 2,
                otherNodeKey,
                ReplicaKey.of(localId, Uuid.randomUuid()),
                epoch,
                100L
            )
        );
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, epoch + 2, OptionalInt.empty(), false);
    }

    /**
     * Sends three BeginQuorumEpoch requests with different voter keys. A key with id 10
     * and a key with the local id but a random directory id both produce
     * {@code INVALID_VOTER_KEY}; the context's own local replica key produces
     * {@code NONE}. After each of the two rejected requests the sender is still
     * recorded as the elected leader of the new epoch.
     */
    @Test
    public void testInvalidVoterReplicaBeginQuorumEpochRequest() throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int voter2 = localId + 1;
        final int voter3 = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(true)
            .build();
        context.assertUnknownLeader(epoch - 1);

        // Voter id that is not the local id.
        ReplicaKey unknownVoter = ReplicaKey.of(10, Uuid.randomUuid());
        context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(), epoch, voter3, unknownVoter));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.INVALID_VOTER_KEY, epoch, OptionalInt.of(voter3));
        context.assertElectedLeader(epoch, voter3);

        // Local voter id paired with a random directory id.
        ReplicaKey wrongDirectory = ReplicaKey.of(localId, Uuid.randomUuid());
        context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(), epoch, voter3, wrongDirectory));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.INVALID_VOTER_KEY, epoch, OptionalInt.of(voter3));
        context.assertElectedLeader(epoch, voter3);

        // The context's own replica key is accepted.
        context.deliverRequest(
            context.beginEpochRequest(context.clusterId.toString(), epoch, voter3, context.localReplicaKey())
        );
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter3));
    }

    /**
     * Sends BeginQuorumEpoch requests to a leader with different cluster-id arguments.
     * The context's own cluster id and the overload without a cluster-id argument are
     * answered with {@code NONE}. An empty string and "invalid-uuid" are rejected with
     * {@code INCONSISTENT_CLUSTER_ID}.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testBeginQuorumEpochRequestClusterIdValidation(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // The context's own cluster id.
        context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(), epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // Overload that takes no cluster-id argument.
        context.deliverRequest(context.beginEpochRequest(epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // Cluster ids expected to be rejected.
        for (String rejectedClusterId : Arrays.asList("", "invalid-uuid")) {
            context.deliverRequest(context.beginEpochRequest(rejectedClusterId, epoch, localId));
            context.pollUntilResponse();
            context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
        }
    }

    /**
     * Sends EndQuorumEpoch requests to a leader with different cluster-id arguments.
     * The overload without a cluster-id argument and an explicit {@code null} are
     * answered with {@code NONE}. An empty string and "invalid-uuid" are rejected with
     * {@code INCONSISTENT_CLUSTER_ID}.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testEndQuorumEpochRequestClusterIdValidation(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Overload that takes no cluster-id argument.
        context.deliverRequest(context.endEpochRequest(epoch, localId, Collections.singletonList(otherNodeKey)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // Explicit null cluster id.
        context.deliverRequest(context.endEpochRequest(null, epoch, localId, Collections.singletonList(otherNodeKey)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // Cluster ids expected to be rejected.
        for (String rejectedClusterId : Arrays.asList("", "invalid-uuid")) {
            context.deliverRequest(
                context.endEpochRequest(rejectedClusterId, epoch, localId, Collections.singletonList(otherNodeKey))
            );
            context.pollUntilResponse();
            context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
        }
    }

    /**
     * A leader receives two vote requests from a replica whose id is not in the voter
     * set. The request for the previous epoch is answered with
     * {@code FENCED_LEADER_EPOCH}; the request for the current epoch is answered with
     * {@code NONE}. Neither vote is granted.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeaderAcceptVoteFromObserver(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // This id is outside the voter set built above.
        final ReplicaKey observerKey = replicaKey(localId + 2, z);

        context.deliverRequest(context.voteRequest(epoch - 1, observerKey, 0, 0L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId), false);

        context.deliverRequest(context.voteRequest(epoch, observerKey, 0, 0L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);
    }

    /**
     * Delivers three vote requests for epoch 6 to a follower in epoch 5. Each must be
     * answered with {@code INVALID_REQUEST} and must leave the elected leader of epoch 5
     * unchanged.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInvalidVoteRequest(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        final int leaderId = otherNodeKey.id();
        Set<Integer> voters = Utils.mkSet(localId, leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, leaderId)
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, leaderId);

        // Last argument is -5L.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeKey, 0, -5L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        context.assertElectedLeader(epoch, leaderId);

        // Third argument is -1.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeKey, -1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        context.assertElectedLeader(epoch, leaderId);

        // Third argument equals the requested epoch.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeKey, epoch + 1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * A leader receives a fetch whose last argument is 500. The first poll must send
     * nothing; after the mock clock advances by 500, the next poll must send a
     * {@code NONE} fetch response of size 0 bytes.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPurgatoryFetchTimeout(boolean z) throws Exception {
        final int maxWaitTimeMs = 500;
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // No response may be sent while the fetch is pending.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 1L, epoch, maxWaitTimeMs));
        context.client.poll();
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        // Once the wait has elapsed, an empty response is sent.
        context.time.sleep(maxWaitTimeMs);
        context.client.poll();
        Assertions.assertEquals(
            0,
            context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)).sizeInBytes()
        );
    }

    /**
     * A leader receives a fetch whose last argument is 500, and the first poll must send
     * nothing. Three records are then appended without advancing the clock; the next
     * poll must send a {@code NONE} fetch response containing exactly those records.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPurgatoryFetchSatisfiedByWrite(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // No response may be sent while the fetch is pending.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 500));
        context.client.poll();
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        // Append new records; the next poll is expected to answer the pending fetch.
        final String[] appendedRecords = {"a", "b", "c"};
        context.client.prepareAppend(epoch, Arrays.asList(appendedRecords));
        context.client.schedulePreparedAppend();
        context.client.poll();

        RaftClientTestContext.assertMatchingRecords(
            appendedRecords,
            context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId))
        );
    }

    /**
     * A leader receives a fetch whose last argument is 500, and the first poll must send
     * no fetch response. A BeginQuorumEpoch request for the next epoch then arrives from
     * a third voter. The test expects the local replica to accept that voter as leader
     * and to send a {@code NOT_LEADER_OR_FOLLOWER} fetch response of size 0 bytes.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPurgatoryFetchCompletedByFollowerTransition(boolean z) throws Exception {
        final int localId = randomReplicaId();
        final ReplicaKey voterKey2 = replicaKey(localId + 1, z);
        final int voter3 = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, voterKey2.id(), voter3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(4)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        final int epoch = context.currentEpoch();

        // Other message types may be queued, but no fetch response yet.
        context.deliverRequest(context.fetchRequest(epoch, voterKey2, 1L, epoch, 500));
        context.client.poll();
        Assertions.assertTrue(
            context.channel.drainSendQueue().stream().noneMatch(sent -> sent.data() instanceof FetchResponseData)
        );

        // The third voter announces itself as leader of the next epoch.
        final int nextEpoch = epoch + 1;
        context.deliverRequest(context.beginEpochRequest(nextEpoch, voter3));
        context.pollUntilResponse();
        context.assertElectedLeader(nextEpoch, voter3);
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, nextEpoch, OptionalInt.of(voter3));

        Assertions.assertEquals(
            0,
            context.assertSentFetchPartitionResponse(
                Errors.NOT_LEADER_OR_FOLLOWER, nextEpoch, OptionalInt.of(voter3)
            ).sizeInBytes()
        );
    }

    /**
     * A follower sends a fetch, then the mock clock is advanced by 50 seconds and the
     * local replica is expected to have voted for itself in the next epoch. A response
     * to the earlier fetch, carrying two records, is then delivered; the log end offset
     * must stay at 0 and the candidate state must be unchanged.
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchResponseIgnoredAfterBecomingCandidate(boolean z) throws Exception {
        final int epoch = 5;
        final int localId = randomReplicaId();
        final int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Keep the outbound fetch so a response to it can be delivered later.
        context.pollUntilRequest();
        RaftRequest.Outbound staleFetch = context.assertSentFetchRequest(epoch, 0L, 0);

        // NOTE(review): 50000 ms is presumably chosen to exceed the configured fetch
        // timeout -- confirm against the test context's settings.
        context.time.sleep(50000L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);

        // The late response names the old leader and the old epoch.
        context.deliverResponse(
            staleFetch.correlationId(),
            staleFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE)
        );
        context.client.poll();

        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Checks that a fetch response from the previous leader is not applied to the log
     * when it arrives after the local replica has started following a different leader
     * in the next epoch.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int firstLeaderId = localId + 1;
        int secondLeaderId = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, firstLeaderId, secondLeaderId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, firstLeaderId)
            .withKip853Rpc(z)
            .build();
        context.assertElectedLeader(epoch, firstLeaderId);

        // Capture the outstanding fetch sent while the first leader was still elected.
        context.pollUntilRequest();
        RaftRequest.Outbound pendingFetch = context.assertSentFetchRequest(epoch, 0L, 0);

        // Switch to the second leader through a BeginQuorumEpoch request for epoch + 1.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, secondLeaderId));
        context.client.poll();
        context.assertElectedLeader(epoch + 1, secondLeaderId);

        // Deliver the stale response: the log end offset must stay at 0 and the leader must not change.
        context.deliverResponse(
            pendingFetch.correlationId(),
            pendingFetch.destination(),
            context.fetchResponse(epoch, firstLeaderId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(0L, context.log.endOffset().offset());
        context.assertElectedLeader(epoch + 1, secondLeaderId);
    }

    /**
     * Checks that vote responses for requests sent as a candidate do not change the
     * elected leader when they arrive after the local replica has already accepted
     * another voter as leader of the same epoch.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testVoteResponseIgnoredAfterBecomingFollower(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(z)
            .build();
        context.assertUnknownLeader(epoch - 1);

        // Sleep twice the election timeout; the replica is then expected to be a candidate in `epoch`.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        // One vote request per remote voter is expected.
        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());

        // voter3 announces itself as leader of the same epoch before any vote response arrives.
        context.deliverRequest(context.beginEpochRequest(epoch, voter3));
        context.client.poll();
        context.assertElectedLeader(epoch, voter3);

        // Both late (rejecting) vote responses are delivered; the elected leader must stay the same.
        RaftRequest.Outbound firstVote = voteRequests.get(0);
        context.deliverResponse(
            firstVote.correlationId(),
            firstVote.destination(),
            context.voteResponse(false, OptionalInt.empty(), epoch));
        RaftRequest.Outbound secondVote = voteRequests.get(1);
        context.deliverResponse(
            secondVote.correlationId(),
            secondVote.destination(),
            context.voteResponse(false, OptionalInt.of(voter3), epoch));
        context.client.poll();
        context.assertElectedLeader(epoch, voter3);
    }

    /**
     * Checks that a voter following an elected leader sends its next fetch to a
     * different bootstrap server after the leader answers a fetch with
     * {@code BROKER_NOT_AVAILABLE}.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int leaderId = localId + 1;
        int otherNodeId = localId + 2;
        // Typed collections instead of raw Set/List so that the mapping below is type-checked.
        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
        List<InetSocketAddress> bootstrapServers = voters.stream()
            .map(RaftClientTestContext::mockAddress)
            .collect(Collectors.toList());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(bootstrapServers))
            .withKip853Rpc(z)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The first fetch is expected to go to the elected leader.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, firstFetch.destination().id());
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // Answer it with BROKER_NOT_AVAILABLE.
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1L, Errors.BROKER_NOT_AVAILABLE));

        // The next fetch must target some bootstrap node other than the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound retryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, retryFetch.destination().id());
        Assertions.assertTrue(context.bootstrapIds.contains(retryFetch.destination().id()));
        context.assertFetchRequestData(retryFetch, epoch, 0L, 0);
    }

    /**
     * Checks that a voter following an elected leader sends its next fetch to a
     * different bootstrap server after the request timeout elapses without a
     * response from the leader.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int leaderId = localId + 1;
        int otherNodeId = localId + 2;
        // Typed collections instead of raw Set/List so that the mapping below is type-checked.
        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
        List<InetSocketAddress> bootstrapServers = voters.stream()
            .map(RaftClientTestContext::mockAddress)
            .collect(Collectors.toList());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(bootstrapServers))
            .withKip853Rpc(z)
            .withElectedLeader(epoch, leaderId)
            .build();

        // The first fetch is expected to go to the elected leader.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, firstFetch.destination().id());
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // Let the request timeout pass without delivering any response.
        context.time.sleep(context.requestTimeoutMs());

        // The next fetch must target some bootstrap node other than the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound retryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, retryFetch.destination().id());
        Assertions.assertTrue(context.bootstrapIds.contains(retryFetch.destination().id()));
        context.assertFetchRequestData(retryFetch, epoch, 0L, 0);
    }

    /**
     * Checks that a replica outside the voter set, after its fetch to the leader is
     * answered with {@code BROKER_NOT_AVAILABLE}, fetches from another bootstrap
     * server and ends up with the same elected leader once that server replies with
     * {@code NOT_LEADER_OR_FOLLOWER} naming the leader.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int leaderId = localId + 1;
        int otherNodeId = localId + 2;
        // The local id is deliberately absent from the voter set.
        // Typed collections instead of raw Set/List so that the mapping below is type-checked.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        List<InetSocketAddress> bootstrapServers = voters.stream()
            .map(RaftClientTestContext::mockAddress)
            .collect(Collectors.toList());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(bootstrapServers))
            .withKip853Rpc(z)
            .build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        // The first fetch is expected to go to the discovered leader.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, firstFetch.destination().id());
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // Answer it with BROKER_NOT_AVAILABLE.
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1L, Errors.BROKER_NOT_AVAILABLE));

        // The next fetch must target some bootstrap node other than the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound retryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, retryFetch.destination().id());
        Assertions.assertTrue(context.bootstrapIds.contains(retryFetch.destination().id()));
        context.assertFetchRequestData(retryFetch, epoch, 0L, 0);

        // That node answers NOT_LEADER_OR_FOLLOWER while naming the leader; the leader must remain elected.
        context.deliverResponse(
            retryFetch.correlationId(),
            retryFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NOT_LEADER_OR_FOLLOWER));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Checks that a replica outside the voter set, after its fetch to the leader
     * times out, fetches from another bootstrap server and ends up with the same
     * elected leader once that server replies with {@code FENCED_LEADER_EPOCH}
     * naming the leader.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserverLeaderRediscoveryAfterRequestTimeout(boolean z) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int leaderId = localId + 1;
        int otherNodeId = localId + 2;
        // The local id is deliberately absent from the voter set.
        // Typed collections instead of raw Set/List so that the mapping below is type-checked.
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
        List<InetSocketAddress> bootstrapServers = voters.stream()
            .map(RaftClientTestContext::mockAddress)
            .collect(Collectors.toList());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withBootstrapServers(Optional.of(bootstrapServers))
            .withKip853Rpc(z)
            .build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        // The first fetch is expected to go to the discovered leader.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, firstFetch.destination().id());
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // Let the request timeout pass without delivering any response.
        context.time.sleep(context.requestTimeoutMs());

        // The next fetch must target some bootstrap node other than the leader.
        context.pollUntilRequest();
        RaftRequest.Outbound retryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, retryFetch.destination().id());
        Assertions.assertTrue(context.bootstrapIds.contains(retryFetch.destination().id()));
        context.assertFetchRequestData(retryFetch, epoch, 0L, 0);

        // That node answers FENCED_LEADER_EPOCH while naming the leader; the leader must remain elected.
        context.deliverResponse(
            retryFetch.correlationId(),
            retryFetch.destination(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Walks a leader through a graceful shutdown: the client keeps running while
     * shutting down, sends an EndQuorumEpoch request, still answers a vote request,
     * and stops running after a BeginQuorumEpoch request from the other voter.
     *
     * @param z value passed to both {@code replicaKey(...)} and {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails or the wait times out
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeaderGracefulShutdown(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherVoter = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, otherVoter.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Request shutdown with a 5000 timeout; the client must stay running and the future incomplete.
        CompletableFuture<?> shutdownFuture = context.client.shutdown(5000);
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // The next outbound request is expected to be an EndQuorumEpoch to the other voter.
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(1, otherVoter.id());

        // A vote request for the next epoch is still answered (vote granted) during shutdown.
        context.deliverRequest(context.voteRequest(epoch + 1, otherVoter, epoch, 1L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);

        // After the other voter begins epoch 2, keep polling until the client stops running.
        context.deliverRequest(context.beginEpochRequest(2, otherVoter.id()));
        TestUtils.waitForCondition(() -> {
            context.client.poll();
            return !context.client.isRunning();
        }, 5000L, "Client failed to shutdown before expiration of timeout");

        Assertions.assertFalse(context.client.isShuttingDown());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Checks the EndQuorumEpoch requests sent by a leader that shuts down after two
     * followers have fetched at different offsets: the expected replica list names
     * the follower that fetched at offset 3 before the one that fetched at offset 1.
     *
     * @param z value passed to both {@code replicaKey(...)} and {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testEndQuorumEpochSentBasedOnFetchOffset(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey closeFollower = replicaKey(localId + 2, z);
        ReplicaKey laggingFollower = replicaKey(localId + 1, z);
        Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // The lagging follower fetches at offset 1.
        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(1L, epoch);

        // Append two records on the leader.
        context.client.prepareAppend(epoch, Arrays.asList("foo", "bar"));
        context.client.schedulePreparedAppend();
        context.client.poll();

        // The close follower fetches at offset 3.
        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        // Start shutting down; the client must keep running while EndQuorumEpoch requests go out.
        context.client.shutdown(context.electionTimeoutMs() * 2);
        Assertions.assertTrue(context.client.isRunning());
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());

        Set<Integer> expectedDestinations = Utils.mkSet(closeFollower.id(), laggingFollower.id());
        List<ReplicaKey> expectedReplicaOrder = Arrays.asList(
            replicaKey(closeFollower.id(), false),
            replicaKey(laggingFollower.id(), false));
        context.collectEndQuorumRequests(epoch, expectedDestinations, Optional.of(expectedReplicaOrder));
    }

    /**
     * Checks the DescribeQuorum response of a replica whose leader is unknown: the
     * top-level error is {@code NONE} with an empty message, while the single
     * partition entry carries {@code NOT_LEADER_OR_FOLLOWER} and its message.
     *
     * @param z value passed to {@code replicaKey(...)} when creating the other voters
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testDescribeQuorumNonLeader(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey voter1 = replicaKey(localId + 1, z);
        ReplicaKey voter2 = replicaKey(localId + 2, z);
        Set<Integer> voters = Utils.mkSet(localId, voter1.id(), voter2.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(2)
            .build();

        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        DescribeQuorumResponseData response = context.collectDescribeQuorumResponse();

        // Top level: no error and an empty error message.
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
        Assertions.assertEquals("", response.errorMessage());

        // Exactly one topic, matching the metadata partition's topic.
        Assertions.assertEquals(1, response.topics().size());
        DescribeQuorumResponseData.TopicData topic = response.topics().get(0);
        Assertions.assertEquals(context.metadataPartition.topic(), topic.topicName());

        // Exactly one partition, carrying the NOT_LEADER_OR_FOLLOWER error.
        Assertions.assertEquals(1, topic.partitions().size());
        DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
        Assertions.assertEquals(context.metadataPartition.partition(), partition.partitionIndex());
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partition.errorCode()));
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partition.errorMessage());
    }

    /**
     * Checks the DescribeQuorum response of a leader configured only with static
     * voter ids: both voters are reported with {@code NO_DIRECTORY_ID}, and the
     * remote voter has -1 for its log end offset and timestamps.
     *
     * @param z value passed to {@code withKip853Rpc(...)}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testDescribeQuorumWithOnlyStaticVoters(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey localKey = replicaKey(localId, true);
        ReplicaKey followerKey = replicaKey(localId + 1, true);
        Set<Integer> staticVoters = Utils.mkSet(localId, followerKey.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, localKey.directoryId().get())
            .withStaticVoters(staticVoters)
            .withKip853Rpc(z)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();

        // Expected entry for the local leader: log end offset 1 and current mock time for both timestamps.
        DescribeQuorumResponseData.ReplicaState expectedLeader = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(localId)
            .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
            .setLogEndOffset(1L)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        // Expected entry for the remote voter, which has not fetched: -1 everywhere.
        DescribeQuorumResponseData.ReplicaState expectedFollower = new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(followerKey.id())
            .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
            .setLogEndOffset(-1L)
            .setLastFetchTimestamp(-1L)
            .setLastCaughtUpTimestamp(-1L);

        context.assertSentDescribeQuorumResponse(
            localId,
            epoch,
            -1L,
            Arrays.asList(expectedLeader, expectedFollower),
            Collections.emptyList());
    }

    /**
     * Follows the DescribeQuorum response of a leader with two followers as the
     * followers fetch and the leader appends records, and finally checks that the
     * replica answers {@code NOT_LEADER_OR_FOLLOWER} after a long idle period.
     *
     * @param z  value passed to {@code withKip853Rpc(...)}; also decides whether static
     *           voters get a random directory id or {@code NO_DIRECTORY_ID}
     * @param z2 when true the voter set comes from a bootstrap snapshot, otherwise
     *           from static voters
     * @throws Exception if any test-context helper fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, false"})
    public void testDescribeQuorumWithFollowers(boolean z, boolean z2) throws Exception {
        int localId = randomReplicaId();
        int followerId1 = localId + 1;
        int followerId2 = localId + 2;
        ReplicaKey localKey = replicaKey(localId, z2);
        Uuid localDirectoryId = localKey.directoryId().orElse(Uuid.randomUuid());
        ReplicaKey bootstrapFollower1 = replicaKey(followerId1, z2);
        ReplicaKey follower1 = ReplicaKey.of(
            followerId1,
            bootstrapFollower1.directoryId().orElse(z ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID));
        ReplicaKey bootstrapFollower2 = replicaKey(followerId2, z2);
        ReplicaKey follower2 = ReplicaKey.of(
            followerId2,
            bootstrapFollower2.directoryId().orElse(z ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID));

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
            .withKip853Rpc(z);
        if (z2) {
            builder.withBootstrapSnapshot(
                Optional.of(VoterSetTest.voterSet(Stream.of(localKey, bootstrapFollower1, bootstrapFollower2))));
        } else {
            builder.withStaticVoters(VoterSetTest.voterSet(Stream.of(localKey, follower1, follower2)));
        }
        RaftClientTestContext context = builder.build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Initial state: only the leader has a log end offset; followers report -1 everywhere.
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        List<DescribeQuorumResponseData.ReplicaState> expectedVoters = Arrays.asList(
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(localId)
                .setReplicaDirectoryId(z2 ? context.localReplicaKey().directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(z2 ? 3L : 1L)
                .setLastFetchTimestamp(context.time.milliseconds())
                .setLastCaughtUpTimestamp(context.time.milliseconds()),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(followerId1)
                .setReplicaDirectoryId(z2 ? follower1.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(-1L)
                .setLastFetchTimestamp(-1L)
                .setLastCaughtUpTimestamp(-1L),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(followerId2)
                .setReplicaDirectoryId(z2 ? follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(-1L)
                .setLastFetchTimestamp(-1L)
                .setLastCaughtUpTimestamp(-1L));
        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoters, Collections.emptyList());

        // Follower 1 fetches at the leader's current end offset.
        context.time.sleep(100L);
        long fetchOffset = z2 ? 3L : 1L;
        long follower1FetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        // Append records on the leader; the same list determines the next expected end offset.
        List<String> records = Arrays.asList("foo", "bar");
        long nextFetchOffset = fetchOffset + records.size();
        context.client.prepareAppend(epoch, records);
        context.client.schedulePreparedAppend();
        context.client.poll();

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLogEndOffset(nextFetchOffset)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        expectedVoters.get(1)
            .setLogEndOffset(fetchOffset)
            .setLastFetchTimestamp(follower1FetchTime)
            .setLastCaughtUpTimestamp(follower1FetchTime);
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, Collections.emptyList());

        // Follower 2 fetches at the new end offset.
        context.time.sleep(100L);
        long follower2FetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, follower2, nextFetchOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(nextFetchOffset, epoch);

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        expectedVoters.get(2)
            .setLogEndOffset(nextFetchOffset)
            .setLastFetchTimestamp(follower2FetchTime)
            .setLastCaughtUpTimestamp(follower2FetchTime);
        context.assertSentDescribeQuorumResponse(localId, epoch, nextFetchOffset, expectedVoters, Collections.emptyList());

        // Advance the mock clock by 75000 ms without any fetches (presumably past the
        // check-quorum timeout; confirm); DescribeQuorum must then be rejected.
        context.time.sleep(75000L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        context.assertSentDescribeQuorumResponse(
            Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0L, Collections.emptyList(), Collections.emptyList());
    }

    /**
     * Follows the DescribeQuorum response of a leader with one follower and one
     * non-voter fetcher: the fetcher shows up in the observer list, its state is
     * updated by later fetches, and it is expected to be gone from the list after a
     * long period in which only the follower keeps fetching.
     *
     * @param z  value passed to {@code withKip853Rpc(...)}; also used when creating
     *           the observer's replica key and the static follower's directory id
     * @param z2 when true the voter set comes from a bootstrap snapshot, otherwise
     *           from static voters
     * @throws Exception if any test-context helper fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, false"})
    public void testDescribeQuorumWithObserver(boolean z, boolean z2) throws Exception {
        int localId = randomReplicaId();
        int followerId = localId + 1;
        ReplicaKey localKey = replicaKey(localId, z2);
        Uuid localDirectoryId = localKey.directoryId().orElse(Uuid.randomUuid());
        ReplicaKey bootstrapFollower = replicaKey(followerId, z2);
        Uuid followerDirectoryId = bootstrapFollower.directoryId()
            .orElse(z ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID);
        ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
            .withKip853Rpc(z);
        if (z2) {
            builder.withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet(Stream.of(localKey, bootstrapFollower))));
        } else {
            builder.withStaticVoters(VoterSetTest.voterSet(Stream.of(localKey, follower)));
        }
        RaftClientTestContext context = builder.build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // The follower fetches at the leader's end offset.
        context.time.sleep(100L);
        long fetchOffset = z2 ? 3L : 1L;
        long followerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(1, follower, fetchOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        // A replica that is not in the voter set fetches from offset 0.
        ReplicaKey observer = replicaKey(localId + 2, z);
        Uuid observerDirectoryId = observer.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID);
        context.time.sleep(100L);
        long observerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        List<DescribeQuorumResponseData.ReplicaState> expectedVoters = Arrays.asList(
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(localId)
                .setReplicaDirectoryId(z2 ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(fetchOffset)
                .setLastFetchTimestamp(context.time.milliseconds())
                .setLastCaughtUpTimestamp(context.time.milliseconds()),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(follower.id())
                .setReplicaDirectoryId(z2 ? followerDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(fetchOffset)
                .setLastFetchTimestamp(followerFetchTime)
                .setLastCaughtUpTimestamp(followerFetchTime));
        List<DescribeQuorumResponseData.ReplicaState> expectedObservers = Collections.singletonList(
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(observer.id())
                .setReplicaDirectoryId(observerDirectoryId)
                .setLogEndOffset(0L)
                .setLastFetchTimestamp(observerFetchTime)
                .setLastCaughtUpTimestamp(-1L));
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, expectedObservers);

        // The observer fetches again, this time at the leader's end offset.
        context.time.sleep(100L);
        long observerCaughtUpTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, observer, fetchOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        expectedObservers.get(0)
            .setLogEndOffset(fetchOffset)
            .setLastFetchTimestamp(observerCaughtUpTime)
            .setLastCaughtUpTimestamp(observerCaughtUpTime);
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, expectedObservers);

        // Append records on the leader; only the leader's log end offset is expected to move.
        context.time.sleep(100L);
        List<String> records = Arrays.asList("foo", "bar");
        context.client.prepareAppend(epoch, records);
        context.client.schedulePreparedAppend();
        context.client.poll();
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLogEndOffset(fetchOffset + records.size())
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, expectedObservers);

        // Let 300000 ms pass (presumably the observer session timeout; confirm) in steps of
        // 75000 - 1 ms, with the follower fetching before every step and the observer never fetching.
        for (long remainingMs = 300000; remainingMs > 0; remainingMs -= 75000 - 1) {
            followerFetchTime = context.time.milliseconds();
            context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset, epoch, 0));
            context.pollUntilResponse();
            context.assertSentFetchPartitionResponse(fetchOffset, epoch);
            context.time.sleep(75000 - 1);
        }

        // The observer list is now expected to be empty.
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        expectedVoters.get(1).setLastFetchTimestamp(followerFetchTime);
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, Collections.emptyList());

        // A fetch from replica id -1 must not add an entry to the observer list.
        context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, Collections.emptyList());
    }

    /**
     * Checks the DescribeQuorum response after a follower first fetches at the
     * leader's end offset and then fetches at an earlier offset: the fetch response
     * and the DescribeQuorum response keep reporting the original offset as the high
     * watermark, while the follower's reported log end offset goes down.
     *
     * @param z  value passed to {@code withKip853Rpc(...)}; also decides whether the
     *           static follower gets a random directory id or {@code NO_DIRECTORY_ID}
     * @param z2 when true the voter set comes from a bootstrap snapshot, otherwise
     *           from static voters
     * @throws Exception if any test-context helper fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, false"})
    public void testDescribeQuorumNonMonotonicFollowerFetch(boolean z, boolean z2) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey localKey = replicaKey(localId, z2);
        Uuid localDirectoryId = localKey.directoryId().orElse(Uuid.randomUuid());
        int followerId = localId + 1;
        ReplicaKey bootstrapFollower = replicaKey(followerId, z2);
        ReplicaKey follower = ReplicaKey.of(
            followerId,
            bootstrapFollower.directoryId().orElse(z ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID));

        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
            .withKip853Rpc(z);
        if (z2) {
            builder.withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet(Stream.of(localKey, bootstrapFollower))));
        } else {
            builder.withStaticVoters(VoterSetTest.voterSet(Stream.of(localKey, follower)));
        }
        RaftClientTestContext context = builder.build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Append records on the leader; the list is reused below for the expected follower offset.
        context.time.sleep(100L);
        List<String> records = Arrays.asList("foo", "bar");
        context.client.prepareAppend(epoch, records);
        context.client.schedulePreparedAppend();
        context.client.poll();

        // The follower fetches at the leader's end offset.
        long fetchOffset = z2 ? 5L : 3L;
        long followerFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        List<DescribeQuorumResponseData.ReplicaState> expectedVoters = Arrays.asList(
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(localId)
                .setReplicaDirectoryId(z2 ? localKey.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(fetchOffset)
                .setLastFetchTimestamp(context.time.milliseconds())
                .setLastCaughtUpTimestamp(context.time.milliseconds()),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(follower.id())
                .setReplicaDirectoryId(z2 ? follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(fetchOffset)
                .setLastFetchTimestamp(followerFetchTime)
                .setLastCaughtUpTimestamp(followerFetchTime));
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, Collections.emptyList());

        // The follower now fetches at an earlier offset; the response still reports the old offset.
        context.time.sleep(100L);
        long regressedFetchTime = context.time.milliseconds();
        context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset - 1, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(fetchOffset, epoch);

        context.time.sleep(100L);
        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();
        expectedVoters.get(0)
            .setLastFetchTimestamp(context.time.milliseconds())
            .setLastCaughtUpTimestamp(context.time.milliseconds());
        // The last-caught-up timestamp of the follower is intentionally left at its earlier value.
        expectedVoters.get(1)
            .setLogEndOffset(fetchOffset - records.size())
            .setLastFetchTimestamp(regressedFetchTime);
        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, expectedVoters, Collections.emptyList());
    }

    /**
     * Checks that when both static voters (two ids) and a bootstrap snapshot (three
     * replica keys) are configured, the DescribeQuorum response lists the three
     * voters from the bootstrap snapshot.
     *
     * @param z value passed to {@code withKip853Rpc(...)}; also decides whether the
     *          expected replica states carry directory ids or {@code NO_DIRECTORY_ID}
     * @throws Exception if any test-context helper fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey localKey = replicaKey(localId, true);
        ReplicaKey follower = replicaKey(localId + 1, true);
        ReplicaKey follower2 = replicaKey(localId + 2, true);
        Set<Integer> staticVoters = Utils.mkSet(localId, follower.id());
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, localKey.directoryId().get())
            .withStaticVoters(staticVoters)
            .withKip853Rpc(z)
            .withBootstrapSnapshot(Optional.of(VoterSetTest.voterSet(Stream.of(localKey, follower, follower2))))
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        context.deliverRequest(context.describeQuorumRequest());
        context.pollUntilResponse();

        // All three bootstrap voters are expected; only the leader has a log end offset and timestamps.
        List<DescribeQuorumResponseData.ReplicaState> expectedVoters = Arrays.asList(
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(localId)
                .setReplicaDirectoryId(z ? localKey.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(3L)
                .setLastFetchTimestamp(context.time.milliseconds())
                .setLastCaughtUpTimestamp(context.time.milliseconds()),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(follower.id())
                .setReplicaDirectoryId(z ? follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(-1L)
                .setLastFetchTimestamp(-1L)
                .setLastCaughtUpTimestamp(-1L),
            new DescribeQuorumResponseData.ReplicaState()
                .setReplicaId(follower2.id())
                .setReplicaDirectoryId(z ? follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
                .setLogEndOffset(-1L)
                .setLastFetchTimestamp(-1L)
                .setLastCaughtUpTimestamp(-1L));
        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoters, Collections.emptyList());
    }

    /**
     * Starts a graceful shutdown on a leader of a two-voter quorum, checks that an
     * EndQuorumEpoch request is sent to the other voter, and verifies that once the shutdown
     * timeout elapses the client stops running and the shutdown future fails with a
     * {@link TimeoutException}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc) throws Exception {
        final int shutdownTimeoutMs = 5000;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(1)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // The client keeps running while it notifies the other voter.
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);

        // No response is delivered; let the full shutdown timeout pass.
        context.time.sleep(shutdownTimeoutMs);
        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isCompletedExceptionally());
        TestUtils.assertFutureThrows(shutdownFuture, TimeoutException.class);
    }

    /**
     * Verifies that a follower with an elected leader completes a graceful shutdown on the
     * poll that follows the shutdown request, and that the future completes with {@code null}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testFollowerGracefulShutdown(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        final int shutdownTimeoutMs = 5000;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);
        context.client.poll();

        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        // Requesting shutdown alone does not stop the client; the next poll does.
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Verifies that a replica which is not part of the voter set completes a graceful shutdown
     * on the poll that follows the shutdown request, and that the future completes with
     * {@code null}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testObserverGracefulShutdown(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        final int shutdownTimeoutMs = 5000;
        int localId = randomReplicaId();
        // The local id is deliberately left out of the voter set.
        Set<Integer> voters = Utils.mkSet(localId + 1, localId + 2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.client.poll();
        context.assertUnknownLeader(epoch);

        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Verifies that the only voter of a single-member quorum, already elected leader in epoch 1,
     * has nothing queued to send and stops running on the poll after shutdown is requested.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws IOException declared by the original test signature
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testGracefulShutdownSingleMemberQuorum(boolean withKip853Rpc) throws IOException {
        final int shutdownTimeoutMs = 5000;
        int localId = randomReplicaId();
        Set<Integer> voters = Collections.singleton(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.assertElectedLeader(1, localId);

        context.client.poll();
        // With no other voters there should be no outbound requests.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());

        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
    }

    /**
     * Verifies that a follower (a voter) appends the records returned in a fetch response and
     * that, after the append, the first unflushed offset equals the log end offset.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testFollowerReplication(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);

        // Answer the fetch with a two-record batch starting at offset 0.
        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(epoch, otherNodeId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE)
        );
        context.client.poll();

        Assertions.assertEquals(2L, context.log.endOffset().offset());
        Assertions.assertEquals(2L, context.log.firstUnflushedOffset());
    }

    /**
     * Verifies that a replica outside the voter set appends fetched records, and that the first
     * unflushed offset after the append depends on the always-flush setting: 2 when enabled,
     * 0 otherwise.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @param alwaysFlush   forwarded to {@code withAlwaysFlush} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, true", "false, false"})
    public void testObserverReplication(boolean withKip853Rpc, boolean alwaysFlush) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        // Only the other node is a voter; the local replica is not in the set.
        Set<Integer> voters = Utils.mkSet(otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(withKip853Rpc)
            .withAlwaysFlush(alwaysFlush)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);

        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.fetchResponse(epoch, otherNodeId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE)
        );
        context.client.poll();

        Assertions.assertEquals(2L, context.log.endOffset().offset());
        long expectedFirstUnflushedOffset = alwaysFlush ? 2L : 0L;
        Assertions.assertEquals(expectedFirstUnflushedOffset, context.log.firstUnflushedOffset());
    }

    /**
     * Drives a follower through three fetch round trips: an empty response, a response with two
     * records, and another empty response that carries a higher high watermark. After each one
     * the log end offset and the client's high watermark are checked.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testEmptyRecordSetInFetchResponse(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Round 1: empty record set, high watermark 0.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest(epoch, 0L, 0);
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 0L, Errors.NONE)
        );
        context.client.poll();
        Assertions.assertEquals(0L, context.log.endOffset().offset());
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 2: two records, high watermark still 0.
        context.pollUntilRequest();
        MemoryRecords records = context.buildBatch(0L, 5, Arrays.asList("a", "b"));
        RaftRequest.Outbound secondFetch = context.assertSentFetchRequest(epoch, 0L, 0);
        context.deliverResponse(
            secondFetch.correlationId(),
            secondFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE)
        );
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset());
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 3: empty record set again, but the response carries high watermark 2.
        context.pollUntilRequest();
        RaftRequest.Outbound thirdFetch = context.assertSentFetchRequest(epoch, 2L, 5);
        context.deliverResponse(
            thirdFetch.correlationId(),
            thirdFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 2L, Errors.NONE)
        );
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset());
        Assertions.assertEquals(OptionalLong.of(2L), context.client.highWatermark());
    }

    /**
     * After the local replica wins the election for epoch 5 and sends BeginQuorumEpoch to the
     * other voter, a fetch from that voter is answered successfully. The test then lets the
     * request timeout elapse and checks that nothing further is queued for sending.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc}; also passed to {@code replicaKey}
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testFetchShouldBeTreatedAsLeaderAcknowledgement(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // NOTE(review): presumably pins the randomized election jitter to 0 - confirm against MockableRandom
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        context.pollUntilRequest();
        context.assertSentBeginQuorumEpochRequest(epoch, Utils.mkSet(otherNodeKey.id()));

        // The other voter fetches in the new epoch before any BeginQuorumEpoch response arrives.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, 0, 500));
        context.client.poll();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // After the request timeout, no request should be queued for sending.
        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());
    }

    /**
     * Verifies appends on a single-member quorum: the sole voter becomes leader in epoch 1,
     * the high watermark moves from 1 to 4 after three records are appended, and a remote
     * replica fetching from offset 0 receives exactly two batches - a control batch holding the
     * leader change message, followed by the three appended records.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc}; also passed to {@code replicaKey}
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLeaderAppendSingleMemberQuorum(boolean withKip853Rpc) throws Exception {
        final int epoch = 1;
        int localId = randomReplicaId();
        Set<Integer> voters = Collections.singleton(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(withKip853Rpc)
            .build();
        // Captured before polling; later compared with the leader change record's timestamp.
        long now = context.time.milliseconds();

        context.pollUntil(() -> context.log.endOffset().offset() == 1);
        context.assertElectedLeader(epoch, localId);
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        String[] appendRecords = {"a", "b", "c"};

        // An extra poll must not move the high watermark.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        context.client.prepareAppend(context.currentEpoch(), Arrays.asList(appendRecords));
        context.client.schedulePreparedAppend();
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(4L), context.client.highWatermark());

        // Fetch as a remote replica until the leader returns an empty response.
        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
        List<MutableRecordBatch> batches = new ArrayList<>(2);
        boolean appended;
        do {
            long fetchOffset = 0L;
            int lastFetchedEpoch = 0;
            if (!batches.isEmpty()) {
                // Continue right after the last batch received so far.
                MutableRecordBatch lastBatch = batches.get(batches.size() - 1);
                fetchOffset = lastBatch.lastOffset() + 1;
                lastFetchedEpoch = lastBatch.partitionLeaderEpoch();
            }

            context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, fetchOffset, lastFetchedEpoch, 0));
            context.pollUntilResponse();

            List<MutableRecordBatch> fetchedBatches = Utils.toList(
                context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)).batchIterator()
            );
            batches.addAll(fetchedBatches);
            appended = !fetchedBatches.isEmpty();
        } while (appended);

        Assertions.assertEquals(2, batches.size());

        // First batch: control batch with a single leader change record.
        MutableRecordBatch leaderChangeBatch = batches.get(0);
        Assertions.assertTrue(leaderChangeBatch.isControlBatch());
        List<Record> leaderChangeRecords = Utils.toList(leaderChangeBatch.iterator());
        Assertions.assertEquals(1, leaderChangeRecords.size());

        Record leaderChangeRecord = leaderChangeRecords.get(0);
        Assertions.assertEquals(now, leaderChangeRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Collections.singletonList(localId),
            Collections.singletonList(localId),
            leaderChangeRecord.key(),
            leaderChangeRecord.value()
        );

        // Second batch: the three appended records, in order.
        MutableRecordBatch dataBatch = batches.get(1);
        Assertions.assertEquals(epoch, dataBatch.partitionLeaderEpoch());
        List<Record> dataRecords = Utils.toList(dataBatch.iterator());
        Assertions.assertEquals(3, dataRecords.size());
        for (int index = 0; index < appendRecords.length; index++) {
            Assertions.assertEquals(appendRecords[index], Utils.utf8(dataRecords.get(index).value()));
        }
    }

    /**
     * Starts a follower with three records in its log, answers its fetch with a diverging
     * response, and verifies that the log is truncated to offset 2 and that the next fetch is
     * sent from offset 2.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testFollowerLogReconciliation(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        final int lastEpoch = 3;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Arrays.asList("foo", "bar"))
            .appendToLog(lastEpoch, Collections.singletonList("baz"))
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.assertElectedLeader(epoch, otherNodeId);
        Assertions.assertEquals(3L, context.log.endOffset().offset());

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 3L, lastEpoch);

        // The leader reports divergence at offset 2 in epoch 3.
        context.deliverResponse(
            fetchRequest.correlationId(),
            fetchRequest.destination(),
            context.divergingFetchResponse(epoch, otherNodeId, 2L, lastEpoch, 1L)
        );

        // Poll once to handle the response and truncate the log.
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset());
        Assertions.assertEquals(2L, context.log.firstUnflushedOffset());

        // Poll again so the follower re-fetches from the truncated end offset.
        context.client.poll();
        context.assertSentFetchRequest(epoch, 2L, lastEpoch);
    }

    /**
     * Checks the metrics of a single-member quorum leader: that the expected metric names are
     * registered, their values after election, their values after appending three records, and
     * that exactly one metric remains in the registry after the client is closed.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testMetrics(boolean withKip853Rpc) throws Exception {
        final int epoch = 1;
        int localId = randomReplicaId();
        Set<Integer> voters = Collections.singleton(localId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.pollUntil(() -> context.log.endOffset().offset() == 1);

        // Every one of these metrics must be registered.
        List<String> expectedMetricNames = Arrays.asList(
            "current-state",
            "current-leader",
            "current-vote",
            "current-epoch",
            "high-watermark",
            "log-end-offset",
            "log-end-epoch",
            "number-unknown-voter-connections",
            "poll-idle-ratio-avg",
            "commit-latency-avg",
            "commit-latency-max",
            "election-latency-avg",
            "election-latency-max",
            "fetch-records-rate",
            "append-records-rate"
        );
        for (String metricName : expectedMetricNames) {
            Assertions.assertNotNull(getMetric(context.metrics, metricName));
        }

        // State right after the local replica elected itself.
        Assertions.assertEquals("leader", getMetric(context.metrics, "current-state").metricValue());
        Assertions.assertEquals(Double.valueOf(localId), getMetric(context.metrics, "current-leader").metricValue());
        Assertions.assertEquals(Double.valueOf(localId), getMetric(context.metrics, "current-vote").metricValue());
        Assertions.assertEquals(Double.valueOf(epoch), getMetric(context.metrics, "current-epoch").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(context.metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(context.metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(epoch), getMetric(context.metrics, "log-end-epoch").metricValue());

        // Append three records and re-check the offset-related metrics.
        context.client.prepareAppend(epoch, Arrays.asList("a", "b", "c"));
        context.client.schedulePreparedAppend();
        context.client.poll();

        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(context.metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(context.metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(epoch), getMetric(context.metrics, "log-end-epoch").metricValue());

        context.client.close();

        // NOTE(review): the single remaining entry is presumably the registry's own built-in metric - confirm
        Assertions.assertEquals(1, context.metrics.metrics().size());
    }

    /**
     * Verifies that polling throws {@link ClusterAuthorizationException} after a follower
     * receives a Fetch response whose top-level error is {@code CLUSTER_AUTHORIZATION_FAILED}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(withKip853Rpc)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);

        FetchResponseData unauthorized = new FetchResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(fetchRequest.correlationId(), fetchRequest.destination(), unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Verifies that polling throws {@link ClusterAuthorizationException} after a newly elected
     * leader receives a BeginQuorumEpoch response carrying {@code CLUSTER_AUTHORIZATION_FAILED}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            // NOTE(review): presumably pins the randomized election jitter to 0 - confirm against MockableRandom
            .updateRandom(random -> random.mockNextInt(10000, 0))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        context.pollUntilRequest();
        List<RaftRequest.Outbound> beginEpochRequests = context.collectBeginEpochRequests(epoch);
        // Exactly one request is expected, addressed to the other voter.
        Assertions.assertEquals(1, beginEpochRequests.size());
        RaftRequest.Outbound beginEpochRequest = beginEpochRequests.get(0);
        Assertions.assertEquals(otherNodeId, beginEpochRequest.destination().id());

        BeginQuorumEpochResponseData unauthorized = new BeginQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(beginEpochRequest.correlationId(), beginEpochRequest.destination(), unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Verifies that polling throws {@link ClusterAuthorizationException} after a candidate
     * receives a Vote response carrying {@code CLUSTER_AUTHORIZATION_FAILED}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        // Sleep for twice the election timeout so the local replica starts an election.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1);
        VoteResponseData unauthorized = new VoteResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(voteRequest.correlationId(), voteRequest.destination(), unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Verifies that polling throws {@link ClusterAuthorizationException} after a leader that is
     * shutting down receives an EndQuorumEpoch response carrying
     * {@code CLUSTER_AUTHORIZATION_FAILED}.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testClusterAuthorizationFailedInEndQuorumEpoch(boolean withKip853Rpc) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(1)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.becomeLeader();
        int epoch = context.currentEpoch();

        // Requesting shutdown as leader triggers the EndQuorumEpoch request asserted below.
        context.client.shutdown(5000);
        context.pollUntilRequest();

        RaftRequest.Outbound endEpochRequest = context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
        EndQuorumEpochResponseData unauthorized = new EndQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(endEpochRequest.correlationId(), endEpochRequest.destination(), unauthorized);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * On a leader whose log holds a single entry after election, verifies that the listener
     * reports no claimed epoch and no commit until the other voter fetches at offset 1. After
     * that fetch the listener sees commit offset 0 and, one poll later, claims the epoch with
     * start offset 0.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc}; also passed to {@code replicaKey}
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog(boolean withKip853Rpc) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withKip853Rpc(withKip853Rpc)
            .build();
        context.becomeLeader();
        context.client.poll();
        int epoch = context.currentEpoch();

        // Nothing is committed yet, so the listener has not been told anything.
        Assertions.assertEquals(1L, context.log.endOffset().offset());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch at offset 0 changes nothing for the listener.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, 0, 0));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch at offset 1 lets the listener observe the commit of offset 0.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 0));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(0L), context.listener.lastCommitOffset());

        // One more poll is needed before the epoch claim is visible.
        context.client.poll();
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(0L, context.listener.claimedEpochStartOffset(epoch));
    }

    /**
     * Starts a leader on top of a log pre-populated with three batches and verifies that the
     * listener claims the epoch only after the other voter has fetched up to the log end
     * offset. At that point the listener must have received the three data batches plus one
     * more batch, and the claimed epoch start offset is 9.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc}; also passed to {@code replicaKey}
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffset(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        List<String> batch1 = Arrays.asList("1", "2", "3");
        List<String> batch2 = Arrays.asList("4", "5", "6");
        List<String> batch3 = Arrays.asList("7", "8", "9");
        List<List<String>> expectedBatches = Arrays.asList(batch1, batch2, batch3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, batch1)
            .appendToLog(1, batch2)
            .appendToLog(2, batch3)
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.becomeLeader();
        context.client.poll();

        // Nine pre-existing records plus one more entry written on becoming leader.
        Assertions.assertEquals(10L, context.log.endOffset().offset());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch at offset 3 is behind the log end offset; the listener still sees nothing.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 3L, 1, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch at the log end offset lets everything be committed and delivered.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 500));
        context.pollUntil(() -> {
            int seen = 0;
            for (Batch<String> committed : context.listener.committedBatches()) {
                // Only the first three batches are data batches with known contents.
                if (seen < expectedBatches.size()) {
                    Assertions.assertEquals(expectedBatches.get(seen), committed.records());
                }
                seen++;
            }
            // Three data batches plus one additional batch must have been committed.
            Assertions.assertEquals(4, seen);
            return context.listener.currentClaimedEpoch().isPresent();
        });

        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.of(9L), context.listener.lastCommitOffset());
        Assertions.assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
    }

    /**
     * Verifies that a listener registered after the leader has already committed its log
     * catches up: it ends with the same last commit offset, claimed epoch and claimed epoch
     * start offset as the listener that was registered from the beginning.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc}; also passed to {@code replicaKey}
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLateRegisteredListenerCatchesUp(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, Arrays.asList("1", "2", "3"))
            .appendToLog(1, Arrays.asList("4", "5", "6"))
            .appendToLog(2, Arrays.asList("7", "8", "9"))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.becomeLeader();
        context.client.poll();
        Assertions.assertEquals(10L, context.log.endOffset().offset());

        // The other voter fetches at the log end offset, which lets the first listener catch up.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 0));
        context.pollUntil(() -> OptionalInt.of(epoch).equals(context.listener.currentClaimedEpoch()));
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        Assertions.assertEquals(OptionalLong.of(9L), context.listener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));

        // Register a second listener late and wait for it to claim the epoch.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.pollUntil(() -> OptionalInt.of(epoch).equals(secondListener.currentClaimedEpoch()));

        Assertions.assertEquals(OptionalLong.of(9L), secondListener.lastCommitOffset());
        // The first listener must be unaffected by the late registration...
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        // ...and the late listener must itself report the claimed epoch.
        Assertions.assertEquals(OptionalInt.of(epoch), secondListener.currentClaimedEpoch());
        Assertions.assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
    }

    /**
     * Registers a second listener, lets it catch up to commit offset 9, unregisters it, and then
     * appends one more record. The default listener must advance to commit offset 10 while the
     * unregistered listener stays at 9.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testReregistrationChangesListenerContext(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, Arrays.asList("1", "2", "3"))
            .appendToLog(1, Arrays.asList("4", "5", "6"))
            .appendToLog(2, Arrays.asList("7", "8", "9"))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(withKip853Rpc)
            .build();

        context.becomeLeader();
        context.client.poll();
        Assertions.assertEquals(10L, context.log.endOffset().offset());

        // Let the default listener catch up.
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
        context.pollUntil(() -> OptionalLong.of(9L).equals(context.listener.lastCommitOffset()));

        // Register a second listener, let it catch up, then unregister it.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.pollUntil(() -> OptionalLong.of(9L).equals(secondListener.lastCommitOffset()));
        context.client.unregister(secondListener);

        // Append one more record and commit it.
        Assertions.assertEquals(10L, context.client.prepareAppend(epoch, Collections.singletonList("a")));
        context.client.schedulePreparedAppend();
        context.client.poll();
        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
        context.pollUntil(() -> OptionalLong.of(10L).equals(context.listener.lastCommitOffset()));

        // The unregistered listener must not have seen the new commit.
        Assertions.assertEquals(OptionalLong.of(9L), secondListener.lastCommitOffset());
    }

    /**
     * Verifies on a follower that committed batches reach the listener only once a fetch
     * response advances the high watermark past them: the first response (high watermark 0)
     * delivers nothing, the second (high watermark 3) delivers the first batch. The follower's
     * listener never claims an epoch.
     *
     * @param withKip853Rpc forwarded to {@code withKip853Rpc} on the context builder
     * @throws Exception if building or driving the test context fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances(boolean withKip853Rpc) throws Exception {
        final int epoch = 5;
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(withKip853Rpc)
            .build();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());

        // First fetch: the response carries three records and high watermark 0.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destination().id()));
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        List<String> firstRecords = Arrays.asList("a", "b", "c");
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, context.buildBatch(0L, 3, firstRecords), 0L, Errors.NONE)
        );
        context.client.poll();

        // Nothing is committed yet, so the listener has received no batches.
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());
        Assertions.assertEquals(0, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());

        // Second fetch: three more records and high watermark 3.
        context.pollUntilRequest();
        RaftRequest.Outbound secondFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(secondFetch.destination().id()));
        context.assertFetchRequestData(secondFetch, epoch, 3L, 3);

        context.deliverResponse(
            secondFetch.correlationId(),
            secondFetch.destination(),
            context.fetchResponse(epoch, otherNodeId, context.buildBatch(3L, 3, Arrays.asList("d", "e", "f")), 3L, Errors.NONE)
        );
        context.client.poll();

        // Only the first batch is below the new high watermark.
        Assertions.assertEquals(OptionalLong.of(3L), context.client.highWatermark());
        Assertions.assertEquals(1, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalLong.of(2L), context.listener.lastCommitOffset());
        Assertions.assertEquals(firstRecords, context.listener.lastCommit().records());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
    }

    /**
     * Checks that a listener registered after the local node has voted for another candidate
     * still catches up to the previously established high watermark.
     *
     * @param z value forwarded to {@code withKip853Rpc} and to {@code replicaKey}
     * @throws Exception if polling or delivering requests through the test context fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleCommitCallbackFiresInVotedState(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(z)
            .build();

        // Become leader, then take a fetch at offset 10 from the other voter so the
        // high watermark is established.
        context.becomeLeader();
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // A vote request for the next epoch makes the local node vote for the other node.
        int candidateEpoch = epoch + 1;
        context.deliverRequest(context.voteRequest(candidateEpoch, otherNodeKey, epoch, 10L));
        context.pollUntilResponse();
        context.assertVotedCandidate(candidateEpoch, otherNodeKey.id());
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // Register a second listener; the vote must be unchanged after the next poll.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, otherNodeKey.id());

        // NOTE(review): the last commit offset is 9 while the high watermark is 10; presumably
        // the high watermark is exclusive and offset 9 is a record written on becoming leader.
        OptionalLong expectedLastCommit = OptionalLong.of(9L);
        context.pollUntil(() -> secondListener.lastCommitOffset().equals(expectedLastCommit));

        Assertions.assertEquals(OptionalLong.of(9L), secondListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
    }

    /**
     * Checks that a listener registered after the local node has voted for itself in a new
     * election still catches up to the previously established high watermark.
     *
     * @param z value forwarded to {@code withKip853Rpc} and to {@code replicaKey}
     * @throws Exception if polling or delivering requests through the test context fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleCommitCallbackFiresInCandidateState(boolean z) throws Exception {
        int localId = randomReplicaId();
        ReplicaKey otherNodeKey = replicaKey(localId + 1, z);
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .withKip853Rpc(z)
            .build();

        // Become leader, then take a fetch at offset 10 from the other voter so the
        // high watermark is established.
        context.becomeLeader();
        Assertions.assertEquals(10L, context.log.endOffset().offset());
        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 0));
        context.pollUntilResponse();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // A vote request for the next epoch (last offset 9) leaves the local node with no
        // known leader in that epoch; the high watermark stays the same.
        int bumpedEpoch = epoch + 1;
        context.deliverRequest(context.voteRequest(bumpedEpoch, otherNodeKey, epoch, 9L));
        context.pollUntilResponse();
        context.assertUnknownLeader(bumpedEpoch);
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // Advance the clock by twice the election timeout; the local node then votes for itself.
        int candidateEpoch = epoch + 2;
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        // Register a second listener; the vote must be unchanged after the next poll.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        OptionalLong expectedLastCommit = OptionalLong.of(9L);
        context.pollUntil(() -> secondListener.lastCommitOffset().equals(expectedLastCommit));

        Assertions.assertEquals(OptionalLong.of(9L), secondListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
    }

    /**
     * Checks that a listener registered while the leader is unknown is told the current epoch
     * with no leader, and is then told the leader once a begin-epoch request arrives.
     *
     * @param z value forwarded to {@code withKip853Rpc} when building the test context
     * @throws Exception if polling or delivering requests through the test context fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleLeaderChangeFiresAfterUnattachedRegistration(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .withKip853Rpc(z)
            .build();

        // A freshly registered listener sees the current epoch with an empty leader.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.client.poll();
        LeaderAndEpoch leaderUnknown = new LeaderAndEpoch(OptionalInt.empty(), epoch);
        Assertions.assertEquals(leaderUnknown, secondListener.currentLeaderAndEpoch());

        // After the other node's begin-epoch request, the listener sees it as leader.
        context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
        context.pollUntilResponse();
        LeaderAndEpoch leaderKnown = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
        Assertions.assertEquals(leaderKnown, secondListener.currentLeaderAndEpoch());
    }

    /**
     * Checks that a listener registered while another node is the elected leader is told that
     * leader and epoch on the next poll.
     *
     * @param z value forwarded to {@code withKip853Rpc} when building the test context
     * @throws Exception if polling through the test context fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleLeaderChangeFiresAfterFollowerRegistration(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .withKip853Rpc(z)
            .build();

        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.client.poll();

        LeaderAndEpoch expected = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
        Assertions.assertEquals(expected, secondListener.currentLeaderAndEpoch());
    }

    /**
     * Checks that while the quorum state is resigned, neither the context's own listener nor a
     * newly registered one is told of a leader: both still report {@code LeaderAndEpoch.UNKNOWN}.
     *
     * @param z value forwarded to {@code withKip853Rpc} when building the test context
     * @throws Exception if polling through the test context fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHandleLeaderChangeFiresAfterResignRegistration(boolean z) throws Exception {
        int localId = randomReplicaId();
        int otherNodeId = localId + 1;
        int epoch = 7;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // The context starts with the local node recorded as the elected leader.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, localId)
            .withKip853Rpc(z)
            .build();

        context.client.poll();
        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(LeaderAndEpoch.UNKNOWN, context.listener.currentLeaderAndEpoch());

        // A listener registered in this state gets no leader-change notification either.
        RaftClientTestContext.MockListener secondListener =
            new RaftClientTestContext.MockListener(OptionalInt.of(localId));
        context.client.register(secondListener);
        context.client.poll();

        Assertions.assertTrue(context.client.quorum().isResigned());
        Assertions.assertEquals(LeaderAndEpoch.UNKNOWN, secondListener.currentLeaderAndEpoch());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    /**
     * Checks fetching for a client built without a local id: the first fetch goes to a bootstrap
     * server and learns the leader from the response, and the second goes to that leader and
     * appends the returned records to the local log.
     *
     * @param z value forwarded to {@code withKip853Rpc} when building the test context
     * @throws Exception if polling or delivering responses through the test context fails
     */
    public void testObserverFetchWithNoLocalId(boolean z) throws Exception {
        int leaderId = randomReplicaId();
        int leaderEpoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, leaderId + 1);
        List<InetSocketAddress> bootstrapServers = voters.stream()
            .map(RaftClientTestContext::mockAddress)
            .collect(Collectors.toList());

        RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters)
            .withBootstrapServers(Optional.of(bootstrapServers))
            .withKip853Rpc(z)
            .build();

        // First fetch: sent to a bootstrap server at epoch 0.
        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(context.bootstrapIds.contains(firstFetch.destination().id()));
        context.assertFetchRequestData(firstFetch, 0, 0L, 0);

        // The FENCED_LEADER_EPOCH response names the leader and epoch, which the client adopts.
        context.deliverResponse(
            firstFetch.correlationId(),
            firstFetch.destination(),
            context.fetchResponse(leaderEpoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
        );
        context.client.poll();
        context.assertElectedLeader(leaderEpoch, leaderId);

        // Second fetch: sent to the discovered leader at the discovered epoch.
        context.pollUntilRequest();
        RaftRequest.Outbound secondFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, secondFetch.destination().id());
        context.assertFetchRequestData(secondFetch, leaderEpoch, 0L, 0);

        // The batch is built with epoch 3, which the log then reports as its last fetched epoch.
        List<String> records = Arrays.asList("a", "b", "c");
        MemoryRecords batch = context.buildBatch(0L, 3, records);
        context.deliverResponse(
            secondFetch.correlationId(),
            secondFetch.destination(),
            context.fetchResponse(leaderEpoch, leaderId, batch, 0L, Errors.NONE)
        );
        context.client.poll();

        Assertions.assertEquals(3L, context.log.endOffset().offset());
        Assertions.assertEquals(3, context.log.lastFetchedEpoch());
    }

    /**
     * Looks up a metric by name within the {@code "raft-metrics"} group.
     *
     * @param metrics registry to search
     * @param str     metric name
     * @return the value stored in the registry's metric map for that name, cast to {@code KafkaMetric}
     */
    private static KafkaMetric getMetric(Metrics metrics, String str) {
        final String group = "raft-metrics";
        return (KafkaMetric) metrics.metrics().get(metrics.metricName(str, group));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a {@code ReplicaKey} for the given replica id.
     *
     * @param i replica id
     * @param z when {@code true}, a random directory id is generated; otherwise
     *          {@code ReplicaKey.NO_DIRECTORY_ID} is used
     * @return the key created by {@code ReplicaKey.of}
     */
    public static ReplicaKey replicaKey(int i, boolean z) {
        if (z) {
            return ReplicaKey.of(i, Uuid.randomUuid());
        }
        return ReplicaKey.of(i, ReplicaKey.NO_DIRECTORY_ID);
    }

    /**
     * Picks a pseudo-random replica id for a test.
     *
     * @return a value in the range {@code [0, 1025)}
     */
    public static int randomReplicaId() {
        final int exclusiveUpperBound = 1025;
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return random.nextInt(exclusiveUpperBound);
    }
}
