package org.apache.kafka.coordinator.group.runtime;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.class */
public class CoordinatorRuntimeTest {
    // Topic-partition shared by the tests in this class: partition 0 of "__consumer_offsets".
    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
    // 5 ms write timeout handed to CoordinatorRuntime.Builder#withDefaultWriteTimeOut by the tests.
    private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
    // Latest version reported by ApiKeys.TXN_OFFSET_COMMIT, resolved once at class initialization.
    private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$DirectEventProcessor.class */
    /**
     * Event processor that never queues anything: every enqueued event is run
     * immediately on the calling thread. If running the event throws, the
     * throwable is handed to {@code CoordinatorEvent#complete}.
     */
    private static class DirectEventProcessor implements CoordinatorEventProcessor {
        private DirectEventProcessor() {
        }

        /** Runs the event right away; queue position is irrelevant here. */
        public void enqueueLast(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            runNow(coordinatorEvent);
        }

        /** Runs the event right away; queue position is irrelevant here. */
        public void enqueueFirst(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            runNow(coordinatorEvent);
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }

        /** Executes the event inline and reports any throwable through the event itself. */
        private static void runNow(CoordinatorEvent event) {
            try {
                event.run();
            } catch (Throwable failure) {
                event.complete(failure);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$ManualEventProcessor.class */
    /**
     * Event processor that only stores events; nothing runs until the test
     * explicitly calls {@link #poll()}.
     */
    private static class ManualEventProcessor implements CoordinatorEventProcessor {
        // Pending events, head first.
        private final Deque<CoordinatorEvent> queue = new LinkedList<>();

        private ManualEventProcessor() {
        }

        /** Appends the event at the tail of the queue. */
        public void enqueueLast(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            queue.addLast(coordinatorEvent);
        }

        /** Inserts the event at the head of the queue. */
        public void enqueueFirst(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            queue.addFirst(coordinatorEvent);
        }

        /**
         * Removes the head event, if any, and runs it. A throwable raised by
         * the event is passed to its {@code complete} method.
         *
         * @return {@code false} when the queue was empty, {@code true} otherwise
         */
        public boolean poll() {
            CoordinatorEvent next = queue.poll();
            if (next == null) {
                return false;
            }
            try {
                next.run();
            } catch (Throwable failure) {
                next.complete(failure);
            }
            return true;
        }

        /** @return the number of events still waiting to be polled */
        public int size() {
            return queue.size();
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorLoader.class */
    /**
     * Loader stub that completes immediately. On {@code load} it replays a
     * fixed list of last-written offsets and then a fixed list of
     * last-committed offsets into the playback, and returns the configured summary.
     */
    private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
        private final CoordinatorLoader.LoadSummary summary;
        private final List<Long> lastWrittenOffsets;
        private final List<Long> lastCommittedOffsets;

        /**
         * @param loadSummary value the future returned by {@code load} is completed with
         * @param list        offsets passed, in order, to {@code updateLastWrittenOffset}
         * @param list2       offsets passed, in order, to {@code updateLastCommittedOffset}
         */
        public MockCoordinatorLoader(CoordinatorLoader.LoadSummary loadSummary, List<Long> list, List<Long> list2) {
            this.summary = loadSummary;
            this.lastWrittenOffsets = list;
            this.lastCommittedOffsets = list2;
        }

        /** Creates a loader with a {@code null} summary and no offsets to replay. */
        public MockCoordinatorLoader() {
            this(null, Collections.emptyList(), Collections.emptyList());
        }

        /**
         * Feeds the configured offsets to the playback (written offsets first,
         * committed offsets second) and returns an already-completed future.
         */
        public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition topicPartition, CoordinatorPlayback<String> coordinatorPlayback) {
            // Creating the method reference already fails fast on a null playback.
            this.lastWrittenOffsets.forEach(coordinatorPlayback::updateLastWrittenOffset);
            this.lastCommittedOffsets.forEach(coordinatorPlayback::updateLastCommittedOffset);
            return CompletableFuture.completedFuture(this.summary);
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShard.class */
    public static class MockCoordinatorShard implements CoordinatorShard<String> {
        // Registry backing the timeline collections below; also used by replay() to create per-producer sets.
        private final SnapshotRegistry snapshotRegistry;
        // Records replayed without a producer id (-1), plus records moved here when a transaction commits.
        private final TimelineHashSet<RecordAndMetadata> records;
        // Records replayed with a producer id, keyed by that id, until an end-transaction marker is replayed.
        private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
        // Timer supplied at construction and exposed through timer().
        private final CoordinatorTimer<Void, String> timer;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShard$RecordAndMetadata.class */
        /**
         * Immutable value object holding a replayed record together with the
         * offset, producer id and producer epoch it was replayed with.
         * Equality and hash code cover all four fields; {@code record} may be
         * {@code null}.
         */
        public static class RecordAndMetadata {
            // Longest prefix of the record shown by toString().
            private static final int MAX_PREVIEW_LENGTH = 10;

            public final long offset;
            public final long producerId;
            public final short producerEpoch;
            public final String record;

            /**
             * Creates an entry with producer id and producer epoch both set to -1.
             *
             * @param j   the offset
             * @param str the record payload
             */
            public RecordAndMetadata(long j, String str) {
                this(j, -1L, (short) -1, str);
            }

            /**
             * @param j   the offset
             * @param j2  the producer id
             * @param s   the producer epoch
             * @param str the record payload
             */
            public RecordAndMetadata(long j, long j2, short s, String str) {
                this.offset = j;
                this.producerId = j2;
                this.producerEpoch = s;
                this.record = str;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                RecordAndMetadata that = (RecordAndMetadata) obj;
                return this.offset == that.offset
                    && this.producerId == that.producerId
                    && this.producerEpoch == that.producerEpoch
                    && Objects.equals(this.record, that.record);
            }

            @Override
            public int hashCode() {
                // Same 31-based combination as before; Long.hashCode is (int) (v ^ (v >>> 32)).
                int result = Long.hashCode(this.offset);
                result = 31 * result + Long.hashCode(this.producerId);
                result = 31 * result + this.producerEpoch;
                result = 31 * result + (this.record != null ? this.record.hashCode() : 0);
                return result;
            }

            /**
             * Renders the fields, showing at most the first
             * {@value #MAX_PREVIEW_LENGTH} characters of the record. Records
             * shorter than that are printed in full and a {@code null} record
             * is printed as {@code null}; neither case throws.
             */
            @Override
            public String toString() {
                String preview;
                if (this.record == null) {
                    preview = "null";
                } else {
                    // Clamp the end index: substring(0, 10) throws for records shorter than 10 chars.
                    int end = Math.min(this.record.length(), MAX_PREVIEW_LENGTH);
                    preview = "'" + this.record.substring(0, end) + "'";
                }
                return "RecordAndMetadata(offset=" + this.offset
                    + ", producerId=" + this.producerId
                    + ", producerEpoch=" + ((int) this.producerEpoch)
                    + ", record=" + preview + ")";
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates an empty shard whose timeline collections are backed by the
         * given snapshot registry.
         *
         * @param snapshotRegistry registry passed to both timeline collections
         * @param coordinatorTimer timer later returned by {@code timer()}
         */
        public MockCoordinatorShard(SnapshotRegistry snapshotRegistry, CoordinatorTimer<Void, String> coordinatorTimer) {
            this.timer = coordinatorTimer;
            this.snapshotRegistry = snapshotRegistry;
            // Creation order of the two collections is kept: records first, pending second.
            this.records = new TimelineHashSet<>(snapshotRegistry, 0);
            this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
        }

        /**
         * Stores one replayed record. A producer id of -1 puts the record
         * straight into {@code records}; any other producer id parks it in that
         * producer's pending set, which is created on first use.
         *
         * @param j   the offset
         * @param j2  the producer id
         * @param s   the producer epoch
         * @param str the record payload
         */
        @Override
        public void replay(long j, long j2, short s, String str) throws RuntimeException {
            RecordAndMetadata entry = new RecordAndMetadata(j, j2, s, str);
            if (j2 == -1) {
                this.records.add(entry);
                return;
            }
            TimelineHashSet<RecordAndMetadata> pendingForProducer = this.pendingRecords.computeIfAbsent(
                Long.valueOf(j2),
                ignoredKey -> new TimelineHashSet<>(this.snapshotRegistry, 0));
            pendingForProducer.add(entry);
        }

        /**
         * Ends the pending transaction of a producer. The producer's pending
         * set is always removed; its records are added to {@code records} only
         * when the result is {@code COMMIT} and a pending set existed.
         *
         * @param j                 the producer id
         * @param s                 the producer epoch (not used here)
         * @param transactionResult outcome of the transaction
         */
        public void replayEndTransactionMarker(long j, short s, TransactionResult transactionResult) throws RuntimeException {
            TimelineHashSet<RecordAndMetadata> pendingForProducer = this.pendingRecords.remove(Long.valueOf(j));
            boolean committed = transactionResult == TransactionResult.COMMIT;
            if (committed && pendingForProducer != null) {
                this.records.addAll(pendingForProducer);
            }
        }

        /**
         * @param j the producer id
         * @return an unmodifiable set of the record payloads currently pending
         *         for that producer, or an empty set when it has no pending set
         */
        Set<String> pendingRecords(long j) {
            TimelineHashSet<RecordAndMetadata> pendingForProducer = this.pendingRecords.get(Long.valueOf(j));
            if (pendingForProducer == null) {
                return Collections.emptySet();
            }
            Set<String> payloads = pendingForProducer.stream()
                .map(entry -> entry.record)
                .collect(Collectors.toSet());
            return Collections.unmodifiableSet(payloads);
        }

        /**
         * @return an unmodifiable set of the payloads of all entries in {@code records}
         */
        Set<String> records() {
            Set<String> payloads = this.records.stream()
                .map(entry -> entry.record)
                .collect(Collectors.toSet());
            return Collections.unmodifiableSet(payloads);
        }

        /**
         * @return an unmodifiable list of all entries in {@code records},
         *         including their metadata, sorted by ascending offset
         */
        List<RecordAndMetadata> fullRecords() {
            Comparator<RecordAndMetadata> byOffset = Comparator.comparingLong(entry -> entry.offset);
            List<RecordAndMetadata> ordered = this.records.stream()
                .sorted(byOffset)
                .collect(Collectors.toList());
            return Collections.unmodifiableList(ordered);
        }

        /** @return the timer this shard was constructed with */
        CoordinatorTimer<Void, String> timer() {
            return timer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShardBuilder.class */
    /**
     * Builder for {@link MockCoordinatorShard}. Only the snapshot registry and
     * the timer are remembered; every other {@code with*} call is accepted and
     * ignored. All {@code with*} methods return this builder.
     */
    public static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
        private SnapshotRegistry snapshotRegistry;
        private CoordinatorTimer<Void, String> timer;

        private MockCoordinatorShardBuilder() {
        }

        /** Remembers the registry handed to the shard on build. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Ignored by this mock. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(LogContext logContext) {
            return this;
        }

        /** Ignored by this mock. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(Time time) {
            return this;
        }

        /** Remembers the timer handed to the shard on build. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(CoordinatorTimer<Void, String> coordinatorTimer) {
            this.timer = coordinatorTimer;
            return this;
        }

        /** Ignored by this mock. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            return this;
        }

        /** Ignored by this mock. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(TopicPartition topicPartition) {
            return this;
        }

        /**
         * Creates the shard. The decompiler renamed this method from
         * {@code build}; the name is kept because callers in this file use it.
         *
         * @throws NullPointerException if the snapshot registry or the timer was never set
         */
        @Override
        public MockCoordinatorShard mo4build() {
            // Registry is validated before the timer, matching the original evaluation order.
            SnapshotRegistry registry = Objects.requireNonNull(this.snapshotRegistry);
            CoordinatorTimer<Void, String> shardTimer = Objects.requireNonNull(this.timer);
            return new MockCoordinatorShard(registry, shardTimer);
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShardBuilderSupplier.class */
    /**
     * Supplier that hands out a brand-new {@link MockCoordinatorShardBuilder}
     * on every call.
     */
    private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
        private MockCoordinatorShardBuilderSupplier() {
        }

        /** @return a fresh, unconfigured builder */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
            MockCoordinatorShardBuilder freshBuilder = new MockCoordinatorShardBuilder();
            return freshBuilder;
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockPartitionWriter.class */
    /**
     * In-memory partition writer with failure injection: it can reject
     * appends after a maximum number of writes and can reject appends whose
     * first batch is a control batch.
     */
    private static class MockPartitionWriter extends InMemoryPartitionWriter {
        // Number of appends that passed the size check so far.
        private final AtomicInteger writeCount = new AtomicInteger(0);
        private final int maxWrites;
        private final boolean failEndMarker;

        /** Writer with no write limit that accepts control batches. */
        public MockPartitionWriter() {
            this(Integer.MAX_VALUE, false);
        }

        /** Writer that fails once more than {@code i} appends were attempted. */
        public MockPartitionWriter(int i) {
            this(i, false);
        }

        /** Writer with no write limit; {@code z} makes control-batch appends fail. */
        public MockPartitionWriter(boolean z) {
            this(Integer.MAX_VALUE, z);
        }

        /**
         * @param i maximum number of appends allowed before failing
         * @param z whether appends starting with a control batch should fail
         */
        public MockPartitionWriter(int i, boolean z) {
            super(false);
            this.maxWrites = i;
            this.failEndMarker = z;
        }

        /** Delegates to the parent implementation. */
        @Override
        public void registerListener(TopicPartition topicPartition, PartitionWriter.Listener listener) {
            super.registerListener(topicPartition, listener);
        }

        /** Delegates to the parent implementation. */
        @Override
        public void deregisterListener(TopicPartition topicPartition, PartitionWriter.Listener listener) {
            super.deregisterListener(topicPartition, listener);
        }

        /**
         * Appends the batch via the parent writer unless one of the injected
         * failures applies. Checks run in this order: batch size against the
         * partition's max message size, write budget (the counter is bumped
         * here), then the control-batch rejection.
         *
         * @throws RecordTooLargeException if the batch exceeds the max message size
         * @throws KafkaException          if the write budget is exhausted or an
         *                                 end marker is rejected
         */
        @Override
        public long append(TopicPartition topicPartition, VerificationGuard verificationGuard, MemoryRecords memoryRecords) {
            boolean tooLarge = memoryRecords.sizeInBytes() > config(topicPartition).maxMessageSize();
            if (tooLarge) {
                throw new RecordTooLargeException("Batch is larger than the max message size");
            }

            int attempt = this.writeCount.incrementAndGet();
            if (attempt > this.maxWrites) {
                throw new KafkaException("Maximum number of writes reached");
            }

            boolean rejectedEndMarker = this.failEndMarker && memoryRecords.firstBatch().isControlBatch();
            if (rejectedEndMarker) {
                throw new KafkaException("Couldn't write end marker.");
            }

            return super.append(topicPartition, verificationGuard, memoryRecords);
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$StringSerializer.class */
    /**
     * Serializer for plain string records: keys are never produced and values
     * are the string's bytes in the platform default charset.
     */
    private static class StringSerializer implements Serializer<String> {
        private StringSerializer() {
        }

        /** @return always {@code null} */
        public byte[] serializeKey(String str) {
            return null;
        }

        /** @return the bytes of {@code str} encoded with {@code Charset.defaultCharset()} */
        public byte[] serializeValue(String str) {
            Charset platformCharset = Charset.defaultCharset();
            return str.getBytes(platformCharset);
        }
    }

    /**
     * Varargs convenience overload: copies the strings into a list and
     * delegates to {@code records(long, List)}.
     */
    private static MemoryRecords records(long j, String... strArr) {
        List<String> values = new ArrayList<>(Arrays.asList(strArr));
        return records(j, values);
    }

    /**
     * Builds one non-transactional, uncompressed batch holding the given
     * strings as record values (platform default charset).
     *
     * @param j    value given to every {@code SimpleRecord} and to the builder
     *             (presumably the record timestamp / log append time; see the
     *             Kafka {@code MemoryRecords.builder} signature)
     * @param list record values; an empty list yields {@code MemoryRecords.EMPTY}
     */
    private static MemoryRecords records(long j, List<String> list) {
        if (list.isEmpty()) {
            return MemoryRecords.EMPTY;
        }

        List<SimpleRecord> simpleRecords = new ArrayList<>(list.size());
        for (String value : list) {
            simpleRecords.add(new SimpleRecord(j, value.getBytes(Charset.defaultCharset())));
        }

        byte magic = RecordVersion.current().value;
        ByteBuffer buffer = ByteBuffer.allocate(
            AbstractRecords.estimateSizeInBytes(magic, CompressionType.NONE, simpleRecords));
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer,
            magic,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L,
            j,
            -1L,
            (short) -1,
            0,
            false,
            -1);

        for (SimpleRecord simpleRecord : simpleRecords) {
            builder.append(simpleRecord);
        }
        return builder.build();
    }

    /**
     * Varargs convenience overload: collects the strings into a list and
     * delegates to {@code transactionalRecords(long, short, long, List)}.
     */
    private static MemoryRecords transactionalRecords(long j, short s, long j2, String... strArr) {
        List<String> values = Stream.of(strArr).collect(Collectors.toList());
        return transactionalRecords(j, s, j2, values);
    }

    /**
     * Builds one transactional, uncompressed batch holding the given strings
     * as record values (platform default charset).
     *
     * @param j    producer id passed to the builder
     * @param s    producer epoch passed to the builder
     * @param j2   value given to every {@code SimpleRecord} and to the builder
     *             (presumably the record timestamp / log append time; see the
     *             Kafka {@code MemoryRecords.builder} signature)
     * @param list record values; an empty list yields {@code MemoryRecords.EMPTY}
     */
    private static MemoryRecords transactionalRecords(long j, short s, long j2, List<String> list) {
        if (list.isEmpty()) {
            return MemoryRecords.EMPTY;
        }

        List<SimpleRecord> simpleRecords = list.stream()
            .map(value -> new SimpleRecord(j2, value.getBytes(Charset.defaultCharset())))
            .collect(Collectors.toList());

        byte magic = RecordVersion.current().value;
        int estimatedSize = AbstractRecords.estimateSizeInBytes(magic, CompressionType.NONE, simpleRecords);
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            ByteBuffer.allocate(estimatedSize),
            magic,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L,
            j2,
            j,
            s,
            0,
            true,
            -1);

        simpleRecords.forEach(builder::append);
        return builder.build();
    }

    /**
     * Builds records containing a single end-transaction marker.
     *
     * @param j                 second argument of {@code MemoryRecords.withEndTransactionMarker}
     *                          (the producer id in the callers' usage)
     * @param s                 third argument (the producer epoch)
     * @param j2                first argument (presumably the timestamp; confirm against the Kafka API)
     * @param i                 second constructor argument of {@code EndTransactionMarker}
     * @param controlRecordType type of the control record carried by the marker
     */
    private static MemoryRecords endTransactionMarker(long j, short s, long j2, int i, ControlRecordType controlRecordType) {
        EndTransactionMarker marker = new EndTransactionMarker(controlRecordType, i);
        return MemoryRecords.withEndTransactionMarker(j2, j, s, marker);
    }

    /**
     * Schedules a load for {@code TP} with epoch 0 and checks that the context
     * is unknown beforehand, is LOADING with the built shard while the load
     * future is pending, and becomes ACTIVE once the future completes. Also
     * verifies the calls made on the shard, the writer and the shard builder.
     */
    @Test
    public void testScheduleLoading() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // The mocked builder returns itself from every with* call and the mocked shard from build.
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime((Time) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.mo4build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The load stays pending until the test completes this future.
        CompletableFuture<CoordinatorLoader.LoadSummary> loadFuture = new CompletableFuture<>();
        Mockito.when(loader.load(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP))
        )).thenReturn(loadFuture);

        // Nothing has been scheduled yet, so the partition is unknown.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));

        runtime.scheduleLoadOperation(TP, 0);

        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(coordinator, ctx.coordinator.coordinator());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
        Assertions.assertEquals(0, ctx.epoch);
        Assertions.assertEquals(coordinator, ctx.coordinator.coordinator());

        loadFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);

        Mockito.verify(coordinator, Mockito.times(1)).onLoaded(MetadataImage.EMPTY);
        Mockito.verify(writer, Mockito.times(1)).registerListener(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class));
        Mockito.verify(builder, Mockito.times(1)).withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.eq(ctx.coordinator.snapshotRegistry()));
        Mockito.verify(builder, Mockito.times(1)).withLogContext((LogContext) ArgumentMatchers.eq(ctx.logContext));
        Mockito.verify(builder, Mockito.times(1)).withTime((Time) ArgumentMatchers.eq(timer.time()));
        Mockito.verify(builder, Mockito.times(1)).withTimer((CoordinatorTimer) ArgumentMatchers.eq(ctx.timer));
    }

    /**
     * Schedules a load for {@code TP} with epoch 0, fails the load future and
     * checks that the context moves from LOADING to FAILED and that the shard's
     * {@code onUnloaded} is invoked exactly once.
     */
    @Test
    public void testScheduleLoadingWithFailure() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // The mocked builder returns itself from every with* call and the mocked shard from build.
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime((Time) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.mo4build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The load stays pending until the test fails this future.
        CompletableFuture<CoordinatorLoader.LoadSummary> loadFuture = new CompletableFuture<>();
        Mockito.when(loader.load(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP))
        )).thenReturn(loadFuture);

        runtime.scheduleLoadOperation(TP, 0);

        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
        Assertions.assertEquals(0, ctx.epoch);
        Assertions.assertEquals(coordinator, ctx.coordinator.coordinator());

        loadFuture.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);

        Mockito.verify(coordinator, Mockito.times(1)).onUnloaded();
    }

    /**
     * Loads {@code TP} with epoch 10 until ACTIVE, then schedules another load
     * with the older epoch 0 and checks that state and epoch are unchanged.
     */
    @Test
    public void testScheduleLoadingWithStalePartitionEpoch() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // The mocked builder returns itself from every with* call and the mocked shard from build.
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime((Time) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.mo4build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The load stays pending until the test completes this future.
        CompletableFuture<CoordinatorLoader.LoadSummary> loadFuture = new CompletableFuture<>();
        Mockito.when(loader.load(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP))
        )).thenReturn(loadFuture);

