package org.apache.kafka.metalog;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager.class */
public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable {
    private final Logger log;
    private final int nodeId;
    private final SharedLogData shared;
    private final EventQueue eventQueue;
    private long maxReadOffset;
    private boolean initialized = false;
    private boolean shutdown = false;
    private final Map<RaftClient.Listener<ApiMessageAndVersion>, MetaLogListenerData> listeners = new IdentityHashMap();
    private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
    private AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager$LeaderChangeBatch.class */
    public static class LeaderChangeBatch implements LocalBatch {
        private final LeaderAndEpoch newLeader;

        public LeaderChangeBatch(LeaderAndEpoch leaderAndEpoch) {
            this.newLeader = leaderAndEpoch;
        }

        @Override // org.apache.kafka.metalog.LocalLogManager.LocalBatch
        public int epoch() {
            return this.newLeader.epoch();
        }

        @Override // org.apache.kafka.metalog.LocalLogManager.LocalBatch
        public int size() {
            return 1;
        }

        public boolean equals(Object obj) {
            return (obj instanceof LeaderChangeBatch) && ((LeaderChangeBatch) obj).newLeader.equals(this.newLeader);
        }

        public int hashCode() {
            return Objects.hash(this.newLeader);
        }

        public String toString() {
            return "LeaderChangeBatch(newLeader=" + this.newLeader + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager$LocalBatch.class */
    public interface LocalBatch {
        int epoch();

        int size();
    }

    /* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager$LocalRecordBatch.class */
    public static class LocalRecordBatch implements LocalBatch {
        private final int leaderEpoch;
        private final long appendTimestamp;
        private final List<ApiMessageAndVersion> records;

        public LocalRecordBatch(int i, long j, List<ApiMessageAndVersion> list) {
            this.leaderEpoch = i;
            this.appendTimestamp = j;
            this.records = list;
        }

        @Override // org.apache.kafka.metalog.LocalLogManager.LocalBatch
        public int epoch() {
            return this.leaderEpoch;
        }

        @Override // org.apache.kafka.metalog.LocalLogManager.LocalBatch
        public int size() {
            return this.records.size();
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof LocalRecordBatch)) {
                return false;
            }
            LocalRecordBatch localRecordBatch = (LocalRecordBatch) obj;
            return this.leaderEpoch == localRecordBatch.leaderEpoch && this.appendTimestamp == localRecordBatch.appendTimestamp && Objects.equals(this.records, localRecordBatch.records);
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.leaderEpoch), Long.valueOf(this.appendTimestamp), this.records);
        }

        public String toString() {
            return String.format("LocalRecordBatch(leaderEpoch=%s, appendTimestamp=%s, records=%s)", Integer.valueOf(this.leaderEpoch), Long.valueOf(this.appendTimestamp), this.records);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager$MetaLogListenerData.class */
    public static class MetaLogListenerData {
        private long offset = -1;
        private LeaderAndEpoch notifiedLeader = new LeaderAndEpoch(OptionalInt.empty(), 0);
        private final RaftClient.Listener<ApiMessageAndVersion> listener;

        MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
            this.listener = listener;
        }

        long offset() {
            return this.offset;
        }

        void setOffset(long j) {
            this.offset = j;
        }

        LeaderAndEpoch notifiedLeader() {
            return this.notifiedLeader;
        }

        void handleCommit(MemoryBatchReader<ApiMessageAndVersion> memoryBatchReader) {
            this.listener.handleCommit(memoryBatchReader);
            this.offset = memoryBatchReader.lastOffset().getAsLong();
        }

        void handleSnapshot(SnapshotReader<ApiMessageAndVersion> snapshotReader) {
            this.listener.handleSnapshot(snapshotReader);
            this.offset = snapshotReader.lastContainedLogOffset();
        }

        void handleLeaderChange(long j, LeaderAndEpoch leaderAndEpoch) {
            this.listener.handleLeaderChange(leaderAndEpoch);
            this.notifiedLeader = leaderAndEpoch;
            this.offset = j;
        }

        void beginShutdown() {
            this.listener.beginShutdown();
        }
    }

    /* loaded from: input_file:org/apache/kafka/metalog/LocalLogManager$SharedLogData.class */
    public static class SharedLogData {
        private long prevOffset;
        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
        private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
        private long initialMaxReadOffset = Long.MAX_VALUE;
        private NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap();

        public SharedLogData(Optional<RawSnapshotReader> optional) {
            if (!optional.isPresent()) {
                this.prevOffset = -1L;
                return;
            }
            RawSnapshotReader rawSnapshotReader = optional.get();
            this.prevOffset = rawSnapshotReader.snapshotId().offset - 1;
            this.snapshots.put(Long.valueOf(this.prevOffset), rawSnapshotReader);
        }

        synchronized void registerLogManager(LocalLogManager localLogManager) {
            if (this.logManagers.put(Integer.valueOf(localLogManager.nodeId), localLogManager) != null) {
                throw new RuntimeException("Can't have multiple LocalLogManagers with id " + localLogManager.nodeId());
            }
            electLeaderIfNeeded();
        }

        synchronized void unregisterLogManager(LocalLogManager localLogManager) {
            if (!this.logManagers.remove(Integer.valueOf(localLogManager.nodeId), localLogManager)) {
                throw new RuntimeException("Log manager " + localLogManager.nodeId() + " was not found.");
            }
        }

        synchronized long tryAppend(int i, int i2, List<ApiMessageAndVersion> list) {
            return tryAppend(i, i2, new LocalRecordBatch(i2, (this.prevOffset + 1) * 10, list));
        }

        synchronized long tryAppend(int i, int i2, LocalBatch localBatch) {
            if (!this.leader.isLeader(i)) {
                this.log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not match the current leader id of {}.", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), this.leader.leaderId()});
                throw new NotLeaderException("Append failed because the replication is not the current leader");
            }
            if (i2 < this.leader.epoch()) {
                throw new NotLeaderException("Append failed because the given epoch " + i2 + " is stale. Current leader epoch = " + this.leader.epoch());
            }
            if (i2 > this.leader.epoch()) {
                throw new IllegalArgumentException("Attempt to append from epoch " + i2 + " which is larger than the current epoch " + this.leader.epoch());
            }
            this.log.trace("tryAppend(nodeId={}): appending {}.", Integer.valueOf(i), localBatch);
            long append = append(localBatch);
            electLeaderIfNeeded();
            return append;
        }

        public synchronized long append(LocalBatch localBatch) {
            this.prevOffset += localBatch.size();
            this.log.debug("append(batch={}, prevOffset={})", localBatch, Long.valueOf(this.prevOffset));
            this.batches.put(Long.valueOf(this.prevOffset), localBatch);
            if (localBatch instanceof LeaderChangeBatch) {
                this.leader = ((LeaderChangeBatch) localBatch).newLeader;
            }
            Iterator<LocalLogManager> it = this.logManagers.values().iterator();
            while (it.hasNext()) {
                it.next().scheduleLogCheck();
            }
            return this.prevOffset;
        }

        synchronized void electLeaderIfNeeded() {
            if (this.leader.leaderId().isPresent() || this.logManagers.isEmpty()) {
                return;
            }
            int nextInt = ThreadLocalRandom.current().nextInt(this.logManagers.size());
            Iterator<Integer> it = this.logManagers.keySet().iterator();
            Integer num = null;
            for (int i = 0; i <= nextInt; i++) {
                num = it.next();
            }
            LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(num.intValue()), this.leader.epoch() + 1);
            this.log.info("Elected new leader: {}.", leaderAndEpoch);
            append(new LeaderChangeBatch(leaderAndEpoch));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public synchronized LeaderAndEpoch leaderAndEpoch() {
            return this.leader;
        }

        synchronized Map.Entry<Long, LocalBatch> nextBatch(long j) {
            Map.Entry<Long, LocalBatch> higherEntry = this.batches.higherEntry(Long.valueOf(j));
            if (higherEntry == null) {
                return null;
            }
            return new AbstractMap.SimpleImmutableEntry(higherEntry.getKey(), higherEntry.getValue());
        }

        synchronized Optional<RawSnapshotReader> nextSnapshot(long j) {
            return Optional.ofNullable(this.snapshots.lastEntry()).flatMap(entry -> {
                return j <= ((Long) entry.getKey()).longValue() ? Optional.of(entry.getValue()) : Optional.empty();
            });
        }

        synchronized void addSnapshot(RawSnapshotReader rawSnapshotReader) {
            if (rawSnapshotReader.snapshotId().offset - 1 > this.prevOffset) {
                this.log.error("Ignored attempt to add a snapshot {} that is greater than the latest offset {}", rawSnapshotReader, Long.valueOf(this.prevOffset));
            } else {
                this.snapshots.put(Long.valueOf(rawSnapshotReader.snapshotId().offset - 1), rawSnapshotReader);
                notifyAll();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public synchronized RawSnapshotReader waitForSnapshot(long j) throws InterruptedException {
            while (true) {
                RawSnapshotReader rawSnapshotReader = (RawSnapshotReader) this.snapshots.get(Long.valueOf(j));
                if (rawSnapshotReader != null) {
                    return rawSnapshotReader;
                }
                wait();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public synchronized RawSnapshotReader waitForLatestSnapshot() throws InterruptedException {
            while (this.snapshots.isEmpty()) {
                wait();
            }
            return (RawSnapshotReader) ((Map.Entry) Objects.requireNonNull(this.snapshots.lastEntry())).getValue();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public synchronized long appendedBytes() {
            ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
            return this.batches.values().stream().flatMapToInt(localBatch -> {
                return localBatch instanceof LocalRecordBatch ? ((LocalRecordBatch) localBatch).records.stream().mapToInt(apiMessageAndVersion -> {
                    return LocalLogManager.messageSize(apiMessageAndVersion, objectSerializationCache);
                }) : IntStream.empty();
            }).sum();
        }

        public SharedLogData setInitialMaxReadOffset(long j) {
            this.initialMaxReadOffset = j;
            return this;
        }

        public long initialMaxReadOffset() {
            return this.initialMaxReadOffset;
        }
    }

    public LocalLogManager(LogContext logContext, int i, SharedLogData sharedLogData, String str) {
        this.maxReadOffset = Long.MAX_VALUE;
        this.log = logContext.logger(LocalLogManager.class);
        this.nodeId = i;
        this.shared = sharedLogData;
        this.maxReadOffset = sharedLogData.initialMaxReadOffset();
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, str);
        sharedLogData.registerLogManager(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleLogCheck() {
        this.eventQueue.append(() -> {
            try {
                this.log.debug("Node {}: running log check.", Integer.valueOf(this.nodeId));
                int i = 0;
                for (MetaLogListenerData metaLogListenerData : this.listeners.values()) {
                    while (true) {
                        if (!OptionalInt.of(this.nodeId).equals(metaLogListenerData.notifiedLeader().leaderId())) {
                            Optional<RawSnapshotReader> nextSnapshot = this.shared.nextSnapshot(metaLogListenerData.offset());
                            if (nextSnapshot.isPresent()) {
                                this.log.trace("Node {}: handling snapshot with id {}.", Integer.valueOf(this.nodeId), nextSnapshot.get().snapshotId());
                                metaLogListenerData.handleSnapshot(RecordsSnapshotReader.of(nextSnapshot.get(), new MetadataRecordSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true));
                            }
                        }
                        Map.Entry<Long, LocalBatch> nextBatch = this.shared.nextBatch(metaLogListenerData.offset());
                        if (nextBatch == null) {
                            this.log.trace("Node {}: reached the end of the log after finding {} entries.", Integer.valueOf(this.nodeId), Integer.valueOf(i));
                            break;
                        }
                        long longValue = nextBatch.getKey().longValue();
                        if (longValue > this.maxReadOffset) {
                            this.log.trace("Node {}: after {} entries, not reading the next entry because its offset is {}, and maxReadOffset is {}.", new Object[]{Integer.valueOf(this.nodeId), Integer.valueOf(i), Long.valueOf(longValue), Long.valueOf(this.maxReadOffset)});
                            break;
                        }
                        if (nextBatch.getValue() instanceof LeaderChangeBatch) {
                            LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) nextBatch.getValue();
                            this.log.trace("Node {}: handling LeaderChange to {}.", Integer.valueOf(this.nodeId), leaderChangeBatch.newLeader);
                            LeaderAndEpoch leaderAndEpoch = this.shared.leaderAndEpoch();
                            if (leaderChangeBatch.newLeader.equals(leaderAndEpoch)) {
                                this.log.debug("Node {}: Executing handleLeaderChange {}", Integer.valueOf(this.nodeId), leaderAndEpoch);
                                metaLogListenerData.handleLeaderChange(longValue, leaderChangeBatch.newLeader);
                                if (leaderChangeBatch.newLeader.epoch() > this.leader.epoch()) {
                                    this.leader = leaderChangeBatch.newLeader;
                                }
                            } else {
                                this.log.debug("Node {}: Ignoring {} since it doesn't match the latest known leader {}", new Object[]{Integer.valueOf(this.nodeId), leaderChangeBatch.newLeader, leaderAndEpoch});
                                metaLogListenerData.setOffset(longValue);
                            }
                        } else if (nextBatch.getValue() instanceof LocalRecordBatch) {
                            LocalRecordBatch localRecordBatch = (LocalRecordBatch) nextBatch.getValue();
                            this.log.trace("Node {}: handling LocalRecordBatch with offset {}.", Integer.valueOf(this.nodeId), Long.valueOf(longValue));
                            ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
                            metaLogListenerData.handleCommit(MemoryBatchReader.of(Collections.singletonList(Batch.data((longValue - localRecordBatch.records.size()) + 1, localRecordBatch.leaderEpoch, localRecordBatch.appendTimestamp, localRecordBatch.records.stream().mapToInt(apiMessageAndVersion -> {
                                return messageSize(apiMessageAndVersion, objectSerializationCache);
                            }).sum(), localRecordBatch.records)), batchReader -> {
                            }));
                        }
                        i++;
                    }
                }
                this.log.trace("Completed log check for node " + this.nodeId);
            } catch (Exception e) {
                this.log.error("Exception while handling log check", e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int messageSize(ApiMessageAndVersion apiMessageAndVersion, ObjectSerializationCache objectSerializationCache) {
        return new MetadataRecordSerde().recordSize(apiMessageAndVersion, objectSerializationCache);
    }

    public void beginShutdown() {
        this.eventQueue.beginShutdown("beginShutdown", () -> {
            try {
                if (this.initialized && !this.shutdown) {
                    this.log.debug("Node {}: beginning shutdown.", Integer.valueOf(this.nodeId));
                    resign(this.leader.epoch());
                    Iterator<MetaLogListenerData> it = this.listeners.values().iterator();
                    while (it.hasNext()) {
                        it.next().beginShutdown();
                    }
                    this.shared.unregisterLogManager(this);
                }
            } catch (Exception e) {
                this.log.error("Unexpected exception while sending beginShutdown callbacks", e);
            }
            this.shutdown = true;
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.log.debug("Node {}: closing.", Integer.valueOf(this.nodeId));
        beginShutdown();
        try {
            this.eventQueue.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public CompletableFuture<Void> shutdown(int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            close();
            completableFuture.complete(null);
        } catch (Throwable th) {
            completableFuture.completeExceptionally(th);
        }
        return completableFuture;
    }

    public void initialize() {
        this.eventQueue.append(() -> {
            this.log.debug("initialized local log manager for node " + this.nodeId);
            this.initialized = true;
        });
    }

    public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.eventQueue.append(() -> {
            if (this.shutdown) {
                this.log.info("Node {}: can't register because local log manager has already been shut down.", Integer.valueOf(this.nodeId));
                completableFuture.complete(null);
                return;
            }
            if (!this.initialized) {
                this.log.info("Node {}: can't register because local log manager has not been initialized.", Integer.valueOf(this.nodeId));
                completableFuture.completeExceptionally(new RuntimeException("LocalLogManager was not initialized."));
                return;
            }
            int identityHashCode = System.identityHashCode(listener);
            if (this.listeners.putIfAbsent(listener, new MetaLogListenerData(listener)) != null) {
                this.log.error("Node {}: can't register because listener {} already exists", Integer.valueOf(this.nodeId), Integer.valueOf(identityHashCode));
            } else {
                this.log.info("Node {}: registered MetaLogListener {}", Integer.valueOf(this.nodeId), Integer.valueOf(identityHashCode));
            }
            this.shared.electLeaderIfNeeded();
            scheduleLogCheck();
            completableFuture.complete(null);
        });
        try {
            completableFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2);
        }
    }

    public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
        this.eventQueue.append(() -> {
            if (this.shutdown) {
                this.log.info("Node {}: can't unregister because local log manager is shutdown", Integer.valueOf(this.nodeId));
                return;
            }
            int identityHashCode = System.identityHashCode(listener);
            if (this.listeners.remove(listener) == null) {
                this.log.error("Node {}: can't unregister because the listener {} doesn't exists", Integer.valueOf(this.nodeId), Integer.valueOf(identityHashCode));
            } else {
                this.log.info("Node {}: unregistered MetaLogListener {}", Integer.valueOf(this.nodeId), Integer.valueOf(identityHashCode));
            }
        });
    }

    public synchronized OptionalLong highWatermark() {
        return this.shared.prevOffset > 0 ? OptionalLong.of(this.shared.prevOffset) : OptionalLong.empty();
    }

    public long scheduleAppend(int i, List<ApiMessageAndVersion> list) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Batch cannot be empty");
        }
        List<ApiMessageAndVersion> subList = list.subList(0, list.size() / 2);
        List<ApiMessageAndVersion> subList2 = list.subList(list.size() / 2, list.size());
        Assertions.assertEquals(list.size(), subList.size() + subList2.size());
        Assertions.assertFalse(subList2.isEmpty());
        OptionalLong max = subList.stream().mapToLong(apiMessageAndVersion -> {
            return scheduleAtomicAppend(i, Collections.singletonList(apiMessageAndVersion));
        }).max();
        if (!max.isPresent() || !this.resignAfterNonAtomicCommit.getAndSet(false)) {
            return subList2.stream().mapToLong(apiMessageAndVersion2 -> {
                return scheduleAtomicAppend(i, Collections.singletonList(apiMessageAndVersion2));
            }).max().getAsLong();
        }
        resign(this.leader.epoch());
        return max.getAsLong() + subList2.size();
    }

    public long scheduleAtomicAppend(int i, List<ApiMessageAndVersion> list) {
        return this.shared.tryAppend(this.nodeId, this.leader.epoch(), list);
    }

    public void resign(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + i);
        }
        int epoch = leaderAndEpoch().epoch();
        if (i > epoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + i + " which is larger than the current epoch " + epoch);
        }
        if (i < epoch) {
            this.log.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(epoch));
            return;
        }
        try {
            this.shared.tryAppend(this.nodeId, epoch, new LeaderChangeBatch(new LeaderAndEpoch(OptionalInt.empty(), epoch + 1)));
        } catch (NotLeaderException e) {
            this.log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(epoch));
        }
    }

    public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(long j, int i, long j2) {
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(j + 1, i);
        return RecordsSnapshotWriter.createWithHeader(() -> {
            return createNewSnapshot(offsetAndEpoch);
        }, 1024, MemoryPool.NONE, new MockTime(), j2, CompressionType.NONE, new MetadataRecordSerde());
    }

    private Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch offsetAndEpoch) {
        return Optional.of(new MockRawSnapshotWriter(offsetAndEpoch, byteBuffer -> {
            this.shared.addSnapshot(new MockRawSnapshotReader(offsetAndEpoch, byteBuffer));
        }));
    }

    public LeaderAndEpoch leaderAndEpoch() {
        return this.leader;
    }

    public OptionalInt nodeId() {
        return OptionalInt.of(this.nodeId);
    }

    public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
        CompletableFuture completableFuture = new CompletableFuture();
        this.eventQueue.append(() -> {
            completableFuture.complete(this.listeners.values().stream().map(metaLogListenerData -> {
                return metaLogListenerData.listener;
            }).collect(Collectors.toList()));
        });
        try {
            return (List) completableFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void setMaxReadOffset(long j) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.eventQueue.append(() -> {
            this.log.trace("Node {}: set maxReadOffset to {}.", Integer.valueOf(this.nodeId), Long.valueOf(j));
            this.maxReadOffset = j;
            scheduleLogCheck();
            completableFuture.complete(null);
        });
        try {
            completableFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void resignAfterNonAtomicCommit() {
        this.resignAfterNonAtomicCommit.set(true);
    }
}
