package io.pravega.segmentstore.server.logs;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.ExceptionHelpers;
import io.pravega.common.MathHelpers;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.common.function.CallbackHelpers;
import io.pravega.common.util.BlockingDrainingQueue;
import io.pravega.common.util.SortedDeque;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.UpdateableContainerMetadata;
import io.pravega.segmentstore.server.logs.DataFrameBuilder;
import io.pravega.segmentstore.server.logs.operations.CompletableOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.storage.DataLogWriterNotPrimaryException;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.QueueStats;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/logs/OperationProcessor.class */
public class OperationProcessor extends AbstractThreadPoolService implements AutoCloseable {

    // Class logger; assigned in the static initializer at the bottom of the class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Value returned by getShutdownTimeout(); assigned in the static initializer.
    private static final Duration SHUTDOWN_TIMEOUT;
    // Maximum number of items requested from operationQueue per take()/poll() call.
    // The same constant is also used as the upper bound (in milliseconds) of the delay computed by delayIfNecessary().
    private static final int MAX_READ_AT_ONCE = 1000;
    // Container metadata passed to the constructor; wrapped by metadataUpdater.
    private final UpdateableContainerMetadata metadata;

    // Metadata updater built over 'metadata'; visible code only invokes it while holding stateLock.
    @GuardedBy("stateLock")
    private final OperationMetadataUpdater metadataUpdater;
    // Target log; handed to the DataFrameBuilder and queried for queue statistics in delayIfNecessary().
    private final DurableDataLog durableDataLog;
    // Incoming operations submitted via process() and drained by the processing loop.
    private final BlockingDrainingQueue<CompletableOperation> operationQueue;
    // Monitor guarding metadataUpdater, dataFrameBuilder and the mutable members of QueueProcessingState.
    private final Object stateLock;
    // Tracks pending operations and receives the DataFrameBuilder callbacks (frameSealed/commit/fail).
    private final QueueProcessingState state;

    // Serializes operations into data frames; append/flush/close are invoked while holding stateLock.
    @GuardedBy("stateLock")
    private final DataFrameBuilder<Operation> dataFrameBuilder;
    // Decompiler-exposed javac field backing 'assert' statements: true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/logs/OperationProcessor$QueueProcessingState.class */
    public class QueueProcessingState {

        // Operations awaiting an outcome. addPending() appends at the tail, autoCompleteIfNeeded() drains from
        // the head and collectFailureCandidates() drains from the tail.
        @GuardedBy("stateLock")
        private final Deque<CompletableOperation> pendingOperations;
        // Non-null (checked in the constructor). NOTE(review): not referenced by any decompiled method in this
        // file; presumably used by commit(), whose body was not recovered.
        private final MemoryStateUpdater logUpdater;
        // Non-null (checked in the constructor). NOTE(review): same caveat as logUpdater.
        private final MetadataCheckpointPolicy checkpointPolicy;

        // Commit arguments of sealed frames, appended by frameSealed() and trimmed/cleared on failure.
        @GuardedBy("stateLock")
        private final SortedDeque<DataFrameBuilder.CommitArgs> metadataTransactions;

        // Initialized to -1 in the constructor. NOTE(review): no other access is visible in the decompiled code.
        @GuardedBy("stateLock")
        private long highestCommittedDataFrame;
        // Decompiler-exposed javac field backing 'assert' statements: true when assertions are disabled.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates an empty processing state: no pending operations, no sealed metadata transactions and a
         * highest-committed-frame marker of -1.
         *
         * @param memoryStateUpdater       Stored as the log updater; must not be null.
         * @param metadataCheckpointPolicy Stored as the checkpoint policy; must not be null.
         * @throws NullPointerException If either argument is null (the updater is validated first).
         */
        private QueueProcessingState(MemoryStateUpdater memoryStateUpdater, MetadataCheckpointPolicy metadataCheckpointPolicy) {
            // Validate in the same order as before so the first offending argument is the one reported.
            this.logUpdater = Preconditions.checkNotNull(memoryStateUpdater, "stateUpdater");
            this.checkpointPolicy = Preconditions.checkNotNull(metadataCheckpointPolicy, "checkpointPolicy");

            this.highestCommittedDataFrame = -1L;
            this.metadataTransactions = new SortedDeque<>();
            this.pendingOperations = new ArrayDeque<>();
        }