        runtime.scheduleLoadOperation(TP, 10);

        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
        Assertions.assertEquals(10, ctx.epoch);
        Assertions.assertEquals(coordinator, ctx.coordinator.coordinator());

        loadFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(10, ctx.epoch);

        // A load request carrying an older epoch must leave the context untouched.
        runtime.scheduleLoadOperation(TP, 0);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(10, ctx.epoch);
    }

    /**
     * Loads a partition, fails that load, and then loads it again with a higher epoch.
     * The test expects the failed shard to be unloaded once and the second load to install
     * a freshly built shard that ends up ACTIVE.
     */
    @Test
    public void testScheduleLoadingAfterLoadingFailure() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard firstShard = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Every fluent setter on the mocked builder hands back the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(firstShard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        // First load: the loader returns a future that the test controls.
        CompletableFuture firstLoad = new CompletableFuture();
        Mockito.when(loader.load(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP))
        )).thenReturn(firstLoad);
        runtime.scheduleLoadOperation(TP, 10);

        CoordinatorRuntime.CoordinatorContext firstCtx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, firstCtx.state);
        Assertions.assertEquals(10, firstCtx.epoch);
        Assertions.assertEquals(firstShard, firstCtx.coordinator.coordinator());

        // Failing the load is expected to move the context to FAILED and unload the shard once.
        firstLoad.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, firstCtx.state);
        Mockito.verify(firstShard, Mockito.times(1)).onUnloaded();

        // Second load: a new shard and a new loader future, scheduled with epoch 11.
        MockCoordinatorShard secondShard = Mockito.mock(MockCoordinatorShard.class);
        Mockito.when(shardBuilder.mo4build()).thenReturn(secondShard);
        CompletableFuture secondLoad = new CompletableFuture();
        Mockito.when(loader.load(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP))
        )).thenReturn(secondLoad);
        runtime.scheduleLoadOperation(TP, 11);

        CoordinatorRuntime.CoordinatorContext secondCtx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, secondCtx.state);
        Assertions.assertEquals(11, secondCtx.epoch);
        Assertions.assertEquals(secondShard, secondCtx.coordinator.coordinator());

        secondLoad.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, secondCtx.state);
    }

    /**
     * Loads a partition at epoch 10 and unloads it with epoch 11. The test expects the
     * context to be CLOSED, the shard to be unloaded once, the writer listener to be
     * deregistered once, and a later context lookup to throw NotCoordinatorException.
     */
    @Test
    public void testScheduleUnloading() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Every fluent setter on the mocked builder hands back the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(10, ctx.epoch);

        // Unload with an epoch one higher than the loaded one.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state);
        Mockito.verify(shard, Mockito.times(1)).onUnloaded();
        Mockito.verify(writer, Mockito.times(1)).deregisterListener(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class)
        );

        // The context must no longer be retrievable.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Same scenario as a regular unload, except that no partition epoch is supplied
     * ({@code OptionalInt.empty()}). The test expects the coordinator to be closed,
     * unloaded and removed all the same.
     */
    @Test
    public void testScheduleUnloadingWithEmptyEpoch() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Every fluent setter on the mocked builder hands back the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(10, ctx.epoch);

        // Unload without specifying any epoch.
        runtime.scheduleUnloadOperation(TP, OptionalInt.empty());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state);
        Mockito.verify(shard, Mockito.times(1)).onUnloaded();
        Mockito.verify(writer, Mockito.times(1)).deregisterListener(
            (TopicPartition) ArgumentMatchers.eq(TP),
            (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class)
        );

        // The context must no longer be retrievable.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Schedules an unload for a partition that was never loaded. The test expects that the
     * shard is never unloaded and that looking the context up throws NotCoordinatorException.
     */
    @Test
    public void testScheduleUnloadingWhenContextDoesntExist() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Fluent setters on the mocked builder hand back the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        // No load was scheduled beforehand, so there is nothing to unload.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(11));
        Mockito.verify(shard, Mockito.times(0)).onUnloaded();

        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Loads a partition at epoch 10 and then requests an unload carrying the older epoch 0.
     * The test expects the stale request to be ignored: the context stays ACTIVE at epoch 10.
     */
    @Test
    public void testScheduleUnloadingWithStalePartitionEpoch() {
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime build = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Every fluent setter on the mocked builder hands back the builder itself.
        // Each setter is stubbed exactly once; the repeated withTime/withTimer stubbings
        // that used to follow were identical copies and have been removed.
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.mo4build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);

        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);

        // Epoch 0 is older than the loaded epoch 10, so nothing may change.
        build.scheduleUnloadOperation(TP, OptionalInt.of(0));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
    }

    /**
     * Walks through four write operations against an active coordinator and checks, after
     * every step, the last written offset, the last committed offset, the snapshot epochs,
     * the records replayed into the shard and the batches handed to the partition writer.
     * Futures are expected to complete only once the writer commits the matching offset,
     * except for an empty write issued when nothing is pending, which completes right away.
     */
    @Test
    public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state right after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // Write #1: two records.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        Assertions.assertFalse(write1.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet("record1", "record2"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Collections.singletonList(records(timer.time().milliseconds(), "record1", "record2")),
            writer.entries(TP));

        // Write #2: one record.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Collections.singletonList("record3"), "response2"));
        Assertions.assertFalse(write2.isDone());
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet("record1", "record2", "record3"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds(), "record1", "record2"),
                records(timer.time().milliseconds(), "record3")),
            writer.entries(TP));

        // Write #3: no records; offsets, snapshots and written batches must stay unchanged.
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Collections.emptyList(), "response3"));
        Assertions.assertFalse(write3.isDone());
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet("record1", "record2", "record3"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds(), "record1", "record2"),
                records(timer.time().milliseconds(), "record3")),
            writer.entries(TP));

        // Committing up to offset 2 completes write #1 only.
        writer.commit(TP, 2L);
        Assertions.assertTrue(write1.isDone());
        Assertions.assertEquals("response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(2L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());

        // Committing up to offset 3 completes writes #2 and #3.
        writer.commit(TP, 3L);
        Assertions.assertTrue(write2.isDone());
        Assertions.assertTrue(write3.isDone());
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", write3.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(3L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());

        // Write #4: no records and nothing pending, so the future is already done.
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Collections.emptyList(), "response4"));
        Assertions.assertTrue(write4.isDone());
        Assertions.assertEquals("response4", write4.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Schedules a write on a runtime where no partition has been loaded and expects the
     * returned future to fail with NotCoordinatorException.
     */
    @Test
    public void testScheduleWriteOpWhenInactive() {
        MockTimer timer = new MockTimer();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // No scheduleLoadOperation call precedes the write.
        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Collections.emptyList(), "response1"));

        TestUtils.assertFutureThrows(write, NotCoordinatorException.class);
    }

    /**
     * Schedules a write whose operation throws a KafkaException and expects that exception
     * to surface through the returned future.
     */
    @Test
    public void testScheduleWriteOpWhenOpFails() {
        MockTimer timer = new MockTimer();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        runtime.scheduleLoadOperation(TP, 10);

        // The operation itself blows up before producing any result.
        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, shard -> {
            throw new KafkaException("error");
        });

        TestUtils.assertFutureThrows(write, KafkaException.class);
    }

    /**
     * Replaces the loaded coordinator with one whose shard throws from {@code replay} and
     * schedules a write that produces records. The test expects the future to fail with
     * IllegalArgumentException and the offsets and snapshot epochs to remain at their
     * initial values.
     */
    @Test
    public void testScheduleWriteOpWhenReplayFails() {
        MockTimer timer = new MockTimer();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // Swap in a shard that rejects every replayed record, reusing the existing registry.
        SnapshotRegistry registry = ctx.coordinator.snapshotRegistry();
        MockCoordinatorShard failingShard = new MockCoordinatorShard(registry, ctx.timer) {
            @Override
            public void replay(long j, long j2, short s, String str) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        };
        ctx.coordinator = new SnapshottableCoordinator(new LogContext(), registry, failingShard, TP);

        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        TestUtils.assertFutureThrows(write, IllegalArgumentException.class);

        // Nothing may have advanced.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Issues one write that succeeds and a second one that is expected to fail with a
     * KafkaException, then checks that the coordinator state still reflects only the
     * first write.
     */
    @Test
    public void testScheduleWriteOpWhenWriteFails() {
        MockTimer timer = new MockTimer();

        // NOTE(review): the constructor argument 1 presumably makes the writer reject the
        // second write below - confirm against MockPartitionWriter.
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter(1))
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // First write: accepted.
        runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet("record1", "record2"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());

        // Second write: expected to fail.
        CompletableFuture failedWrite = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
            shard -> new CoordinatorResult(Arrays.asList("record3", "record4", "record5"), "response2"));
        TestUtils.assertFutureThrows(failedWrite, KafkaException.class);

        // State is the same as after the first write.
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet("record1", "record2"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
    }

    /**
     * Schedules a write with a 3 ms timeout, advances the mock clock by 4 ms without any
     * commit, and expects the future to fail with Kafka's TimeoutException.
     */
    @Test
    public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
        MockTimer timer = new MockTimer();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        CompletableFuture timedWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3L),
            shard -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));

        // Move the clock past the 3 ms write timeout.
        timer.advanceClock(4L);

        TestUtils.assertFutureThrows(timedWrite, org.apache.kafka.common.errors.TimeoutException.class);
    }

    /**
     * Loads three partitions of {@code __consumer_offsets} and schedules a single
     * write-all operation. The test expects one record per partition, numbered through a
     * shared counter, and the combined responses to come back as response#0..response#2
     * once every partition has been committed.
     */
    @Test
    public void testScheduleWriteAllOperation() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        TopicPartition tp2 = new TopicPartition("__consumer_offsets", 2);
        runtime.scheduleLoadOperation(tp0, 10);
        runtime.scheduleLoadOperation(tp1, 10);
        runtime.scheduleLoadOperation(tp2, 10);

        // The counter gives each invocation of the operation a distinct record/response id.
        AtomicInteger counter = new AtomicInteger(0);
        List futures = runtime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, shard -> {
            int id = counter.getAndIncrement();
            return new CoordinatorResult(
                Collections.singletonList("record#" + id),
                Collections.singletonList("response#" + id));
        });

        // Each partition received exactly one record.
        Assertions.assertEquals(1L, runtime.contextOrThrow(tp0).coordinator.lastWrittenOffset());
        Assertions.assertEquals(1L, runtime.contextOrThrow(tp1).coordinator.lastWrittenOffset());
        Assertions.assertEquals(1L, runtime.contextOrThrow(tp2).coordinator.lastWrittenOffset());
        Assertions.assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#0")), writer.entries(tp0));
        Assertions.assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#1")), writer.entries(tp1));
        Assertions.assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#2")), writer.entries(tp2));

        writer.commit(tp0);
        writer.commit(tp1);
        writer.commit(tp2);

        // Merge the per-partition response lists into one list and compare.
        Assertions.assertEquals(
            Arrays.asList("response#0", "response#1", "response#2"),
            FutureUtils.combineFutures(futures, ArrayList::new, (merged, part) -> {
                merged.addAll(part);
            }).get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks the happy path of a transactional write against a mocked partition writer.
     *
     * <p>The writer is stubbed to complete transaction verification with a
     * {@code VerificationGuard}. The test then verifies that the runtime appends the two
     * records as one transactional batch using that guard, and that each record is replayed
     * on the mocked shard with offsets 0 and 1.
     */
    @Test
    public void testScheduleTransactionalWriteOp() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);

        // Shard builder that always hands back the mocked shard so replay calls can be verified.
        // The superclass constructor is invoked implicitly; an explicit super() call is not
        // legal inside an instance initializer, so none is written here.
        final MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder() {
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShardBuilder
            /* renamed from: build, reason: merged with bridge method [inline-methods] */
            public MockCoordinatorShard mo4build() {
                return shard;
            }
        };

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier() {
                @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShardBuilderSupplier
                public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                    return shardBuilder;
                }
            })
            .withCoordinatorRuntimeMetrics(Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Loading the partition must register exactly one listener with the writer.
        runtime.scheduleLoadOperation(TP, 10);
        Mockito.verify(writer, Mockito.times(1)).registerListener((TopicPartition) ArgumentMatchers.eq(TP), (PartitionWriter.Listener) ArgumentMatchers.any());

        // Stub the writer: an empty log config and a successful transaction verification.
        Mockito.when(writer.config(TP)).thenReturn(new LogConfig(Collections.emptyMap()));
        VerificationGuard guard = new VerificationGuard();
        Mockito.when(writer.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short) 50, TXN_OFFSET_COMMIT_LATEST_VERSION)).thenReturn(CompletableFuture.completedFuture(guard));

        runtime.scheduleTransactionalWriteOperation(
            "tnx-write",
            TP,
            "transactional-id",
            100L,
            (short) 50,
            Duration.ofMillis(5000L),
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));

        // The append must carry the guard returned by the verification step.
        Mockito.verify(writer, Mockito.times(1)).append((TopicPartition) ArgumentMatchers.eq(TP), (VerificationGuard) ArgumentMatchers.eq(guard), (MemoryRecords) ArgumentMatchers.eq(transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record1", "record2")));

        // Each record is replayed once on the shard, at consecutive offsets.
        Mockito.verify(shard, Mockito.times(1)).replay(ArgumentMatchers.eq(0L), ArgumentMatchers.eq(100L), ArgumentMatchers.eq((short) 50), (String) ArgumentMatchers.eq("record1"));
        Mockito.verify(shard, Mockito.times(1)).replay(ArgumentMatchers.eq(1L), ArgumentMatchers.eq(100L), ArgumentMatchers.eq((short) 50), (String) ArgumentMatchers.eq("record2"));
    }

    /**
     * Checks that a transactional write is rejected when transaction verification fails.
     *
     * <p>The mocked partition writer returns a future failed with
     * {@code Errors.NOT_ENOUGH_REPLICAS}. The write future must fail with
     * {@link NotEnoughReplicasException} and the writer's {@code append} must never be called.
     */
    @Test
    public void testScheduleTransactionalWriteOpWhenVerificationFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);

        // Shard builder that always hands back the mocked shard.
        // The superclass constructor is invoked implicitly; an explicit super() call is not
        // legal inside an instance initializer, so none is written here.
        final MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder() {
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShardBuilder
            /* renamed from: build */
            public MockCoordinatorShard mo4build() {
                return shard;
            }
        };

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier() {
                @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShardBuilderSupplier
                public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                    return shardBuilder;
                }
            })
            .withCoordinatorRuntimeMetrics(Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Loading the partition must register exactly one listener with the writer.
        runtime.scheduleLoadOperation(TP, 10);
        Mockito.verify(writer, Mockito.times(1)).registerListener((TopicPartition) ArgumentMatchers.eq(TP), (PartitionWriter.Listener) ArgumentMatchers.any());

        // Make the verification step fail before anything can be appended.
        Mockito.when(writer.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short) 50, TXN_OFFSET_COMMIT_LATEST_VERSION)).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception()));

        TestUtils.assertFutureThrows(
            runtime.scheduleTransactionalWriteOperation(
                "tnx-write",
                TP,
                "transactional-id",
                100L,
                (short) 50,
                Duration.ofMillis(5000L),
                state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response"),
                Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION)),
            NotEnoughReplicasException.class);

        // Nothing may reach the log when verification failed.
        Mockito.verify(writer, Mockito.times(0)).append((TopicPartition) ArgumentMatchers.any(), (VerificationGuard) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any());
    }

    /**
     * Runs a transactional write followed by a transaction completion, once per
     * {@link TransactionResult} value.
     *
     * <p>After the write, the records are expected to be pending for producer id 100. After
     * the completion, they are expected in the shard's records on COMMIT and absent otherwise,
     * and the log must hold the transactional batch followed by the matching end marker.
     * Both futures complete only once the writer commits up to their offsets (2, then 3).
     */
    @EnumSource(TransactionResult.class)
    @ParameterizedTest
    public void testScheduleTransactionCompletion(TransactionResult transactionResult) throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Freshly loaded coordinator: nothing written, nothing committed, one snapshot at 0.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // Step 1: transactional write of two records.
        CompletableFuture writeFuture = runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short) 5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertFalse(writeFuture.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.singletonList(transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")), writer.entries(TP));

        // Step 2: complete the transaction with the parameterized result.
        CompletableFuture completionFuture = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short) 5, 10, transactionResult, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse(completionFuture.isDone());
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());

        // COMMIT materializes the pending records; any other result leaves the shard empty.
        boolean committed = transactionResult == TransactionResult.COMMIT;
        Set<String> expectedRecords = committed
            ? Utils.mkSet(new String[]{"record1", "record2"})
            : Collections.emptySet();
        Assertions.assertEquals(expectedRecords, ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        ControlRecordType expectedMarkerType = committed ? ControlRecordType.COMMIT : ControlRecordType.ABORT;

        Assertions.assertEquals(Arrays.asList(transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2"), endTransactionMarker(100L, (short) 5, timer.time().milliseconds(), 10, expectedMarkerType)), writer.entries(TP));

        // Committing up to offset 2 releases the write future only.
        writer.commit(TP, 2L);
        Assertions.assertTrue(writeFuture.isDone());
        Assertions.assertEquals("response1", writeFuture.get(5L, TimeUnit.SECONDS));

        // Committing up to offset 3 releases the completion future, which carries no result.
        writer.commit(TP, 3L);
        Assertions.assertTrue(completionFuture.isDone());
        Assertions.assertNull(completionFuture.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Schedules a transaction completion with a 3 ms timeout and never commits it.
     *
     * <p>After the mock clock advances by 4 ms the future must fail with Kafka's
     * {@code TimeoutException}. The written offset, committed offset and snapshot epochs
     * must be the same after the timeout as they were right after the write.
     */
    @Test
    public void testScheduleTransactionCompletionWhenWriteTimesOut() throws InterruptedException {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // The end marker is written (offset 1) but stays uncommitted.
        CompletableFuture completionFuture = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short) 5, 10, TransactionResult.COMMIT, Duration.ofMillis(3L));
        Assertions.assertEquals(1L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());

        // Move past the 3 ms timeout.
        timer.advanceClock(4L);
        TestUtils.assertFutureThrows(completionFuture, org.apache.kafka.common.errors.TimeoutException.class);

        // Timing out must not roll the coordinator state back.
        Assertions.assertEquals(1L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Checks that a failed transaction completion leaves the coordinator state untouched.
     *
     * <p>A transactional write succeeds first and leaves two pending records. The completion
     * is then expected to fail with {@link KafkaException}; afterwards the offsets, snapshot
     * epochs, pending records and committed records must equal their pre-completion values.
     */
    @Test
    public void testScheduleTransactionCompletionWhenWriteFails() {
        MockTimer timer = new MockTimer();
        // NOTE(review): the `true` flag presumably makes the writer fail end-marker writes;
        // confirm against MockPartitionWriter.
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter(true))
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // Transactional write: two records become pending for producer id 100.
        runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short) 5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());

        // The completion is expected to fail.
        TestUtils.assertFutureThrows(runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short) 5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT), KafkaException.class);

        // State must be exactly what it was before the failed completion.
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
    }

    /**
     * Checks that a transaction completion whose end-marker replay throws changes nothing.
     *
     * <p>The loaded coordinator is swapped for one whose {@code replayEndTransactionMarker}
     * always throws {@link IllegalArgumentException}. After a successful transactional write,
     * the completion future must fail with that exception. The offsets, snapshot epochs,
     * pending records, committed records and log entries must all be unchanged.
     */
    @Test
    public void testScheduleTransactionCompletionWhenReplayFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // Replace the coordinator with one that rejects every end transaction marker,
        // reusing the existing snapshot registry and timer.
        SnapshotRegistry registry = ctx.coordinator.snapshotRegistry();
        ctx.coordinator = new SnapshottableCoordinator(new LogContext(), registry, new MockCoordinatorShard(registry, ctx.timer) {
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard
            public void replayEndTransactionMarker(long unusedLong, short unusedShort, TransactionResult unusedResult) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        }, TP);

        // Transactional write: two records become pending for producer id 100.
        runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short) 5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Collections.singletonList(transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")), writer.entries(TP));

        // The completion surfaces the replay exception.
        TestUtils.assertFutureThrows(runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short) 5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT), IllegalArgumentException.class);

        // Nothing changed: same offsets, snapshots, pending/committed records and log entries.
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Collections.singletonList(transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")), writer.entries(TP));
    }

    /**
     * Checks that a read operation is handed the last committed offset and completes at once.
     *
     * <p>Two writes are scheduled and only the first is committed (offset 2 of 4). The read
     * callback asserts that the offset it receives equals the coordinator's last committed
     * offset, and its return value must be available on the future immediately.
     */
    @Test
    public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());

        // Two writes of two records each.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation(
            "write#1",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        CompletableFuture secondWrite = runtime.scheduleWriteOperation(
            "write#2",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record3", "record4"), "response2"));

        // Commit only the first write.
        writer.commit(TP, 2L);
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertFalse(secondWrite.isDone());
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(2L, ctx.coordinator.lastCommittedOffset());

        // The read sees the committed offset, not the written one.
        CompletableFuture readFuture = runtime.scheduleReadOperation("read", TP, (state, offset) -> {
            Assertions.assertEquals(ctx.coordinator.lastCommittedOffset(), offset);
            return "read-response";
        });
        Assertions.assertTrue(readFuture.isDone());
        Assertions.assertEquals("read-response", readFuture.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that a read scheduled for a partition that was never loaded fails with
     * {@link NotCoordinatorException}.
     */
    @Test
    public void testScheduleReadOpWhenPartitionInactive() {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // No scheduleLoadOperation call here: TP is unknown to the runtime.
        TestUtils.assertFutureThrows(
            runtime.scheduleReadOperation("read", TP, (state, offset) -> "read-response"),
            NotCoordinatorException.class);
    }

    /**
     * Checks that an exception thrown by a read callback is surfaced through the read future.
     *
     * <p>Two writes are scheduled and the first is committed. The read callback asserts that
     * it receives the last committed offset and then throws {@link IllegalArgumentException},
     * which the future must report.
     */
    @Test
    public void testScheduleReadOpWhenOpsFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());

        // Two writes; only the first gets committed below.
        runtime.scheduleWriteOperation(
            "write#1",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        runtime.scheduleWriteOperation(
            "write#2",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record3", "record4"), "response2"));
        writer.commit(TP, 2L);

        // The callback's failure must come back through the future.
        TestUtils.assertFutureThrows(runtime.scheduleReadOperation("read", TP, (state, offset) -> {
            Assertions.assertEquals(ctx.coordinator.lastCommittedOffset(), offset);
            throw new IllegalArgumentException("error");
        }), IllegalArgumentException.class);
    }

    /**
     * Checks that a read-all operation runs against every loaded partition.
     *
     * <p>Three partitions of {@code __consumer_offsets} are loaded, one record is written and
     * committed on each, and the per-partition read results are combined into one list that
     * must equal {@code [record0, record1, record2]}.
     */
    @Test
    public void testScheduleReadAllOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        TopicPartition tp2 = new TopicPartition("__consumer_offsets", 2);

        // Load all three partitions.
        runtime.scheduleLoadOperation(tp0, 10);
        runtime.scheduleLoadOperation(tp1, 10);
        runtime.scheduleLoadOperation(tp2, 10);

        // One record per partition.
        runtime.scheduleWriteOperation(
            "write#0",
            tp0,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record0"), "response0"));
        runtime.scheduleWriteOperation(
            "write#1",
            tp1,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record1"), "response1"));
        runtime.scheduleWriteOperation(
            "write#2",
            tp2,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record2"), "response2"));

        // Commit everything that was written.
        writer.commit(tp0);
        writer.commit(tp1);
        writer.commit(tp2);

        // Read each shard's records and merge the per-partition lists.
        Assertions.assertEquals(Arrays.asList("record0", "record1", "record2"), FutureUtils.combineFutures(runtime.scheduleReadAllOperation("read", (state, offset) -> {
            return new ArrayList(state.records());
        }), ArrayList::new, (merged, part) -> {
            merged.addAll(part);
        }).get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks what closing the runtime does to in-flight work.
     *
     * <p>With two uncommitted writes and one scheduled timer pending, {@code close()} must
     * fail both write futures with {@link NotCoordinatorException}, close the loader, and
     * leave the context's timer empty.
     */
    @Test
    public void testClose() throws Exception {
        // Spy on a real loader so the close() call can be verified.
        MockCoordinatorLoader loader = (MockCoordinatorLoader) Mockito.spy(new MockCoordinatorLoader());
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());

        // Two writes that are never committed, so both futures stay pending.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation(
            "write#1",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record1", "record2"), "response1"));
        CompletableFuture secondWrite = runtime.scheduleWriteOperation(
            "write#2",
            TP,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Arrays.asList("record3", "record4"), "response2"));
        Assertions.assertFalse(firstWrite.isDone());
        Assertions.assertFalse(secondWrite.isDone());

        // Register one timer so that close() has something to cancel.
        Assertions.assertEquals(0, ctx.timer.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.SECONDS, true, () -> {
            return new CoordinatorResult(Arrays.asList("record5", "record6"), (Object) null);
        });
        Assertions.assertEquals(1, ctx.timer.size());

        runtime.close();

        // Pending writes fail, the loader is closed and no timer is left behind.
        TestUtils.assertFutureThrows(firstWrite, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(secondWrite, NotCoordinatorException.class);
        ((MockCoordinatorLoader) Mockito.verify(loader)).close();
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Checks how a new metadata image reaches coordinators that are in different load stages.
     *
     * <p>Two partitions are scheduled for loading against a mocked loader whose futures are
     * completed by hand. The first load is completed before the new image is published, so its
     * shard must see {@code onLoaded(MetadataImage.EMPTY)} and then
     * {@code onNewMetadataImage(image, delta)}. The second load is completed only after the
     * image was published, so its shard must see {@code onLoaded(image)} with the new image.
     */
    @Test
    public void testOnNewMetadataImage() {
        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        MockCoordinatorShard shard0 = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        MockCoordinatorShard shard1 = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);

        // Every fluent builder method hands back the same mocked builder; each is stubbed once.
        Mockito.when(supplier.get()).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        // Consecutive answers: the first build() yields shard0, the second yields shard1.
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard0).thenReturn(shard1);

        // Loads stay pending until the test completes these futures explicitly.
        CompletableFuture loadFuture0 = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition) ArgumentMatchers.eq(tp0), (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, tp0)))).thenReturn(loadFuture0);
        CompletableFuture loadFuture1 = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition) ArgumentMatchers.eq(tp1), (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, tp1)))).thenReturn(loadFuture1);

        runtime.scheduleLoadOperation(tp0, 0);
        runtime.scheduleLoadOperation(tp1, 0);
        Assertions.assertEquals(shard0, runtime.contextOrThrow(tp0).coordinator.coordinator());
        Assertions.assertEquals(shard1, runtime.contextOrThrow(tp1).coordinator.coordinator());

        // First coordinator finishes loading before any new image exists.
        loadFuture0.complete(null);
        ((MockCoordinatorShard) Mockito.verify(shard0)).onLoaded(MetadataImage.EMPTY);

        // Publish a new image; the already-loaded coordinator must be notified.
        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
        runtime.onNewMetadataImage(newImage, delta);
        ((MockCoordinatorShard) Mockito.verify(shard0)).onNewMetadataImage(newImage, delta);

        // Second coordinator finishes loading afterwards and must be handed the new image.
        loadFuture1.complete(null);
        ((MockCoordinatorShard) Mockito.verify(shard1)).onLoaded(newImage);
    }

    /**
     * Schedules two timers (10 ms and 20 ms) and checks that each one writes its records and is
     * removed from the timer registry once the mock clock moves past its deadline.
     */
    @Test
    public void testScheduleTimer() throws InterruptedException {
        MockTimer clock = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(Duration.ofMillis(30L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Freshly loaded coordinator: nothing written, nothing committed, no timers.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(0, ctx.timer.size());

        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object) null));
        ctx.timer.schedule("timer-2", 20L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Arrays.asList("record3", "record4"), (Object) null));
        Assertions.assertEquals(2, ctx.timer.size());

        // t = 11 ms: only timer-1 has expired.
        clock.advanceClock(11L);
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(1, ctx.timer.size());

        // t = 22 ms: timer-2 has expired as well.
        clock.advanceClock(11L);
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2", "record3", "record4"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Reschedules a timer under the same key while an earlier expiration of that key is still
     * sitting unprocessed in the manual event processor.
     *
     * <p>After both queued events are polled, only the record of the last scheduled operation
     * ("record3") is expected in the coordinator and the timer registry must be empty.
     */
    @Test
    public void testRescheduleTimer() throws InterruptedException {
        MockTimer clock = new MockTimer();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Drain the events produced by the load so the queue starts out empty.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());
        Assertions.assertEquals(0, processor.size());

        // First schedule; its expiration is enqueued but deliberately not processed.
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Collections.singletonList("record1"), (Object) null));
        Assertions.assertEquals(1, ctx.timer.size());
        clock.advanceClock(11L);
        Assertions.assertEquals(1, processor.size());

        // Re-scheduling the same key twice must never grow the registry beyond one entry.
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Collections.singletonList("record2"), (Object) null));
        Assertions.assertEquals(1, ctx.timer.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Collections.singletonList("record3"), (Object) null));
        Assertions.assertEquals(1, ctx.timer.size());

        // A second expiration is enqueued next to the stale one.
        clock.advanceClock(11L);
        Assertions.assertEquals(2, processor.size());
        Assertions.assertTrue(processor.poll());
        Assertions.assertTrue(processor.poll());

        // Only the most recent operation left a record behind.
        Assertions.assertEquals(Utils.mkSet(new String[]{"record3"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Cancels a timer while an earlier expiration of the same key is still queued in the manual
     * event processor, and checks that processing the queued event writes no record.
     */
    @Test
    public void testCancelTimer() throws InterruptedException {
        MockTimer clock = new MockTimer();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Drain the events produced by the load so the queue starts out empty.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());
        Assertions.assertEquals(0, processor.size());

        // First schedule; its expiration is enqueued but left unprocessed.
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Collections.singletonList("record1"), (Object) null));
        Assertions.assertEquals(1, ctx.timer.size());
        clock.advanceClock(11L);
        Assertions.assertEquals(1, processor.size());

        // Schedule again under the same key, then cancel before it can expire.
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(Collections.singletonList("record2"), (Object) null));
        Assertions.assertEquals(1, ctx.timer.size());
        ctx.timer.cancel("timer-1");
        Assertions.assertEquals(0, ctx.timer.size());

        // Advancing the clock adds no new event; only the stale one is still queued.
        clock.advanceClock(11L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertTrue(processor.poll());

        // The stale event must not have produced any record.
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Schedules a timer with the retry flag set whose operation always throws, and checks that
     * the operation is invoked again after each further 501 ms clock advance while the timer
     * stays registered until it is cancelled explicitly.
     */
    @Test
    public void testRetryableTimer() throws InterruptedException {
        MockTimer clock = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());

        // Counts how often the failing operation has been invoked.
        AtomicInteger attempts = new AtomicInteger(0);
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            attempts.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, ctx.timer.size());

        // Initial expiration: first attempt, timer kept for a retry.
        clock.advanceClock(11L);
        Assertions.assertEquals(1, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // NOTE(review): 501 ms presumably just exceeds the runtime's default retry backoff - confirm.
        clock.advanceClock(501L);
        Assertions.assertEquals(2, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        clock.advanceClock(501L);
        Assertions.assertEquals(3, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // Cancelling is the only thing that ends the retry cycle in this test.
        ctx.timer.cancel("timer-1");
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Same scenario as a retryable, always-failing timer, but scheduled through the overload that
     * takes an extra {@code 1000L} argument. The operation is expected to be retried only after
     * two 501 ms clock advances rather than after each one.
     */
    @Test
    public void testRetryableTimerWithCustomBackoff() throws InterruptedException {
        MockTimer clock = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());

        // Counts how often the failing operation has been invoked.
        AtomicInteger attempts = new AtomicInteger(0);
        // NOTE(review): 1000L is presumably the retry backoff in milliseconds - confirm against the timer API.
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, 1000L, () -> {
            attempts.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, ctx.timer.size());

        // Initial expiration: first attempt.
        clock.advanceClock(11L);
        Assertions.assertEquals(1, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // 501 ms later: no retry yet.
        clock.advanceClock(501L);
        Assertions.assertEquals(1, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // 1002 ms after the first attempt: second attempt.
        clock.advanceClock(501L);
        Assertions.assertEquals(2, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // Again no retry after only 501 ms ...
        clock.advanceClock(501L);
        Assertions.assertEquals(2, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        // ... and a third attempt after another 501 ms.
        clock.advanceClock(501L);
        Assertions.assertEquals(3, attempts.get());
        Assertions.assertEquals(1, ctx.timer.size());

        ctx.timer.cancel("timer-1");
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Schedules a timer with the retry flag cleared whose operation throws, and checks that the
     * operation runs exactly once and the timer is dropped from the registry afterwards.
     */
    @Test
    public void testNonRetryableTimer() throws InterruptedException {
        MockTimer clock = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());

        // Counts how often the failing operation has been invoked.
        AtomicInteger attempts = new AtomicInteger(0);
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            attempts.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, ctx.timer.size());

        // The failure is not retried: one attempt, then the timer is gone.
        clock.advanceClock(11L);
        Assertions.assertEquals(1, attempts.get());
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Calls {@code scheduleIfAbsent} twice for the same key, 5 ms apart, and checks that the
     * operation has run exactly once 11 ms after the first call, with no timer left registered.
     */
    @Test
    public void testTimerScheduleIfAbsent() throws InterruptedException {
        MockTimer clock = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0, ctx.timer.size());

        // Shared by both lambdas so that any extra invocation would be counted.
        AtomicInteger invocations = new AtomicInteger(0);
        ctx.timer.scheduleIfAbsent("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, ctx.timer.size());

        // Half-way through the delay, request the same key again.
        clock.advanceClock(5L);
        ctx.timer.scheduleIfAbsent("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });

        // 11 ms after the first call: exactly one invocation and nothing left scheduled.
        clock.advanceClock(6L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(0, ctx.timer.size());
    }

    /**
     * Walks two partitions through their lifecycle and verifies that every state transition is
     * reported to the runtime metrics via {@code recordPartitionStateChange}.
     *
     * <p>The first partition's load fails (INITIAL to LOADING to FAILED); the second one succeeds
     * (INITIAL to LOADING to ACTIVE). Closing the runtime must then report FAILED to CLOSED and
     * ACTIVE to CLOSED once each.
     */
    @Test
    public void testStateChanges() throws Exception {
        MockTimer clock = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader loader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(loader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Fluent builder mock: every "with" method returns the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        // --- Partition TP: the load fails. ---
        CompletableFuture failingLoad = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition) ArgumentMatchers.eq(TP), (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, TP)))).thenReturn(failingLoad);
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext failedCtx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, failedCtx.state);
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);
        failingLoad.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, failedCtx.state);
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.FAILED);

        // --- Second partition: the load succeeds. ---
        TopicPartition secondPartition = new TopicPartition("__consumer_offsets", 1);
        CompletableFuture succeedingLoad = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition) ArgumentMatchers.eq(secondPartition), (CoordinatorPlayback) ArgumentMatchers.argThat(coordinatorMatcher(runtime, secondPartition)))).thenReturn(succeedingLoad);
        runtime.scheduleLoadOperation(secondPartition, 0);
        CoordinatorRuntime.CoordinatorContext activeCtx = runtime.contextOrThrow(secondPartition);
        Assertions.assertEquals(shard, activeCtx.coordinator.coordinator());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, activeCtx.state);
        // INITIAL -> LOADING has now been recorded twice in total, once per partition.
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(2))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);
        succeedingLoad.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, activeCtx.state);
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.ACTIVE);

        // --- Shutdown: both contexts move to CLOSED from their respective states. ---
        runtime.close();
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.FAILED, CoordinatorRuntime.CoordinatorState.CLOSED);
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.ACTIVE, CoordinatorRuntime.CoordinatorState.CLOSED);
    }

    /**
     * Loads a partition through a loader configured with a fixed {@code LoadSummary} and verifies
     * that the runtime metrics receive exactly one {@code recordPartitionLoadSensor} call carrying
     * the summary's first two values (the current mock time and that time plus 1000).
     */
    @Test
    public void testPartitionLoadSensor() {
        MockTimer clock = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class);

        long now = clock.time().milliseconds();
        // NOTE(review): the LoadSummary arguments are presumably start, end and scheduler-queue
        // times followed by record and byte counts - confirm against CoordinatorLoader.LoadSummary.
        CoordinatorLoader.LoadSummary summary = new CoordinatorLoader.LoadSummary(now, now + 1000, now + 500, 30L, 3000L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(clock.time())
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(summary, Collections.emptyList(), Collections.emptyList()))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Fluent builder mock: every "with" method returns the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        // No context exists for the partition before a load has been scheduled.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));

        runtime.scheduleLoadOperation(TP, 0);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, runtime.contextOrThrow(TP).state);
        ((GroupCoordinatorRuntimeMetrics) Mockito.verify(runtimeMetrics, Mockito.times(1))).recordPartitionLoadSensor(now, now + 1000);
    }

    /**
     * Loads a partition through a mock loader fed with the offset lists (5, 15, 27) and (5, 15),
     * then checks the resulting coordinator state: last written offset 27, last committed offset
     * 15, snapshots present at 15 and 27 and absent at 0 and 5.
     */
    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
        MockTimer clock = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);

        // NOTE(review): the two lists are presumably the written and committed offsets the mock
        // loader replays into the coordinator - confirm against MockCoordinatorLoader.
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(Time.SYSTEM)
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L), Arrays.asList(5L, 15L, 27L), Arrays.asList(5L, 15L)))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((GroupCoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Fluent builder mock: every "with" method returns the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(27L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(15L, ctx.coordinator.lastCommittedOffset());

        // Snapshots below the last committed offset are gone; 15 and 27 remain.
        Assertions.assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
        Assertions.assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L));
        Assertions.assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L));
        Assertions.assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L));
    }

    /**
     * Loads a partition through a mock loader given two empty offset lists and checks that the
     * coordinator ends up ACTIVE with last written and last committed offsets of 0 and a
     * snapshot registered at offset 0.
     */
    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
        MockTimer clock = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(Time.SYSTEM)
            .withTimer(clock)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L), Collections.emptyList(), Collections.emptyList()))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics((GroupCoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Fluent builder mock: every "with" method returns the builder itself.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.mo4build()).thenReturn(shard);
        Mockito.when(supplier.get()).thenReturn(shardBuilder);

        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);

        // With nothing replayed, both offsets stay at zero and the base snapshot is kept.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
    }

    /**
     * Checks how high watermark updates flow from the partition writer into the runtime.
     * <p>
     * Two writes are appended, then offsets 1 and 2 are committed. Each commit must be visible
     * through the listener's last high watermark while exactly one event is pending in the
     * processor. Processing that event must reset the listener to -1, move the last committed
     * offset to 2 and complete both write futures.
     */
    @Test
    public void testHighWatermarkUpdate() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Load the coordinator; events are processed explicitly through poll().
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Append the first record.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record1"), "response1"));
        processor.poll();

        // Append the second record.
        CompletableFuture secondWrite = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record2"), "response2"));
        processor.poll();

        // Both records reached the partition, one batch per write.
        List expectedEntries = Arrays.asList(
            records(timer.time().milliseconds(), "record1"),
            records(timer.time().milliseconds(), "record2"));
        Assertions.assertEquals(expectedEntries, writer.entries(TP));

        // Nothing has been committed yet.
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Committing offset 1 leaves one pending event and records the new high watermark.
        writer.commit(TP, 1L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Committing offset 2 still leaves a single pending event; only the watermark moves.
        writer.commit(TP, 2L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Processing the pending event applies the latest watermark and resets the listener.
        processor.poll();
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertTrue(secondWrite.isDone());
    }

    /**
     * Checks that the timer tasks registered for two write operations are cancelled once the
     * high watermark update has been processed.
     * <p>
     * After both writes are appended the timer holds two tasks. Committing offsets 1 and 2
     * only enqueues an event, so the tasks must still be live. After that event is polled the
     * futures are done and both tasks, still present in the timer, must be marked cancelled.
     */
    @Test
    public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Load the coordinator; events are processed explicitly through poll().
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Append two records, one write operation each.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record1"), "response1"));
        processor.poll();
        CompletableFuture secondWrite = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(Collections.singletonList("record2"), "response2"));
        processor.poll();

        List expectedEntries = Arrays.asList(
            records(timer.time().milliseconds(), "record1"),
            records(timer.time().milliseconds(), "record2"));
        Assertions.assertEquals(expectedEntries, writer.entries(TP));

        // One timer task per write.
        Assertions.assertEquals(2, timer.size());

        // Commit both offsets; the update is queued but not processed yet.
        writer.commit(TP, 1L);
        writer.commit(TP, 2L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // The tasks are untouched until the queued event runs.
        Assertions.assertEquals(2, timer.size());
        timer.taskQueue().forEach(task -> Assertions.assertFalse(task.cancelled()));

        // Process the high watermark update.
        processor.poll();
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertTrue(secondWrite.isDone());

        // The tasks remain in the timer but are now cancelled.
        Assertions.assertEquals(2, timer.size());
        timer.taskQueue().forEach(task -> Assertions.assertTrue(task.cancelled()));
    }

    /**
     * Checks that the timer task registered for a transaction completion is cancelled once the
     * high watermark update has been processed.
     * <p>
     * The completion writes a single COMMIT end-transaction marker and registers one timer
     * task. Committing offset 1 only enqueues an event, so the task must still be live. After
     * that event is polled the future is done and the task must be marked cancelled.
     */
    @Test
    public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Load the coordinator; events are processed explicitly through poll().
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Complete the transaction (producer id 100, epoch 50, coordinator epoch 1).
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "transactional-write", TP, 100L, (short) 50, 1, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        processor.poll();

        // Only the COMMIT marker was written.
        List expectedEntries = Collections.singletonList(
            endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT));
        Assertions.assertEquals(expectedEntries, writer.entries(TP));

        // One timer task for the completion.
        Assertions.assertEquals(1, timer.size());

        // Commit the marker; the update is queued but not processed yet.
        writer.commit(TP, 1L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // The task is untouched until the queued event runs.
        Assertions.assertEquals(1, timer.size());
        timer.taskQueue().forEach(task -> Assertions.assertFalse(task.cancelled()));

        // Process the high watermark update.
        processor.poll();
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(completion.isDone());

        // The task remains in the timer but is now cancelled.
        Assertions.assertEquals(1, timer.size());
        timer.taskQueue().forEach(task -> Assertions.assertTrue(task.cancelled()));
    }

    /**
     * Checks that a single write of 3000 small records ends up in one batch whose size is
     * larger than 16384 bytes yet smaller than the partition's configured max message size.
     */
    @Test
    public void testAppendRecordBatchSize() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .build();

        // Load the coordinator and check its initial state.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

        // The test only makes sense if the max message size exceeds the lower bound used below.
        final int lowerBound = 16384;
        int maxBatchSize = writer.config(TP).maxMessageSize();
        Assertions.assertTrue(maxBatchSize > lowerBound);

        // Build 3000 records: "record-0" ... "record-2999".
        List<String> payloads = new ArrayList<>();
        int index = 0;
        while (index < 3000) {
            payloads.add("record-" + index);
            index++;
        }

        // Write them all in one operation; it must not fail.
        CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(payloads, "response1"));
        Assertions.assertFalse(write.isCompletedExceptionally());

        // The first written batch must fall strictly between the two bounds.
        int batchSize = writer.entries(TP).get(0).sizeInBytes();
        Assertions.assertTrue(batchSize > lowerBound && batchSize < maxBatchSize);
    }

    /**
     * Exercises write batching with an append linger of 10 ms.
     * <p>
     * Four records, each {@code maxMessageSize / 4} characters long, are written through three
     * operations. The test expects that:
     * <ul>
     *   <li>the first two operations only accumulate records in the current batch: they are
     *       replayed to the state machine but nothing is written to the partition;</li>
     *   <li>the third operation causes the first three records to be written as one batch
     *       (last written offset 3) while the fourth record stays pending;</li>
     *   <li>advancing the clock by 11 ms writes the fourth record as a second batch;</li>
     *   <li>committing completes all three futures with their responses.</li>
     * </ul>
     */
    @Test
    public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its initial state; no batch is open yet.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four records ("111...", "222...", ...), each a quarter of the max message size long.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // Write #1 with records 0 and 1: buffered in a new batch, nothing written yet.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(0, 2), "response1"));
        Assertions.assertFalse(write1.isDone());
        Assertions.assertNotNull(ctx.currentBatch);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2 with record 2: joins the same batch, still nothing written.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(2, 3), "response2"));
        Assertions.assertFalse(write2.isDone());
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #3 with record 3: the first three records are written out as one batch.
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(3, 4), "response3"));
        Assertions.assertFalse(write3.isDone());
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
                new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            Collections.singletonList(records(timer.time().milliseconds(), payloads.subList(0, 3))),
            writer.entries(TP));

        // Go past the 10 ms linger: the remaining record is written as a second batch.
        timer.advanceClock(11L);
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
                new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        // Both batches are expected to carry the timestamp from before the clock advanced.
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds() - 11, payloads.subList(0, 3)),
                records(timer.time().milliseconds() - 11, payloads.subList(3, 4))),
            writer.entries(TP));

        // Commit everything: all writes complete with their responses.
        writer.commit(TP);
        Assertions.assertTrue(write1.isDone());
        Assertions.assertTrue(write2.isDone());
        Assertions.assertTrue(write3.isDone());
        Assertions.assertEquals("response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", write3.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that, with batching enabled (append linger of 10 ms), a single write whose four
     * records are each {@code maxMessageSize / 4} characters long fails with
     * {@link RecordTooLargeException}.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its initial state; no batch is open yet.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four records, each a quarter of the max message size long.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // Writing all four at once is expected to be rejected as too large.
        CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads, "response1"));
        TestUtils.assertFutureThrows(write, RecordTooLargeException.class);
    }

    /**
     * Checks that when writing a batch fails, every operation that contributed to it fails and
     * the coordinator state is rolled back.
     * <p>
     * Three single-record writes are buffered, then a fourth write is scheduled. All four
     * futures must fail with {@link KafkaException}. Afterwards the last written offset is
     * still 0, only snapshot epoch 0 remains, the state machine holds no records and nothing
     * was written to the partition.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
        MockTimer timer = new MockTimer();
        // NOTE(review): the argument 0 presumably makes every append fail; confirm against MockPartitionWriter.
        MockPartitionWriter writer = new MockPartitionWriter(0);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its initial state; no batch is open yet.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four records, each a quarter of the max message size long.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // Three writes of one record each; all are buffered.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3"));

        // The records were replayed to the state machine but not written to the partition.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // The fourth write leads to the failing append.
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4"));

        // Every operation fails.
        TestUtils.assertFutureThrows(write1, KafkaException.class);
        TestUtils.assertFutureThrows(write2, KafkaException.class);
        TestUtils.assertFutureThrows(write3, KafkaException.class);
        TestUtils.assertFutureThrows(write4, KafkaException.class);

        // The state is back to what it was right after loading.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Checks that when replaying a record fails while a batch is open, every operation in the
     * batch fails and the coordinator state is rolled back.
     * <p>
     * The coordinator is swapped for one whose {@code replay} throws
     * {@link IllegalArgumentException} when its first argument is at least 1. The first write
     * is buffered successfully; the second one hits the failing replay. Both futures must fail
     * with {@link IllegalArgumentException}. Afterwards the last written offset is still 0,
     * only snapshot epoch 0 remains, the state machine holds no records and nothing was
     * written to the partition.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its initial state; no batch is open yet.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Replace the coordinator, reusing the existing snapshot registry, with one that
        // rejects any replay whose first argument is >= 1.
        // NOTE(review): parameter names below are presumed from usage; confirm against MockCoordinatorShard.replay.
        SnapshotRegistry registry = ctx.coordinator.snapshotRegistry();
        ctx.coordinator = new SnapshottableCoordinator(new LogContext(), registry, new MockCoordinatorShard(registry, ctx.timer) {
            @Override
            public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
                if (offset >= 1) {
                    throw new IllegalArgumentException("error");
                }
                super.replay(offset, producerId, producerEpoch, record);
            }
        }, TP);

        // Two records, each a quarter of the max message size long.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // Write #1: replayed at offset 0, buffered, nothing written to the partition.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Collections.singletonList(new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2: its replay is expected to throw.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));

        // Both operations fail with the replay error.
        TestUtils.assertFutureThrows(write1, IllegalArgumentException.class);
        TestUtils.assertFutureThrows(write2, IllegalArgumentException.class);

        // The state is back to what it was right after loading.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Exercises the interaction between batched regular writes, transactional writes and
     * transaction completion, with an append linger of 10 ms.
     * <p>
     * The test expects that:
     * <ul>
     *   <li>a regular write is only buffered;</li>
     *   <li>a following transactional write results in two entries on the partition: the
     *       buffered regular record and the transactional record (last written offset 2);</li>
     *   <li>another regular write is buffered again;</li>
     *   <li>completing the transaction results in two more entries: the buffered record and
     *       the COMMIT marker (last written offset 4), and the pending transactional record
     *       becomes a regular record of the state machine;</li>
     *   <li>committing completes all futures with their responses, the completion future
     *       yielding {@code null}.</li>
     * </ul>
     */
    @Test
    public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its initial state; no batch is open yet.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Regular write #1: buffered, visible in the state machine, not on the partition.
        CompletableFuture regularWrite1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.singletonList("record#1"), "response1"));
        Assertions.assertFalse(regularWrite1.isDone());
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#1"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Transactional write (producer id 100, epoch 50): both records reach the partition.
        CompletableFuture txnWrite = runtime.scheduleTransactionalWriteOperation(
            "txn-write#1", TP, "transactional-id", 100L, (short) 50, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.singletonList("record#2"), "response2"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertFalse(txnWrite.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#1"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds(), "record#1"),
                transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")),
            writer.entries(TP));

        // Regular write #3: buffered again, partition content unchanged.
        CompletableFuture regularWrite3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.singletonList("record#3"), "response3"));
        Assertions.assertFalse(regularWrite3.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#2"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#1", "record#3"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds(), "record#1"),
                transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")),
            writer.entries(TP));

        // Complete the transaction: the buffered record and the COMMIT marker are written.
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short) 50, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse(completion.isDone());
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Utils.mkSet(new String[]{"record#1", "record#2", "record#3"}), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            Arrays.asList(
                records(timer.time().milliseconds(), "record#1"),
                transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
                records(timer.time().milliseconds(), "record#3"),
                endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 10, ControlRecordType.COMMIT)),
            writer.entries(TP));

        // Commit everything: all operations complete.
        writer.commit(TP);
        Assertions.assertTrue(regularWrite1.isDone());
        Assertions.assertTrue(txnWrite.isDone());
        Assertions.assertTrue(regularWrite3.isDone());
        Assertions.assertTrue(completion.isDone());
        Assertions.assertEquals("response1", regularWrite1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", txnWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", regularWrite3.get(5L, TimeUnit.SECONDS));
        Assertions.assertNull(completion.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that the coordinator is reloaded when the partition writer reports an offset that
     * differs from the one returned by the regular in-memory writer.
     *
     * <p>The writer used here returns the parent class's append offset plus one. Four writes are
     * scheduled, each carrying one record of {@code maxMessageSize / 4} characters. All four
     * futures must fail with {@link NotCoordinatorException}, the (spied) loader must have been
     * asked to load the partition twice, the context must be ACTIVE again, and its coordinator
     * must no longer be equal to the one captured before the writes.
     */
    @Test
    public void testStateMachineIsReloadedWhenOutOfSync() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader) Mockito.spy(new MockCoordinatorLoader());
        // Skew every reported offset by one so it disagrees with what the parent writer returns.
        MockPartitionWriter writer = new MockPartitionWriter() {
            @Override
            public long append(TopicPartition topicPartition, VerificationGuard verificationGuard, MemoryRecords memoryRecords) {
                return super.append(topicPartition, verificationGuard, memoryRecords) + 1;
            }
        };
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(loader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its pristine state.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Remember the coordinator so that the reload can be detected at the end.
        SnapshottableCoordinator coordinatorBeforeWrites = ctx.coordinator;

        // Four payloads, each a quarter of the maximum message size.
        // NOTE(review): presumably the fourth one overflows the pending batch and forces the flush
        // that hits the skewed writer; confirm against CoordinatorRuntime.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3"));
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4"));

        // Every write is failed with NOT_COORDINATOR.
        TestUtils.assertFutureThrows(write1, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write2, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write3, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write4, NotCoordinatorException.class);

        // The loader ran a second time and the context holds a fresh coordinator.
        ((MockCoordinatorLoader) Mockito.verify(loader, Mockito.times(2))).load((TopicPartition) ArgumentMatchers.eq(TP), (CoordinatorPlayback) ArgumentMatchers.any());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertNotEquals(coordinatorBeforeWrites, ctx.coordinator);
    }

    /**
     * Checks that a write producing no records is held back while the coordinator's last
     * committed offset is behind its last written offset after loading.
     *
     * <p>The loader replays two records and leaves the coordinator at last written offset 2 and
     * last committed offset 1. A write with an empty record list is then scheduled; its future
     * must stay incomplete until the high watermark listener is told that offset 2 is committed,
     * after which it completes with {@code "response1"}.
     */
    @Test
    public void testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();

        // Loader that leaves the state machine one offset short of being fully committed.
        CoordinatorLoader<String> laggingLoader = new CoordinatorLoader<String>() {
            @Override
            public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> playback) {
                playback.replay(0L, -1L, (short) -1, "record#0");
                playback.replay(0L, -1L, (short) -1, "record#1");
                playback.updateLastWrittenOffset(2L);
                playback.updateLastCommittedOffset(1L);
                return CompletableFuture.completedFuture(new CoordinatorLoader.LoadSummary(0L, 0L, 0L, 2L, 1L));
            }

            @Override
            public void close() {
            }
        };

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(laggingLoader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // After loading, the coordinator reflects exactly what the loader reported.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(1L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());

        // A write without records must not complete while offset 2 is still uncommitted.
        CompletableFuture pendingWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.emptyList(), "response1"));
        Assertions.assertFalse(pendingWrite.isDone());

        // Advancing the high watermark to 2 releases the write.
        ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L);
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(2L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals("response1", pendingWrite.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks how a write whose records do not fit into a single batch is handled.
     *
     * <p>Four records of {@code maxMessageSize / 4} characters each are written twice:
     * <ul>
     *   <li>with the two-argument {@code CoordinatorResult}, the write fails with
     *       {@link RecordTooLargeException};</li>
     *   <li>with the five-argument {@code CoordinatorResult} (flags {@code true, false};
     *       presumably "replay records" and "not atomic" - confirm against CoordinatorResult),
     *       the first three records are appended to the log as one batch while the fourth stays
     *       in the pending batch until the clock is advanced past the 10 ms linger time.</li>
     * </ul>
     * The write future completes only after both batches have been committed.
     */
    @Test
    public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its pristine state.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four payloads, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // All four records are expected to be replayed at offsets 0..3, both before and after
        // the second batch is flushed.
        List<MockCoordinatorShard.RecordAndMetadata> expectedReplayed = Arrays.asList(
            new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
            new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
            new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
            new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3)));

        // Write #1 uses the two-argument result and is rejected as a whole.
        CompletableFuture rejectedWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads, "write#1"));
        TestUtils.assertFutureThrows(rejectedWrite, RecordTooLargeException.class);

        // Write #2 carries the same records but may be split across batches.
        CompletableFuture splitWrite = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads, "write#2", (CompletableFuture) null, true, false));
        Assertions.assertFalse(splitWrite.isDone());

        // The first three records were flushed; the fourth is still pending in the current batch.
        Assertions.assertNotNull(ctx.currentBatch);
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(expectedReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.singletonList(records(timer.time().milliseconds(), payloads.subList(0, 3))), writer.entries(TP));

        // Committing the first batch alone does not complete the write.
        writer.commit(TP, 3L);
        Assertions.assertFalse(splitWrite.isDone());

        // Advancing past the linger time flushes the remaining record as a second batch.
        timer.advanceClock(11L);
        Assertions.assertNull(ctx.currentBatch);
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(3L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(expectedReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        // Both batches carry the timestamp from before the clock was advanced by 11 ms.
        Assertions.assertEquals(Arrays.asList(records(timer.time().milliseconds() - 11, payloads.subList(0, 3)), records(timer.time().milliseconds() - 11, payloads.subList(3, 4))), writer.entries(TP));

        // Committing the second batch finally completes the write.
        writer.commit(TP, 4L);
        Assertions.assertTrue(splitWrite.isDone());
        Assertions.assertEquals("write#2", splitWrite.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that an oversized record in a later write does not discard records that an earlier
     * write already placed in the pending batch.
     *
     * <p>Write #1 contributes three records of {@code maxMessageSize / 4} characters each; nothing
     * reaches the log yet. Write #2 contributes a single record of {@code maxMessageSize}
     * characters. After the clock is advanced by 11 ms, write #2 must have failed with
     * {@link RecordTooLargeException}, while write #1's three records must have been appended as
     * one batch (last written offset 3) with its future still pending because nothing was
     * committed.
     */
    @Test
    public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws InterruptedException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its pristine state.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Three payloads, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // One payload as large as the maximum message size on its own.
        char[] oversizedChars = new char[maxBatchSize];
        Arrays.fill(oversizedChars, '4');
        String oversizedPayload = new String(oversizedChars);

        // The three small records are expected to stay replayed at offsets 0..2 throughout.
        List<MockCoordinatorShard.RecordAndMetadata> expectedReplayed = Arrays.asList(
            new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
            new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
            new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)));

        // Write #1: the records are replayed but remain buffered, so the log is still empty.
        CompletableFuture bufferedWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads, "write#1", (CompletableFuture) null, true, false));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(expectedReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2: a single record that is too large.
        CompletableFuture oversizedWrite = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.singletonList(oversizedPayload), "write#2", (CompletableFuture) null, true, false));
        timer.advanceClock(11L);

        // Only write #2 fails; write #1 is still waiting for its batch to be committed.
        TestUtils.assertFutureThrows(oversizedWrite, RecordTooLargeException.class);
        Assertions.assertFalse(bufferedWrite.isDone());

        // Write #1's records were appended as a single batch and remain replayed.
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(expectedReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        // The batch carries the timestamp from before the clock was advanced by 11 ms.
        Assertions.assertEquals(Collections.singletonList(records(timer.time().milliseconds() - 11, payloads.subList(0, 3))), writer.entries(TP));
    }

    /**
     * Checks that a failed flush fails every write that contributed to the pending batch and
     * reverts the coordinator to its state before those writes.
     *
     * <p>The partition writer is created with {@code new MockPartitionWriter(0)}.
     * NOTE(review): the argument presumably caps the number of successful appends at zero so
     * every append fails; confirm against MockPartitionWriter. Three writes of one
     * {@code maxMessageSize / 4}-character record each are buffered, then a fourth is scheduled.
     * All four futures must fail with {@link KafkaException}, and afterwards the offsets, the
     * snapshot epochs, the replayed records and the log must all be back to their initial state.
     */
    @Test
    public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(0);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics) Mockito.mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .build();

        // Load the coordinator and check its pristine state.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four payloads, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(c -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, c);
            return new String(chars);
        }).collect(Collectors.toList());

        // The first three writes are replayed and buffered without touching the log.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1", (CompletableFuture) null, true, false));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2", (CompletableFuture) null, true, false));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3", (CompletableFuture) null, true, false));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            Arrays.asList(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // The fourth write is the one after which every pending future is observed to fail.
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4", (CompletableFuture) null, true, false));
        TestUtils.assertFutureThrows(write1, KafkaException.class);
        TestUtils.assertFutureThrows(write2, KafkaException.class);
        TestUtils.assertFutureThrows(write3, KafkaException.class);
        TestUtils.assertFutureThrows(write4, KafkaException.class);

        // Everything is rolled back: no offsets moved, no records replayed, nothing in the log.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Builds a Mockito argument matcher that accepts a {@link CoordinatorPlayback} equal to the
     * coordinator held by the runtime's context for the given partition.
     *
     * <p>The context is looked up each time the matcher is evaluated, not when it is created.
     *
     * @param coordinatorRuntime the runtime whose context is consulted
     * @param topicPartition     the partition whose coordinator is compared against
     * @return a matcher comparing its argument to that coordinator via {@code equals}
     */
    private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(CoordinatorRuntime<S, U> coordinatorRuntime, TopicPartition topicPartition) {
        return candidate -> candidate.equals(coordinatorRuntime.contextOrThrow(topicPartition).coordinator);
    }
}
