package org.apache.kafka.coordinator.group.runtime;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.class */
public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
    static final int MIN_BUFFER_SIZE = 16384;
    private final String logPrefix;
    private final LogContext logContext;
    private final Logger log;
    private final Time time;
    private final Timer timer;
    private final Duration defaultWriteTimeout;
    private final ConcurrentHashMap<TopicPartition, CoordinatorRuntime<S, U>.CoordinatorContext> coordinators;
    private final CoordinatorEventProcessor processor;
    private final PartitionWriter partitionWriter;
    private final CoordinatorLoader<U> loader;
    private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
    private final CoordinatorRuntimeMetrics runtimeMetrics;
    private final CoordinatorMetrics coordinatorMetrics;
    private final Serializer<U> serializer;
    private final Compression compression;
    private final int appendLingerMs;
    private final AtomicBoolean isRunning;
    private volatile MetadataImage metadataImage;

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$Builder.class */
    public static class Builder<S extends CoordinatorShard<U>, U> {
        private String logPrefix;
        private LogContext logContext;
        private CoordinatorEventProcessor eventProcessor;
        private PartitionWriter partitionWriter;
        private CoordinatorLoader<U> loader;
        private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
        private Time time = Time.SYSTEM;
        private Timer timer;
        private Duration defaultWriteTimeout;
        private CoordinatorRuntimeMetrics runtimeMetrics;
        private CoordinatorMetrics coordinatorMetrics;
        private Serializer<U> serializer;
        private Compression compression;
        private int appendLingerMs;

        public Builder<S, U> withLogPrefix(String str) {
            this.logPrefix = str;
            return this;
        }

        public Builder<S, U> withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor coordinatorEventProcessor) {
            this.eventProcessor = coordinatorEventProcessor;
            return this;
        }

        public Builder<S, U> withPartitionWriter(PartitionWriter partitionWriter) {
            this.partitionWriter = partitionWriter;
            return this;
        }

        public Builder<S, U> withLoader(CoordinatorLoader<U> coordinatorLoader) {
            this.loader = coordinatorLoader;
            return this;
        }

        public Builder<S, U> withCoordinatorShardBuilderSupplier(CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier) {
            this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
            return this;
        }

        public Builder<S, U> withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder<S, U> withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder<S, U> withDefaultWriteTimeOut(Duration duration) {
            this.defaultWriteTimeout = duration;
            return this;
        }

        public Builder<S, U> withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.runtimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public Builder<S, U> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            this.coordinatorMetrics = coordinatorMetrics;
            return this;
        }

        public Builder<S, U> withSerializer(Serializer<U> serializer) {
            this.serializer = serializer;
            return this;
        }

        public Builder<S, U> withCompression(Compression compression) {
            this.compression = compression;
            return this;
        }

        public Builder<S, U> withAppendLingerMs(int i) {
            this.appendLingerMs = i;
            return this;
        }

        public CoordinatorRuntime<S, U> build() {
            if (this.logPrefix == null) {
                this.logPrefix = ClassicGroup.NO_LEADER;
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(this.logPrefix);
            }
            if (this.eventProcessor == null) {
                throw new IllegalArgumentException("Event processor must be set.");
            }
            if (this.partitionWriter == null) {
                throw new IllegalArgumentException("Partition write must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.coordinatorShardBuilderSupplier == null) {
                throw new IllegalArgumentException("State machine supplier must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.runtimeMetrics == null) {
                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
            }
            if (this.coordinatorMetrics == null) {
                throw new IllegalArgumentException("CoordinatorMetrics must be set.");
            }
            if (this.serializer == null) {
                throw new IllegalArgumentException("Serializer must be set.");
            }
            if (this.compression == null) {
                this.compression = Compression.NONE;
            }
            if (this.appendLingerMs < 0) {
                throw new IllegalArgumentException("AppendLinger must be >= 0");
            }
            return new CoordinatorRuntime<>(this.logPrefix, this.logContext, this.eventProcessor, this.partitionWriter, this.loader, this.coordinatorShardBuilderSupplier, this.time, this.timer, this.defaultWriteTimeout, this.runtimeMetrics, this.coordinatorMetrics, this.serializer, this.compression, this.appendLingerMs);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorBatch.class */
    public static class CoordinatorBatch {
        final long baseOffset;
        final long appendTimeMs;
        final int maxBatchSize;
        final VerificationGuard verificationGuard;
        final ByteBuffer buffer;
        final MemoryRecordsBuilder builder;
        final Optional<TimerTask> lingerTimeoutTask;
        final List<DeferredEvent> deferredEvents = new ArrayList();
        long nextOffset;

        CoordinatorBatch(long j, long j2, int i, VerificationGuard verificationGuard, ByteBuffer byteBuffer, MemoryRecordsBuilder memoryRecordsBuilder, Optional<TimerTask> optional) {
            this.baseOffset = j;
            this.nextOffset = j;
            this.appendTimeMs = j2;
            this.maxBatchSize = i;
            this.verificationGuard = verificationGuard;
            this.buffer = byteBuffer;
            this.builder = memoryRecordsBuilder;
            this.lingerTimeoutTask = optional;
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorCompleteTransactionEvent.class */
    class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredEvent {
        final TopicPartition tp;
        final String name;
        final long producerId;
        final short producerEpoch;
        final int coordinatorEpoch;
        final TransactionResult result;
        final Duration writeTimeout;
        private CoordinatorRuntime<S, U>.OperationTimeout operationTimeout = null;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        private final long createdTimeMs;

        CoordinatorCompleteTransactionEvent(String str, TopicPartition topicPartition, long j, short s, int i, TransactionResult transactionResult, Duration duration) {
            this.name = str;
            this.tp = topicPartition;
            this.producerId = j;
            this.producerEpoch = s;
            this.coordinatorEpoch = i;
            this.result = transactionResult;
            this.writeTimeout = duration;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    coordinatorContext.completeTransaction(this.producerId, this.producerEpoch, this.coordinatorEpoch, this.result, this);
                    if (this.future.isDone()) {
                        return;
                    }
                    this.operationTimeout = new OperationTimeout(this.tp, this, this.writeTimeout.toMillis());
                    CoordinatorRuntime.this.timer.add(this.operationTimeout);
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th == null) {
                this.future.complete(null);
            } else {
                this.future.completeExceptionally(th);
            }
            if (this.operationTimeout != null) {
                this.operationTimeout.cancel();
                this.operationTimeout = null;
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorCompleteTransactionEvent(name=" + this.name + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorContext.class */
    public class CoordinatorContext {
        final ReentrantLock lock;
        final TopicPartition tp;
        final LogContext logContext;
        final DeferredEventQueue deferredEventQueue;
        final CoordinatorRuntime<S, U>.EventBasedCoordinatorTimer timer;
        CoordinatorState state;
        int epoch;
        SnapshottableCoordinator<S, U> coordinator;
        CoordinatorRuntime<S, U>.HighWatermarkListener highWatermarklistener;
        BufferSupplier bufferSupplier;
        CoordinatorBatch currentBatch;

        private CoordinatorContext(TopicPartition topicPartition) {
            this.lock = new ReentrantLock();
            this.tp = topicPartition;
            this.logContext = new LogContext(String.format("[%s topic=%s partition=%d] ", CoordinatorRuntime.this.logPrefix, topicPartition.topic(), Integer.valueOf(topicPartition.partition())));
            this.state = CoordinatorState.INITIAL;
            this.epoch = -1;
            this.deferredEventQueue = new DeferredEventQueue(this.logContext);
            this.timer = new EventBasedCoordinatorTimer(topicPartition, this.logContext);
            this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void transitionTo(CoordinatorState coordinatorState) {
            if (!coordinatorState.canTransitionFrom(this.state)) {
                throw new IllegalStateException("Cannot transition from " + this.state + " to " + coordinatorState);
            }
            CoordinatorState coordinatorState2 = this.state;
            CoordinatorRuntime.this.log.debug("Transition from {} to {}.", this.state, coordinatorState);
            switch (coordinatorState) {
                case LOADING:
                    this.state = CoordinatorState.LOADING;
                    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
                    this.coordinator = new SnapshottableCoordinator<>(this.logContext, snapshotRegistry, CoordinatorRuntime.this.coordinatorShardBuilderSupplier.get().withLogContext(this.logContext).withSnapshotRegistry(snapshotRegistry).withTime(CoordinatorRuntime.this.time).withTimer(this.timer).withCoordinatorMetrics(CoordinatorRuntime.this.coordinatorMetrics).withTopicPartition(this.tp).build(), this.tp);
                    load();
                    break;
                case ACTIVE:
                    this.state = CoordinatorState.ACTIVE;
                    this.highWatermarklistener = new HighWatermarkListener();
                    CoordinatorRuntime.this.partitionWriter.registerListener(this.tp, this.highWatermarklistener);
                    this.coordinator.onLoaded(CoordinatorRuntime.this.metadataImage);
                    break;
                case FAILED:
                    this.state = CoordinatorState.FAILED;
                    unload();
                    break;
                case CLOSED:
                    this.state = CoordinatorState.CLOSED;
                    unload();
                    break;
                default:
                    throw new IllegalArgumentException("Transitioning to " + coordinatorState + " is not supported.");
            }
            CoordinatorRuntime.this.runtimeMetrics.recordPartitionStateChange(coordinatorState2, this.state);
        }

        private void load() {
            if (this.state != CoordinatorState.LOADING) {
                throw new IllegalStateException("Coordinator must be in loading state");
            }
            CoordinatorRuntime.this.loader.load(this.tp, this.coordinator).whenComplete((loadSummary, th) -> {
                CoordinatorRuntime.this.scheduleInternalOperation("CompleteLoad(tp=" + this.tp + ", epoch=" + this.epoch + ")", this.tp, () -> {
                    CoordinatorContext coordinatorContext = (CoordinatorContext) CoordinatorRuntime.this.coordinators.get(this.tp);
                    if (coordinatorContext == null) {
                        CoordinatorRuntime.this.log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.", this.tp, Integer.valueOf(this.epoch));
                        return;
                    }
                    if (coordinatorContext.state != CoordinatorState.LOADING) {
                        CoordinatorRuntime.this.log.info("Ignored load completion from {} because context is in {} state.", coordinatorContext.tp, coordinatorContext.state);
                        return;
                    }
                    try {
                        if (th != null) {
                            throw th;
                        }
                        coordinatorContext.transitionTo(CoordinatorState.ACTIVE);
                        if (loadSummary != null) {
                            CoordinatorRuntime.this.runtimeMetrics.recordPartitionLoadSensor(loadSummary.startTimeMs(), loadSummary.endTimeMs());
                            CoordinatorRuntime.this.log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms was spent in the scheduler. Loaded {} records which total to {} bytes.", new Object[]{this.tp, Integer.valueOf(this.epoch), Long.valueOf(loadSummary.endTimeMs() - loadSummary.startTimeMs()), Long.valueOf(loadSummary.schedulerQueueTimeMs()), Long.valueOf(loadSummary.numRecords()), Long.valueOf(loadSummary.numBytes())});
                        }
                    } catch (Throwable th) {
                        CoordinatorRuntime.this.log.error("Failed to load metadata from {} with epoch {} due to {}.", new Object[]{this.tp, Integer.valueOf(this.epoch), th.toString()});
                        coordinatorContext.transitionTo(CoordinatorState.FAILED);
                    }
                });
            });
        }

        private void unload() {
            if (this.highWatermarklistener != null) {
                CoordinatorRuntime.this.partitionWriter.deregisterListener(this.tp, this.highWatermarklistener);
                this.highWatermarklistener = null;
            }
            this.timer.cancelAll();
            this.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
            if (this.coordinator != null) {
                this.coordinator.onUnloaded();
            }
            this.coordinator = null;
        }

        private void freeCurrentBatch() {
            this.currentBatch.lingerTimeoutTask.ifPresent((v0) -> {
                v0.cancel();
            });
            this.bufferSupplier.release(this.currentBatch.buffer);
            this.currentBatch = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flushCurrentBatch() {
            if (this.currentBatch != null) {
                try {
                    long append = CoordinatorRuntime.this.partitionWriter.append(this.tp, this.currentBatch.verificationGuard, this.currentBatch.builder.build());
                    this.coordinator.updateLastWrittenOffset(Long.valueOf(append));
                    if (append != this.currentBatch.nextOffset) {
                        CoordinatorRuntime.this.log.error("The state machine of the coordinator {} is out of sync with the underlying log. The last written offset returned is {} while the coordinator expected {}. The coordinator will be reloaded in order to re-synchronize the state machine.", new Object[]{this.tp, Long.valueOf(append), Long.valueOf(this.currentBatch.nextOffset)});
                        transitionTo(CoordinatorState.FAILED);
                        transitionTo(CoordinatorState.LOADING);
                        throw Errors.NOT_COORDINATOR.exception();
                    }
                    Iterator<DeferredEvent> it = this.currentBatch.deferredEvents.iterator();
                    while (it.hasNext()) {
                        this.deferredEventQueue.add(append, it.next());
                    }
                    freeCurrentBatch();
                } catch (Throwable th) {
                    CoordinatorRuntime.this.log.error("Writing records to {} failed due to: {}.", this.tp, th.getMessage());
                    failCurrentBatch(th);
                    throw th;
                }
            }
        }

        private void maybeFlushCurrentBatch(long j) {
            if (this.currentBatch != null) {
                if (this.currentBatch.builder.isTransactional() || this.currentBatch.appendTimeMs - j >= CoordinatorRuntime.this.appendLingerMs) {
                    flushCurrentBatch();
                }
            }
        }

        private void failCurrentBatch(Throwable th) {
            if (this.currentBatch != null) {
                this.coordinator.revertLastWrittenOffset(this.currentBatch.baseOffset);
                Iterator<DeferredEvent> it = this.currentBatch.deferredEvents.iterator();
                while (it.hasNext()) {
                    it.next().complete(th);
                }
                freeCurrentBatch();
            }
        }

        private void maybeAllocateNewBatch(long j, short s, VerificationGuard verificationGuard, long j2) {
            if (this.currentBatch == null) {
                LogConfig config = CoordinatorRuntime.this.partitionWriter.config(this.tp);
                byte b = config.recordVersion().value;
                int maxMessageSize = config.maxMessageSize();
                long lastWrittenOffset = this.coordinator.lastWrittenOffset();
                ByteBuffer byteBuffer = this.bufferSupplier.get(maxMessageSize);
                MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(byteBuffer, b, CoordinatorRuntime.this.compression, TimestampType.CREATE_TIME, 0L, j2, j, s, 0, j != -1, false, -1, maxMessageSize);
                Optional empty = Optional.empty();
                if (CoordinatorRuntime.this.appendLingerMs > 0) {
                    empty = Optional.of(new TimerTask(CoordinatorRuntime.this.appendLingerMs) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorContext.1
                        public void run() {
                            CoordinatorRuntime.this.enqueueFirst(new CoordinatorInternalEvent("FlushBatch", CoordinatorContext.this.tp, () -> {
                                if (isCancelled()) {
                                    return;
                                }
                                CoordinatorRuntime.this.withActiveContextOrThrow(CoordinatorContext.this.tp, obj -> {
                                    ((CoordinatorContext) obj).flushCurrentBatch();
                                });
                            }));
                        }
                    });
                    CoordinatorRuntime.this.timer.add((TimerTask) empty.get());
                }
                this.currentBatch = new CoordinatorBatch(lastWrittenOffset, j2, maxMessageSize, verificationGuard, byteBuffer, memoryRecordsBuilder, empty);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void append(long j, short s, VerificationGuard verificationGuard, List<U> list, boolean z, boolean z2, DeferredEvent deferredEvent) {
            if (this.state != CoordinatorState.ACTIVE) {
                throw new IllegalStateException("Coordinator must be active to append records");
            }
            if (list.isEmpty()) {
                if (this.currentBatch != null) {
                    this.currentBatch.deferredEvents.add(deferredEvent);
                    return;
                } else if (this.coordinator.lastCommittedOffset() < this.coordinator.lastWrittenOffset()) {
                    this.deferredEventQueue.add(this.coordinator.lastWrittenOffset(), deferredEvent);
                    return;
                } else {
                    deferredEvent.complete((Throwable) null);
                    return;
                }
            }
            long milliseconds = CoordinatorRuntime.this.time.milliseconds();
            if (j != -1) {
                z2 = true;
                flushCurrentBatch();
            }
            maybeAllocateNewBatch(j, s, verificationGuard, milliseconds);
            ArrayList arrayList = new ArrayList(list.size());
            for (U u : list) {
                arrayList.add(new SimpleRecord(milliseconds, CoordinatorRuntime.this.serializer.serializeKey(u), CoordinatorRuntime.this.serializer.serializeValue(u)));
            }
            if (z2) {
                int estimateSizeInBytes = AbstractRecords.estimateSizeInBytes(this.currentBatch.builder.magic(), CoordinatorRuntime.this.compression.type(), arrayList);
                if (estimateSizeInBytes > this.currentBatch.builder.maxAllowedBytes()) {
                    throw new RecordTooLargeException("Message batch size is " + estimateSizeInBytes + " bytes in append to partition " + this.tp + " which exceeds the maximum configured size of " + this.currentBatch.maxBatchSize + ".");
                }
                if (!this.currentBatch.builder.hasRoomFor(estimateSizeInBytes)) {
                    flushCurrentBatch();
                    maybeAllocateNewBatch(j, s, verificationGuard, milliseconds);
                }
            }
            for (int i = 0; i < list.size(); i++) {
                U u2 = list.get(i);
                SimpleRecord simpleRecord = (SimpleRecord) arrayList.get(i);
                if (!z2 && !this.currentBatch.builder.hasRoomFor(simpleRecord.timestamp(), simpleRecord.key(), simpleRecord.value(), simpleRecord.headers())) {
                    flushCurrentBatch();
                    maybeAllocateNewBatch(j, s, verificationGuard, milliseconds);
                }
                if (z) {
                    try {
                        this.coordinator.replay(this.currentBatch.nextOffset, j, s, u2);
                    } catch (Throwable th) {
                        CoordinatorRuntime.this.log.error("Replaying record {} to {} failed due to: {}.", new Object[]{u2, this.tp, th.getMessage()});
                        this.currentBatch.deferredEvents.add(deferredEvent);
                        failCurrentBatch(th);
                        return;
                    }
                }
                this.currentBatch.builder.append(simpleRecord);
                this.currentBatch.nextOffset++;
            }
            this.currentBatch.deferredEvents.add(deferredEvent);
            maybeFlushCurrentBatch(milliseconds);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void completeTransaction(long j, short s, int i, TransactionResult transactionResult, DeferredEvent deferredEvent) {
            if (this.state != CoordinatorState.ACTIVE) {
                throw new IllegalStateException("Coordinator must be active to complete a transaction");
            }
            flushCurrentBatch();
            long lastWrittenOffset = this.coordinator.lastWrittenOffset();
            try {
                this.coordinator.replayEndTransactionMarker(j, s, transactionResult);
                long append = CoordinatorRuntime.this.partitionWriter.append(this.tp, VerificationGuard.SENTINEL, MemoryRecords.withEndTransactionMarker(CoordinatorRuntime.this.time.milliseconds(), j, s, new EndTransactionMarker(transactionResult == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT, i)));
                this.coordinator.updateLastWrittenOffset(Long.valueOf(append));
                this.deferredEventQueue.add(append, deferredEvent);
            } catch (Throwable th) {
                this.coordinator.revertLastWrittenOffset(lastWrittenOffset);
                deferredEvent.complete(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorInternalEvent.class */
    public class CoordinatorInternalEvent implements CoordinatorEvent {
        final TopicPartition tp;
        final String name;
        final Runnable op;
        private final long createdTimeMs;

        CoordinatorInternalEvent(String str, TopicPartition topicPartition, Runnable runnable) {
            this.tp = topicPartition;
            this.name = str;
            this.op = runnable;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                this.op.run();
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th != null) {
                CoordinatorRuntime.this.log.error("Execution of {} failed due to {}.", new Object[]{this.name, th.getMessage(), th});
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "InternalEvent(name=" + this.name + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorReadEvent.class */
    public class CoordinatorReadEvent<T> implements CoordinatorEvent {
        final TopicPartition tp;
        final String name;
        final CoordinatorReadOperation<S, T> op;
        final CompletableFuture<T> future = new CompletableFuture<>();
        T response;
        private final long createdTimeMs;

        CoordinatorReadEvent(String str, TopicPartition topicPartition, CoordinatorReadOperation<S, T> coordinatorReadOperation) {
            this.tp = topicPartition;
            this.name = str;
            this.op = coordinatorReadOperation;
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    this.response = (T) this.op.generateResponse(coordinatorContext.coordinator.coordinator(), coordinatorContext.coordinator.lastCommittedOffset());
                    complete(null);
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            if (th == null) {
                this.future.complete(this.response);
            } else {
                this.future.completeExceptionally(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorReadEvent(name=" + this.name + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorReadOperation.class */
    public interface CoordinatorReadOperation<S, T> {
        T generateResponse(S s, long j) throws KafkaException;
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorState.class */
    public enum CoordinatorState {
        INITIAL { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.1
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return false;
            }
        },
        LOADING { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.2
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == INITIAL || coordinatorState == FAILED;
            }
        },
        ACTIVE { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.3
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == ACTIVE || coordinatorState == LOADING;
            }
        },
        CLOSED { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.4
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return true;
            }
        },
        FAILED { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.5
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState
            boolean canTransitionFrom(CoordinatorState coordinatorState) {
                return coordinatorState == LOADING || coordinatorState == ACTIVE;
            }
        };

        abstract boolean canTransitionFrom(CoordinatorState coordinatorState);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorWriteEvent.class */
    public class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
        final TopicPartition tp;
        final String name;
        final String transactionalId;
        final long producerId;
        final short producerEpoch;
        final VerificationGuard verificationGuard;
        final CoordinatorWriteOperation<S, T, U> op;
        final CompletableFuture<T> future;
        final Duration writeTimeout;
        private CoordinatorRuntime<S, U>.OperationTimeout operationTimeout;
        CoordinatorResult<T, U> result;
        private final long createdTimeMs;

        CoordinatorWriteEvent(CoordinatorRuntime coordinatorRuntime, String str, TopicPartition topicPartition, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
            this(str, topicPartition, null, -1L, (short) -1, VerificationGuard.SENTINEL, duration, coordinatorWriteOperation);
        }

        CoordinatorWriteEvent(String str, TopicPartition topicPartition, String str2, long j, short s, VerificationGuard verificationGuard, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
            this.operationTimeout = null;
            this.tp = topicPartition;
            this.name = str;
            this.op = coordinatorWriteOperation;
            this.transactionalId = str2;
            this.producerId = j;
            this.producerEpoch = s;
            this.verificationGuard = verificationGuard;
            this.future = new CompletableFuture<>();
            this.createdTimeMs = CoordinatorRuntime.this.time.milliseconds();
            this.writeTimeout = duration;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.coordinator.group.runtime.EventAccumulator.Event
        public TopicPartition key() {
            return this.tp;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, coordinatorContext -> {
                    this.result = this.op.generateRecordsAndResult(coordinatorContext.coordinator.coordinator());
                    coordinatorContext.append(this.producerId, this.producerEpoch, this.verificationGuard, this.result.records(), this.result.replayRecords(), this.result.isAtomic(), this);
                    if (this.future.isDone()) {
                        return;
                    }
                    this.operationTimeout = new OperationTimeout(this.tp, this, this.writeTimeout.toMillis());
                    CoordinatorRuntime.this.timer.add(this.operationTimeout);
                });
            } catch (Throwable th) {
                complete(th);
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public void complete(Throwable th) {
            CompletableFuture<Void> appendFuture = this.result != null ? this.result.appendFuture() : null;
            if (th == null) {
                if (appendFuture != null) {
                    this.result.appendFuture().complete(null);
                }
                this.future.complete(this.result.response());
            } else {
                if (appendFuture != null) {
                    this.result.appendFuture().completeExceptionally(th);
                }
                this.future.completeExceptionally(th);
            }
            if (this.operationTimeout != null) {
                this.operationTimeout.cancel();
                this.operationTimeout = null;
            }
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEvent
        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public String toString() {
            return "CoordinatorWriteEvent(name=" + this.name + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$CoordinatorWriteOperation.class */
    public interface CoordinatorWriteOperation<S, T, U> {
        CoordinatorResult<T, U> generateRecordsAndResult(S s) throws KafkaException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$EventBasedCoordinatorTimer.class */
    public class EventBasedCoordinatorTimer implements CoordinatorTimer<Void, U> {
        final Logger log;
        final TopicPartition tp;
        final Map<String, TimerTask> tasks = new HashMap();

        EventBasedCoordinatorTimer(TopicPartition topicPartition, LogContext logContext) {
            this.tp = topicPartition;
            this.log = logContext.logger(EventBasedCoordinatorTimer.class);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void schedule(String str, long j, TimeUnit timeUnit, boolean z, CoordinatorTimer.TimeoutOperation<Void, U> timeoutOperation) {
            schedule(str, j, timeUnit, z, 500L, timeoutOperation);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void schedule(final String str, long j, TimeUnit timeUnit, final boolean z, final long j2, final CoordinatorTimer.TimeoutOperation<Void, U> timeoutOperation) {
            TimerTask timerTask = new TimerTask(timeUnit.toMillis(j)) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.EventBasedCoordinatorTimer.1
                public void run() {
                    String str2 = "Timeout(tp=" + EventBasedCoordinatorTimer.this.tp + ", key=" + str + ")";
                    CoordinatorRuntime coordinatorRuntime = CoordinatorRuntime.this;
                    TopicPartition topicPartition = EventBasedCoordinatorTimer.this.tp;
                    Duration duration = CoordinatorRuntime.this.defaultWriteTimeout;
                    String str3 = str;
                    CoordinatorTimer.TimeoutOperation timeoutOperation2 = timeoutOperation;
                    CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(coordinatorRuntime, str2, topicPartition, duration, coordinatorShard -> {
                        EventBasedCoordinatorTimer.this.log.debug("Executing write event {} for timer {}.", str2, str3);
                        if (EventBasedCoordinatorTimer.this.tasks.remove(str3, this)) {
                            return timeoutOperation2.generateRecords();
                        }
                        throw new RejectedExecutionException("Timer " + str3 + " was overridden or cancelled");
                    });
                    CompletableFuture<T> completableFuture = coordinatorWriteEvent.future;
                    String str4 = str;
                    boolean z2 = z;
                    long j3 = j2;
                    CoordinatorTimer.TimeoutOperation timeoutOperation3 = timeoutOperation;
                    completableFuture.exceptionally(th -> {
                        if (th instanceof RejectedExecutionException) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} was not executed because it was cancelled or overridden.", coordinatorWriteEvent.name, str4);
                            return null;
                        }
                        if ((th instanceof NotCoordinatorException) || (th instanceof CoordinatorLoadInProgressException)) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} failed due to {}. Ignoring it because the coordinator is not active.", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                            return null;
                        }
                        if (!z2) {
                            EventBasedCoordinatorTimer.this.log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                            return null;
                        }
                        EventBasedCoordinatorTimer.this.log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ", new Object[]{coordinatorWriteEvent.name, str4, th.getMessage()});
                        EventBasedCoordinatorTimer.this.schedule(str4, j3, TimeUnit.MILLISECONDS, true, j3, timeoutOperation3);
                        return null;
                    });
                    EventBasedCoordinatorTimer.this.log.debug("Scheduling write event {} for timer {}.", coordinatorWriteEvent.name, str);
                    try {
                        CoordinatorRuntime.this.enqueueLast(coordinatorWriteEvent);
                    } catch (NotCoordinatorException e) {
                        EventBasedCoordinatorTimer.this.log.info("Failed to enqueue write event {} for timer {} because the runtime is closed. Ignoring it.", coordinatorWriteEvent.name, str);
                    }
                }
            };
            this.log.debug("Registering timer {} with delay of {}ms.", str, Long.valueOf(timeUnit.toMillis(j)));
            TimerTask put = this.tasks.put(str, timerTask);
            if (put != null) {
                put.cancel();
            }
            CoordinatorRuntime.this.timer.add(timerTask);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void scheduleIfAbsent(String str, long j, TimeUnit timeUnit, boolean z, CoordinatorTimer.TimeoutOperation<Void, U> timeoutOperation) {
            if (this.tasks.containsKey(str)) {
                return;
            }
            schedule(str, j, timeUnit, z, 500L, timeoutOperation);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorTimer
        public void cancel(String str) {
            TimerTask remove = this.tasks.remove(str);
            if (remove != null) {
                remove.cancel();
            }
        }

        public void cancelAll() {
            Iterator<Map.Entry<String, TimerTask>> it = this.tasks.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().cancel();
                it.remove();
            }
        }

        public int size() {
            return this.tasks.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$HighWatermarkListener.class */
    public class HighWatermarkListener implements PartitionWriter.Listener {
        private static final long NO_OFFSET = -1;
        private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);

        HighWatermarkListener() {
        }

        public long lastHighWatermark() {
            return this.lastHighWatermark.get();
        }

        @Override // org.apache.kafka.coordinator.group.runtime.PartitionWriter.Listener
        public void onHighWatermarkUpdated(TopicPartition topicPartition, long j) {
            CoordinatorRuntime.this.log.debug("High watermark of {} incremented to {}.", topicPartition, Long.valueOf(j));
            if (this.lastHighWatermark.getAndSet(j) == NO_OFFSET) {
                CoordinatorRuntime.this.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", topicPartition, () -> {
                    long andSet = this.lastHighWatermark.getAndSet(NO_OFFSET);
                    CoordinatorContext coordinatorContext = (CoordinatorContext) CoordinatorRuntime.this.coordinators.get(topicPartition);
                    if (coordinatorContext == null) {
                        CoordinatorRuntime.this.log.debug("Ignored high watermark updated for {} to {} because the coordinator does not exist.", topicPartition, Long.valueOf(andSet));
                        return;
                    }
                    coordinatorContext.lock.lock();
                    try {
                        if (coordinatorContext.state == CoordinatorState.ACTIVE) {
                            CoordinatorRuntime.this.log.debug("Updating high watermark of {} to {}.", topicPartition, Long.valueOf(andSet));
                            coordinatorContext.coordinator.updateLastCommittedOffset(Long.valueOf(andSet));
                            coordinatorContext.deferredEventQueue.completeUpTo(andSet);
                            CoordinatorRuntime.this.coordinatorMetrics.onUpdateLastCommittedOffset(topicPartition, andSet);
                        } else {
                            CoordinatorRuntime.this.log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.", topicPartition, Long.valueOf(andSet));
                        }
                    } finally {
                        coordinatorContext.lock.unlock();
                    }
                }));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime$OperationTimeout.class */
    public class OperationTimeout extends TimerTask {
        private final TopicPartition tp;
        private final DeferredEvent event;

        public OperationTimeout(TopicPartition topicPartition, DeferredEvent deferredEvent, long j) {
            super(j);
            this.event = deferredEvent;
            this.tp = topicPartition;
        }

        public void run() {
            String obj = this.event.toString();
            CoordinatorRuntime.this.scheduleInternalOperation("OperationTimeout(name=" + obj + ", tp=" + this.tp + ")", this.tp, () -> {
                this.event.complete(new TimeoutException(obj + " timed out after " + this.delayMs + "ms"));
            });
        }
    }

    private CoordinatorRuntime(String str, LogContext logContext, CoordinatorEventProcessor coordinatorEventProcessor, PartitionWriter partitionWriter, CoordinatorLoader<U> coordinatorLoader, CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier, Time time, Timer timer, Duration duration, CoordinatorRuntimeMetrics coordinatorRuntimeMetrics, CoordinatorMetrics coordinatorMetrics, Serializer<U> serializer, Compression compression, int i) {
        this.isRunning = new AtomicBoolean(true);
        this.metadataImage = MetadataImage.EMPTY;
        this.logPrefix = str;
        this.logContext = logContext;
        this.log = logContext.logger(CoordinatorRuntime.class);
        this.time = time;
        this.timer = timer;
        this.defaultWriteTimeout = duration;
        this.coordinators = new ConcurrentHashMap<>();
        this.processor = coordinatorEventProcessor;
        this.partitionWriter = partitionWriter;
        this.loader = coordinatorLoader;
        this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
        this.runtimeMetrics = coordinatorRuntimeMetrics;
        this.coordinatorMetrics = coordinatorMetrics;
        this.serializer = serializer;
        this.compression = compression;
        this.appendLingerMs = i;
    }

    private void throwIfNotRunning() {
        if (!this.isRunning.get()) {
            throw Errors.NOT_COORDINATOR.exception();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueLast(CoordinatorEvent coordinatorEvent) {
        try {
            this.processor.enqueueLast(coordinatorEvent);
        } catch (RejectedExecutionException e) {
            throw new NotCoordinatorException("Can't accept an event because the processor is closed", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueFirst(CoordinatorEvent coordinatorEvent) {
        try {
            this.processor.enqueueFirst(coordinatorEvent);
        } catch (RejectedExecutionException e) {
            throw new NotCoordinatorException("Can't accept an event because the processor is closed", e);
        }
    }

    CoordinatorRuntime<S, U>.CoordinatorContext maybeCreateContext(TopicPartition topicPartition) {
        return this.coordinators.computeIfAbsent(topicPartition, topicPartition2 -> {
            return new CoordinatorContext(topicPartition2);
        });
    }

    CoordinatorRuntime<S, U>.CoordinatorContext contextOrThrow(TopicPartition topicPartition) throws NotCoordinatorException {
        CoordinatorRuntime<S, U>.CoordinatorContext coordinatorContext = this.coordinators.get(topicPartition);
        if (coordinatorContext == null) {
            throw Errors.NOT_COORDINATOR.exception();
        }
        return coordinatorContext;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void withActiveContextOrThrow(TopicPartition topicPartition, Consumer<CoordinatorRuntime<S, U>.CoordinatorContext> consumer) throws NotCoordinatorException, CoordinatorLoadInProgressException {
        CoordinatorRuntime<S, U>.CoordinatorContext contextOrThrow = contextOrThrow(topicPartition);
        try {
            contextOrThrow.lock.lock();
            if (contextOrThrow.state == CoordinatorState.ACTIVE) {
                consumer.accept(contextOrThrow);
            } else {
                if (contextOrThrow.state != CoordinatorState.LOADING) {
                    throw Errors.NOT_COORDINATOR.exception();
                }
                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
            }
        } finally {
            contextOrThrow.lock.unlock();
        }
    }

    public <T> CompletableFuture<T> scheduleWriteOperation(String str, TopicPartition topicPartition, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of write operation {}.", str);
        CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(this, str, topicPartition, duration, coordinatorWriteOperation);
        enqueueLast(coordinatorWriteEvent);
        return coordinatorWriteEvent.future;
    }

    public <T> List<CompletableFuture<T>> scheduleWriteAllOperation(String str, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of write all operation {}.", str);
        return (List) this.coordinators.keySet().stream().map(topicPartition -> {
            return scheduleWriteOperation(str, topicPartition, duration, coordinatorWriteOperation);
        }).collect(Collectors.toList());
    }

    public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(String str, TopicPartition topicPartition, String str2, long j, short s, Duration duration, CoordinatorWriteOperation<S, T, U> coordinatorWriteOperation, Short sh) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of transactional write operation {}.", str);
        return this.partitionWriter.maybeStartTransactionVerification(topicPartition, str2, j, s, sh.shortValue()).thenCompose(verificationGuard -> {
            CoordinatorWriteEvent coordinatorWriteEvent = new CoordinatorWriteEvent(str, topicPartition, str2, j, s, verificationGuard, duration, coordinatorWriteOperation);
            enqueueLast(coordinatorWriteEvent);
            return coordinatorWriteEvent.future;
        });
    }

    public CompletableFuture<Void> scheduleTransactionCompletion(String str, TopicPartition topicPartition, long j, short s, int i, TransactionResult transactionResult, Duration duration) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of transaction completion for {} with producer id={}, producer epoch={}, coordinator epoch={} and transaction result={}.", new Object[]{topicPartition, Long.valueOf(j), Short.valueOf(s), Integer.valueOf(i), transactionResult});
        CoordinatorCompleteTransactionEvent coordinatorCompleteTransactionEvent = new CoordinatorCompleteTransactionEvent(str, topicPartition, j, s, i, transactionResult, duration);
        enqueueLast(coordinatorCompleteTransactionEvent);
        return coordinatorCompleteTransactionEvent.future;
    }

    public <T> CompletableFuture<T> scheduleReadOperation(String str, TopicPartition topicPartition, CoordinatorReadOperation<S, T> coordinatorReadOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of read operation {}.", str);
        CoordinatorReadEvent coordinatorReadEvent = new CoordinatorReadEvent(str, topicPartition, coordinatorReadOperation);
        enqueueLast(coordinatorReadEvent);
        return coordinatorReadEvent.future;
    }

    public <T> List<CompletableFuture<T>> scheduleReadAllOperation(String str, CoordinatorReadOperation<S, T> coordinatorReadOperation) {
        throwIfNotRunning();
        this.log.debug("Scheduled execution of read all operation {}.", str);
        return (List) this.coordinators.keySet().stream().map(topicPartition -> {
            return scheduleReadOperation(str, topicPartition, coordinatorReadOperation);
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleInternalOperation(String str, TopicPartition topicPartition, Runnable runnable) {
        this.log.debug("Scheduled execution of internal operation {}.", str);
        enqueueLast(new CoordinatorInternalEvent(str, topicPartition, runnable));
    }

    public void scheduleLoadOperation(TopicPartition topicPartition, int i) {
        throwIfNotRunning();
        this.log.info("Scheduling loading of metadata from {} with epoch {}", topicPartition, Integer.valueOf(i));
        maybeCreateContext(topicPartition);
        scheduleInternalOperation("Load(tp=" + topicPartition + ", epoch=" + i + ")", topicPartition, () -> {
            CoordinatorRuntime<S, U>.CoordinatorContext maybeCreateContext = maybeCreateContext(topicPartition);
            maybeCreateContext.lock.lock();
            try {
                if (maybeCreateContext.epoch < i) {
                    maybeCreateContext.epoch = i;
                    switch (maybeCreateContext.state) {
                        case LOADING:
                            this.log.info("The coordinator {} is already loading metadata.", topicPartition);
                            break;
                        case ACTIVE:
                            this.log.info("The coordinator {} is already active.", topicPartition);
                            break;
                        case FAILED:
                        case INITIAL:
                            maybeCreateContext.transitionTo(CoordinatorState.LOADING);
                            break;
                        case CLOSED:
                        default:
                            this.log.error("Cannot load coordinator {} in state {}.", topicPartition, maybeCreateContext.state);
                            break;
                    }
                } else {
                    this.log.info("Ignored loading metadata from {} since current epoch {} is larger than or equals to {}.", new Object[]{maybeCreateContext.tp, Integer.valueOf(maybeCreateContext.epoch), Integer.valueOf(i)});
                }
            } finally {
                maybeCreateContext.lock.unlock();
            }
        });
    }

    public void scheduleUnloadOperation(TopicPartition topicPartition, OptionalInt optionalInt) {
        throwIfNotRunning();
        this.log.info("Scheduling unloading of metadata for {} with epoch {}", topicPartition, optionalInt);
        scheduleInternalOperation("UnloadCoordinator(tp=" + topicPartition + ", epoch=" + optionalInt + ")", topicPartition, () -> {
            CoordinatorRuntime<S, U>.CoordinatorContext coordinatorContext = this.coordinators.get(topicPartition);
            if (coordinatorContext == null) {
                this.log.info("Ignored unloading metadata for {} in epoch {} since metadata was never loaded.", topicPartition, optionalInt);
                return;
            }
            coordinatorContext.lock.lock();
            try {
                if (!optionalInt.isPresent() || coordinatorContext.epoch < optionalInt.getAsInt()) {
                    this.log.info("Started unloading metadata for {} with epoch {}.", topicPartition, optionalInt);
                    coordinatorContext.transitionTo(CoordinatorState.CLOSED);
                    this.coordinators.remove(topicPartition, coordinatorContext);
                    this.log.info("Finished unloading metadata for {} with epoch {}.", topicPartition, optionalInt);
                } else {
                    this.log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", new Object[]{topicPartition, optionalInt, Integer.valueOf(coordinatorContext.epoch)});
                }
            } finally {
                coordinatorContext.lock.unlock();
            }
        });
    }

    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        throwIfNotRunning();
        this.log.debug("Scheduling applying of a new metadata image with offset {}.", Long.valueOf(metadataImage.offset()));
        this.metadataImage = metadataImage;
        this.coordinators.keySet().forEach(topicPartition -> {
            scheduleInternalOperation("UpdateImage(tp=" + topicPartition + ", offset=" + metadataImage.offset() + ")", topicPartition, () -> {
                CoordinatorRuntime<S, U>.CoordinatorContext coordinatorContext = this.coordinators.get(topicPartition);
                if (coordinatorContext == null) {
                    this.log.debug("Ignored new metadata image with offset {} for {} because the coordinator does not exist.", Long.valueOf(metadataImage.offset()), topicPartition);
                    return;
                }
                coordinatorContext.lock.lock();
                try {
                    if (coordinatorContext.state == CoordinatorState.ACTIVE) {
                        this.log.debug("Applying new metadata image with offset {} to {}.", Long.valueOf(metadataImage.offset()), topicPartition);
                        coordinatorContext.coordinator.onNewMetadataImage(metadataImage, metadataDelta);
                    } else {
                        this.log.debug("Ignored new metadata image with offset {} for {} because the coordinator is not active.", Long.valueOf(metadataImage.offset()), topicPartition);
                    }
                } finally {
                    coordinatorContext.lock.unlock();
                }
            });
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (!this.isRunning.compareAndSet(true, false)) {
            this.log.warn("Coordinator runtime is already shutting down.");
            return;
        }
        this.log.info("Closing coordinator runtime.");
        Utils.closeQuietly(this.loader, "loader");
        Utils.closeQuietly(this.timer, "timer");
        Utils.closeQuietly(this.processor, "event processor");
        this.coordinators.forEach((topicPartition, coordinatorContext) -> {
            coordinatorContext.lock.lock();
            try {
                coordinatorContext.transitionTo(CoordinatorState.CLOSED);
            } finally {
                coordinatorContext.lock.unlock();
            }
        });
        this.coordinators.clear();
        Utils.closeQuietly(this.runtimeMetrics, "runtime metrics");
        this.log.info("Coordinator runtime closed.");
    }
}