        /**
         * Appends the given operation to the tail of the pending queue (under the state lock), then completes
         * any non-serializable operations that are now at the head of that queue.
         *
         * @param completableOperation The operation to track.
         */
        void addPending(CompletableOperation completableOperation) {
            final Object lock = OperationProcessor.this.stateLock;
            synchronized (lock) {
                this.pendingOperations.addLast(completableOperation);
            }

            // Done outside the lock: completion callbacks are not run while holding stateLock.
            autoCompleteIfNeeded();
        }

        /**
         * Callback for a sealed data frame: seals the current metadata transaction, stamps the resulting
         * value on the commit arguments as their index key, and records them as the newest entry.
         *
         * @param commitArgs The commit arguments of the frame that was sealed.
         */
        void frameSealed(DataFrameBuilder.CommitArgs commitArgs) {
            final OperationProcessor owner = OperationProcessor.this;
            synchronized (owner.stateLock) {
                commitArgs.setIndexKey(owner.metadataUpdater.sealTransaction());
                this.metadataTransactions.addLast(commitArgs);
            }
        }

        /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
            java.lang.NullPointerException: Cannot invoke "String.charAt(int)" because "obj" is null
            	at jadx.core.utils.Utils.cleanObjectName(Utils.java:38)
            	at jadx.core.dex.instructions.args.ArgType.object(ArgType.java:86)
            	at jadx.core.dex.info.ClassInfo.fromName(ClassInfo.java:42)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.convertToHandlers(AttachTryCatchVisitor.java:113)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.initTryCatches(AttachTryCatchVisitor.java:54)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.visit(AttachTryCatchVisitor.java:42)
            */
        /**
         * Callback registered with the DataFrameBuilder (second callback passed to DataFrameBuilder.Args in the
         * OperationProcessor constructor).
         *
         * NOTE(review): the decompiler failed on this method (see the JADX error above), so the real logic is
         * missing from this file. As written, every invocation throws UnsupportedOperationException. The body
         * must be restored from the original sources or bytecode before this class can be used.
         *
         * @param r9 The commit arguments supplied by the DataFrameBuilder (decompiler-assigned name).
         * @throws UnsupportedOperationException Always, in this decompiled form.
         */
        void commit(io.pravega.segmentstore.server.logs.DataFrameBuilder.CommitArgs r9) {
            /*
                Method dump skipped, instructions count: 735
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: io.pravega.segmentstore.server.logs.OperationProcessor.QueueProcessingState.commit(io.pravega.segmentstore.server.logs.DataFrameBuilder$CommitArgs):void");
        }

        /**
         * Failure callback. Under the state lock, rolls back metadata and selects the pending operations that
         * must be failed; then notifies the processor's error handler. Regardless of how that goes, the
         * selected operations are failed and any now-eligible head operations are auto-completed.
         *
         * @param th         The cause of the failure.
         * @param commitArgs The commit arguments of the failed frame; may be null (as when called from doStop()).
         */
        void fail(Throwable th, DataFrameBuilder.CommitArgs commitArgs) {
            final Map<CompletableOperation, Throwable> toFail = new HashMap<>();
            try {
                synchronized (OperationProcessor.this.stateLock) {
                    collectFailureCandidates(th, commitArgs, toFail);
                }

                CallbackHelpers.invokeSafely(OperationProcessor.this::errorHandler, th, (Consumer<Throwable>) null);
            } finally {
                // Runs even if candidate collection threw: whatever was collected so far still gets failed.
                for (Map.Entry<CompletableOperation, Throwable> entry : toFail.entrySet()) {
                    failOperation(entry.getKey(), entry.getValue());
                }

                autoCompleteIfNeeded();
            }
        }

        /**
         * Fails the given operation. If the processor has a stop exception recorded, that exception is used
         * instead of the supplied one.
         *
         * @param completableOperation The operation to fail.
         * @param th                   The cause to use when no stop exception is recorded.
         */
        void failOperation(CompletableOperation completableOperation, Throwable th) {
            final Throwable stopCause;
            synchronized (OperationProcessor.this.stateLock) {
                stopCause = OperationProcessor.this.getStopException();
            }

            completableOperation.fail(stopCause == null ? th : stopCause);
        }

        /**
         * Rolls back metadata and moves every pending operation that must be failed into the given map.
         * Must be called while holding stateLock.
         *
         * If commit arguments are given and were removed (together with later entries) from the sealed
         * transactions, the rollback targets their key; otherwise the rollback key is 0 and all sealed
         * transactions are discarded. Pending operations are then taken from the tail for as long as
         * shouldFail() accepts them against the last fully serialized sequence number of the newest
         * remaining transaction (0 if none remain).
         *
         * @param th         The cause to associate with each collected operation.
         * @param commitArgs The commit arguments of the failed frame; may be null.
         * @param map        Receives the operations to fail, mapped to their cause.
         */
        @GuardedBy("stateLock")
        private void collectFailureCandidates(Throwable th, DataFrameBuilder.CommitArgs commitArgs, Map<CompletableOperation, Throwable> map) {
            final boolean removed = commitArgs != null && this.metadataTransactions.removeGreaterThanOrEqual(commitArgs);
            final long rollbackKey = removed ? commitArgs.key() : 0;
            if (rollbackKey == 0) {
                this.metadataTransactions.clear();
            }

            OperationProcessor.this.metadataUpdater.rollback(rollbackKey);

            final long lastSerializedSeqNo;
            if (this.metadataTransactions.isEmpty()) {
                lastSerializedSeqNo = 0L;
            } else {
                lastSerializedSeqNo = this.metadataTransactions.peekLast().getLastFullySerializedSequenceNumber();
            }

            // Walk backwards from the newest pending operation until one is found that should survive.
            while (!this.pendingOperations.isEmpty()) {
                if (!shouldFail(this.pendingOperations.peekLast(), lastSerializedSeqNo)) {
                    break;
                }

                map.put(this.pendingOperations.pollLast(), th);
            }
        }

        /**
         * Decides whether a pending operation must be failed.
         *
         * @param completableOperation The operation to inspect.
         * @param j                    The last fully serialized sequence number.
         * @return True if the operation cannot be serialized, or if its sequence number is greater than j.
         */
        private boolean shouldFail(CompletableOperation completableOperation, long j) {
            if (!completableOperation.getOperation().canSerialize()) {
                return true;
            }

            return completableOperation.getOperation().getSequenceNumber() > j;
        }

        /**
         * Removes, under the state lock, every non-serializable operation sitting at the head of the pending
         * queue (stopping at the first serializable one), then completes the removed operations outside the lock.
         */
        private void autoCompleteIfNeeded() {
            Collection<CompletableOperation> ready = null;
            synchronized (OperationProcessor.this.stateLock) {
                while (!this.pendingOperations.isEmpty()) {
                    if (this.pendingOperations.peekFirst().getOperation().canSerialize()) {
                        break;
                    }

                    // Allocate lazily; most invocations find nothing to complete.
                    if (ready == null) {
                        ready = new ArrayList<>();
                    }

                    ready.add(this.pendingOperations.pollFirst());
                }
            }

            if (ready == null) {
                return;
            }

            for (CompletableOperation op : ready) {
                op.complete();
            }
        }

        // Decompiled form of javac's assertion bootstrap: caches whether assertions are disabled,
        // as reported for the outer OperationProcessor class.
        static {
            $assertionsDisabled = !OperationProcessor.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new OperationProcessor.
     *
     * @param updateableContainerMetadata The container metadata; its container id is used in the service name
     *                                    and it backs the metadata updater.
     * @param memoryStateUpdater          Passed to the internal QueueProcessingState; must not be null.
     * @param durableDataLog              The log that the DataFrameBuilder writes to; must not be null.
     * @param metadataCheckpointPolicy    Passed to the internal QueueProcessingState; must not be null.
     * @param scheduledExecutorService    Executor handed to the base service.
     * @throws NullPointerException If any of the arguments documented as non-null is null.
     */
    public OperationProcessor(UpdateableContainerMetadata updateableContainerMetadata, MemoryStateUpdater memoryStateUpdater, DurableDataLog durableDataLog, MetadataCheckpointPolicy metadataCheckpointPolicy, ScheduledExecutorService scheduledExecutorService) {
        super(String.format("OperationProcessor[%d]", updateableContainerMetadata.getContainerId()), scheduledExecutorService);
        this.stateLock = new Object();
        this.metadata = updateableContainerMetadata;
        this.metadataUpdater = new OperationMetadataUpdater(this.metadata);
        this.durableDataLog = Preconditions.checkNotNull(durableDataLog, "durableDataLog");
        this.operationQueue = new BlockingDrainingQueue<>();
        this.state = new QueueProcessingState(memoryStateUpdater, metadataCheckpointPolicy);

        // Wire the frame lifecycle callbacks (sealed / committed / failed) to the processing state.
        final DataFrameBuilder.Args builderArgs = new DataFrameBuilder.Args(
                this.state::frameSealed,
                this.state::commit,
                this.state::fail,
                this.executor);
        this.dataFrameBuilder = new DataFrameBuilder<>(this.durableDataLog, builderArgs);
    }

    /**
     * Gets the shutdown timeout for this service.
     * NOTE(review): presumably overrides a hook in AbstractThreadPoolService; the base class is not visible here.
     *
     * @return The SHUTDOWN_TIMEOUT constant (10 seconds, per the static initializer).
     */
    protected Duration getShutdownTimeout() {
        return SHUTDOWN_TIMEOUT;
    }

    /**
     * Runs the main processing loop. While the service is running, each iteration waits for the throttling
     * delay (if any), takes up to MAX_READ_AT_ONCE operations from the operation queue and processes them.
     * All stages run on this service's executor. Loop failures are routed through iterationErrorHandler().
     *
     * @return A future that completes when the loop ends, or completes exceptionally if the loop fails with
     * an error that iterationErrorHandler() does not suppress.
     */
    protected CompletableFuture<Void> doRun() {
        // The decompiler emitted a cast to an undeclared type variable here; the method reference needs no cast.
        return FutureHelpers
                .loop(
                        this::isRunning,
                        () -> delayIfNecessary()
                                .thenComposeAsync(ignored -> this.operationQueue.take(MAX_READ_AT_ONCE), this.executor)
                                .thenAcceptAsync(this::processOperations, this.executor),
                        this.executor)
                .exceptionally(this::iterationErrorHandler);
    }

    /**
     * Stops the processor. In order: closes the operation queue (failing anything still queued), closes the
     * DataFrameBuilder under the state lock, fails the pending operations, and finally delegates to the
     * base class. A single CancellationException instance is used as the cause throughout.
     */
    protected void doStop() {
        final CancellationException shutdownCause = new CancellationException("OperationProcessor is shutting down.");

        // Stop intake first so nothing new is accepted while tearing down.
        closeQueue(shutdownCause);

        synchronized (this.stateLock) {
            this.dataFrameBuilder.close();
        }

        this.state.fail(shutdownCause, null);
        super.doStop();
    }

    /**
     * Handles a processing error. The operation queue is always closed using the unwrapped exception as the
     * cause. Unless that exception is a shutdown exception, it is also forwarded to the base class error
     * handler and an asynchronous stop is requested.
     *
     * @param th The error, possibly wrapped.
     */
    protected void errorHandler(Throwable th) {
        final Throwable cause = ExceptionHelpers.getRealException(th);
        closeQueue(cause);
        if (!isShutdownException(cause)) {
            super.errorHandler(cause);
            stopAsync();
        }
    }

    /**
     * Error handler for the processing loop (used with CompletableFuture.exceptionally in doRun()).
     * A shutdown exception observed while the service is STOPPING, TERMINATED or FAILED is suppressed;
     * anything else is rethrown unchanged.
     *
     * @param th The loop failure, possibly wrapped.
     * @return Always null, when the failure is suppressed.
     */
    private Void iterationErrorHandler(Throwable th) {
        Throwable realException = ExceptionHelpers.getRealException(th);
        Service.State state = state();
        boolean shuttingDown = state == Service.State.STOPPING
                || state == Service.State.TERMINATED
                || state == Service.State.FAILED;
        if (isShutdownException(realException) && shuttingDown) {
            return null;
        }

        // 'throw realException' does not compile here (static type Throwable, no throws clause).
        // Rethrow the very same object through a generic helper instead of wrapping it.
        throw rethrowUnchecked(realException);
    }

    /**
     * Throws the given Throwable as-is without requiring a throws clause on the caller.
     * The declared return type only exists so that call sites can write 'throw rethrowUnchecked(t)';
     * this method never returns normally.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable t) throws E {
        throw (E) t;
    }

    /**
     * Determines whether the given exception indicates a shutdown.
     *
     * @param th The exception to check; may be null.
     * @return True if it is an ObjectClosedException or a CancellationException, false otherwise.
     */
    private boolean isShutdownException(Throwable th) {
        if (th instanceof ObjectClosedException) {
            return true;
        }

        return th instanceof CancellationException;
    }

    /**
     * Queues the given operation for processing.
     *
     * @param operation The operation to process.
     * @return A future tied to the queued operation. It is completed exceptionally right away with
     * IllegalContainerStateException if the processor is not running, or with the enqueue failure if the
     * operation could not be added to the queue.
     */
    public CompletableFuture<Void> process(Operation operation) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (!isRunning()) {
            result.completeExceptionally(new IllegalContainerStateException("OperationProcessor is not running."));
            return result;
        }

        log.debug("{}: process {}.", this.traceObjectId, operation);
        try {
            this.operationQueue.add(new CompletableOperation(operation, result));
        } catch (Throwable ex) {
            // Errors flagged by mustRethrow() propagate to the caller; everything else is reported via the future.
            if (ExceptionHelpers.mustRethrow(ex)) {
                throw ex;
            }

            result.completeExceptionally(ex);
        }

        return result;
    }

    /**
     * Computes a throttling delay from the DurableDataLog queue statistics and returns a future that
     * completes after it.
     *
     * The delay is the expected processing time scaled by (1 - average item fill rate), clamped to [0, 1],
     * and further scaled by 0.1 when the queue size is below the maximum parallelism. The result is
     * truncated to an int and capped at MAX_READ_AT_ONCE milliseconds.
     *
     * @return A future that completes once the delay has elapsed.
     */
    private CompletableFuture<Void> delayIfNecessary() {
        final QueueStats stats = this.durableDataLog.getQueueStatistics();

        final double expectedMillis = stats.getExpectedProcessingTimeMillis();
        final double fillRateFactor = MathHelpers.minMax(1.0d - stats.getAverageItemFillRate(), 0.0d, 1.0d);
        final double parallelismFactor = stats.getSize() < stats.getMaxParallelism() ? 0.1d : 1.0d;

        final int uncappedMillis = (int) (expectedMillis * fillRateFactor * parallelismFactor);
        final int delayMillis = Math.min(uncappedMillis, MAX_READ_AT_ONCE);
        return FutureHelpers.delayedFuture(Duration.ofMillis(delayMillis), this.executor);
    }

    /**
     * Processes a batch of operations taken from the operation queue. Each operation is passed to
     * processOperation() and then registered as pending. When the batch is exhausted, more operations are
     * polled from the queue (up to MAX_READ_AT_ONCE); if none are available the DataFrameBuilder is flushed
     * under the state lock and the method ends.
     *
     * NOTE(review): the exception handling of this method was lost in decompilation. The four locals declared
     * at the top are never assigned, the local booleans shadow the static isFatalException(Throwable) method,
     * and both 'finally' blocks contain only an empty 'if'. This does not compile as written (the locals are
     * read before assignment, and processOperation() declares 'throws Exception'). Restore the original
     * catch logic from the original sources or bytecode before relying on this method.
     *
     * @param queue The initial batch of operations to process.
     */
    private void processOperations(Queue<CompletableOperation> queue) {
        // Decompiler leftovers: declared but never assigned (see NOTE above).
        Throwable realException;
        boolean isFatalException;
        Throwable realException2;
        boolean isFatalException2;
        log.debug("{}: processOperations (OperationCount = {}).", this.traceObjectId, Integer.valueOf(queue.size()));
        while (!queue.isEmpty()) {
            // Drain the current batch.
            while (!queue.isEmpty()) {
                try {
                    CompletableOperation poll = queue.poll();
                    try {
                        processOperation(poll);
                        this.state.addPending(poll);
                    } finally {
                        // Empty in the decompiled output; original handler body was not recovered.
                        if (isFatalException2) {
                        }
                    }
                } finally {
                    // Empty in the decompiled output; original handler body was not recovered.
                    if (isFatalException) {
                    }
                }
            }
            if (queue.isEmpty()) {
                // Batch exhausted: pick up whatever arrived in the meantime without waiting.
                queue = this.operationQueue.poll(MAX_READ_AT_ONCE);
                if (queue.isEmpty()) {
                    // Nothing more to add: flush what has been appended so far. The outer loop then exits.
                    log.debug("{}: processOperations (Flush).", this.traceObjectId);
                    synchronized (this.stateLock) {
                        this.dataFrameBuilder.flush();
                    }
                } else {
                    log.debug("{}: processOperations (Add OperationCount = {}).", this.traceObjectId, Integer.valueOf(queue.size()));
                }
            }
        }
    }

    /**
     * Processes a single operation. Operations that cannot be serialized are skipped. For the others, under
     * the state lock: the operation is pre-processed against the metadata, assigned the next sequence
     * number, appended to the DataFrameBuilder and then accepted into the metadata.
     *
     * @param completableOperation The operation to process; must not already be done.
     * @throws IllegalStateException If the operation is already done.
     * @throws Exception             If pre-processing, appending or accepting the operation fails.
     */
    private void processOperation(CompletableOperation completableOperation) throws Exception {
        Preconditions.checkState(!completableOperation.isDone(), "The Operation has already been processed.");

        final Operation entry = completableOperation.getOperation();
        if (!entry.canSerialize()) {
            return;
        }

        synchronized (this.stateLock) {
            this.metadataUpdater.preProcessOperation(entry);
            entry.setSequenceNumber(this.metadataUpdater.nextOperationSequenceNumber());
            this.dataFrameBuilder.append(completableOperation.getOperation());
            this.metadataUpdater.acceptOperation(entry);
        }

        log.trace("{}: DataFrameBuilder.Append {}.", this.traceObjectId, completableOperation.getOperation());
    }

    /**
     * Closes the operation queue and cancels every operation that was still waiting in it.
     *
     * @param th The cause to fail the leftover operations with; if null, a new CancellationException is used.
     */
    private void closeQueue(Throwable th) {
        final BlockingDrainingQueue<CompletableOperation> queue = this.operationQueue;
        if (queue == null) {
            return;
        }

        final Collection<CompletableOperation> leftovers = queue.close();
        if (leftovers == null || leftovers.size() <= 0) {
            return;
        }

        final Throwable cause;
        if (th != null) {
            cause = th;
        } else {
            cause = new CancellationException();
        }

        cancelIncompleteOperations(leftovers, cause);
    }

    /**
     * Fails every operation in the given sequence that is not yet done, then logs (at WARN level) how many
     * were cancelled and why.
     *
     * @param iterable The operations to inspect.
     * @param th       The cause to fail them with; must not be null.
     */
    private void cancelIncompleteOperations(Iterable<CompletableOperation> iterable, Throwable th) {
        // Decompiled form of: assert th != null : "no exception to set";
        if (!$assertionsDisabled && th == null) {
            throw new AssertionError("no exception to set");
        }

        int cancelled = 0;
        for (CompletableOperation candidate : iterable) {
            if (candidate.isDone()) {
                continue;
            }

            this.state.failOperation(candidate, th);
            cancelled++;
        }

        log.warn("{}: Cancelling {} operations with exception: {}.", this.traceObjectId, cancelled, th.toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Determines whether the given exception is considered fatal.
     *
     * @param th The exception to check; may be null.
     * @return True if it is a DataCorruptionException, a DataLogWriterNotPrimaryException or an
     * ObjectClosedException; false otherwise.
     */
    public static boolean isFatalException(Throwable th) {
        if (th instanceof DataCorruptionException) {
            return true;
        }

        if (th instanceof DataLogWriterNotPrimaryException) {
            return true;
        }

        return th instanceof ObjectClosedException;
    }

    // ---------------------------------------------------------------------------------------------
    // Synthetic accessors (access$NNN), surfaced by the decompiler. Each one simply exposes a private
    // or inherited member of OperationProcessor. NOTE(review): presumably emitted by javac so that
    // QueueProcessingState and lambdas could reach those members; they are not hand-written API.
    // ---------------------------------------------------------------------------------------------

    // Returns the processor's stateLock.
    static /* synthetic */ Object access$100(OperationProcessor operationProcessor) {
        return operationProcessor.stateLock;
    }

    // Returns the processor's metadataUpdater.
    static /* synthetic */ OperationMetadataUpdater access$200(OperationProcessor operationProcessor) {
        return operationProcessor.metadataUpdater;
    }

    // Returns the processor's traceObjectId.
    static /* synthetic */ String access$300(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the class logger.
    static /* synthetic */ Logger access$400() {
        return log;
    }

    // Returns the processor's container metadata.
    static /* synthetic */ UpdateableContainerMetadata access$500(OperationProcessor operationProcessor) {
        return operationProcessor.metadata;
    }

    // Returns the processor's traceObjectId (duplicate of access$300).
    static /* synthetic */ String access$600(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the processor's traceObjectId (duplicate of access$300).
    static /* synthetic */ String access$700(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the processor's traceObjectId (duplicate of access$300).
    static /* synthetic */ String access$800(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Delegates to the static isFatalException(Throwable).
    static /* synthetic */ boolean access$900(Throwable th) {
        return isFatalException(th);
    }

    // Returns the processor's stop exception via getStopException().
    static /* synthetic */ Throwable access$1000(OperationProcessor operationProcessor) {
        return operationProcessor.getStopException();
    }

    // Static initialization of the blank-final static fields declared at the top of the class.
    static {
        // Decompiled form of javac's assertion bootstrap: true when assertions are disabled for this class.
        $assertionsDisabled = !OperationProcessor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(OperationProcessor.class);
        // Shutdown timeout reported by getShutdownTimeout().
        SHUTDOWN_TIMEOUT = Duration.ofSeconds(10L);
    }
}
