package io.pravega.segmentstore.server.logs;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.common.util.BlockingDrainingQueue;
import io.pravega.segmentstore.server.CacheUtilizationProvider;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.SegmentStoreMetrics;
import io.pravega.segmentstore.server.UpdateableContainerMetadata;
import io.pravega.segmentstore.server.logs.DataFrameBuilder;
import io.pravega.segmentstore.server.logs.ThrottlerCalculator;
import io.pravega.segmentstore.server.logs.operations.CompletableOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.OperationSerializer;
import io.pravega.segmentstore.storage.DataLogWriterNotPrimaryException;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.WriteSettings;
import io.pravega.segmentstore.storage.cache.CacheFullException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/logs/OperationProcessor.class */
public class OperationProcessor extends AbstractThreadPoolService implements AutoCloseable {

    // SLF4J logger for this class; assigned in the static initializer at the bottom of the class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Value returned by getShutdownTimeout(); assigned in the static initializer.
    private static final Duration SHUTDOWN_TIMEOUT;
    // NOTE(review): presumably the scale factor for the batch size computed by getFetchCount() - confirm.
    private static final int MAX_READ_AT_ONCE = 1000;
    // Maximum number of items requested from commitQueue by a single take()/poll() call.
    private static final int MAX_COMMIT_QUEUE_SIZE = 50;
    // Container metadata; supplies the container id and backs metadataUpdater.
    private final UpdateableContainerMetadata metadata;
    // Receives committed operations in processCommits().
    private final MemoryStateUpdater stateUpdater;

    // Used under stateLock to pre-process, sequence, accept, seal and roll back operations.
    @GuardedBy("stateLock")
    private final OperationMetadataUpdater metadataUpdater;
    // Operations submitted through process(); drained by the first loop in doRun().
    private final BlockingDrainingQueue<CompletableOperation> operationQueue;
    // Batches of operations drained by the second loop in doRun() and by processCommits().
    private final BlockingDrainingQueue<List<CompletableOperation>> commitQueue;
    // Monitor protecting the @GuardedBy("stateLock") members of this class and of QueueProcessingState.
    private final Object stateLock;
    // Tracks operations that were appended but not yet acknowledged or failed.
    private final QueueProcessingState state;

    // Operations are appended to / flushed through this builder while holding stateLock.
    @GuardedBy("stateLock")
    private final DataFrameBuilder<Operation> dataFrameBuilder;
    // Metrics reporter created with this container's id.
    private final SegmentStoreMetrics.OperationProcessor metrics;
    // Consulted before each fetch from operationQueue (see doRun() and processOperations()).
    private final Throttler throttler;
    // Obtained from the MemoryStateUpdater; tracks pending bytes and reports cache insertion capacity.
    private final CacheUtilizationProvider cacheUtilizationProvider;
    // Desugared form of the Java 'assert' switch: true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/logs/OperationProcessor$QueueProcessingState.class */
    public class QueueProcessingState {

        // Operations appended since the last frameSealed() call; swapped for a fresh list when a frame is sealed.
        @GuardedBy("stateLock")
        private ArrayList<CompletableOperation> nextFrameOperations;

        // Number of operations added through addPending() that have not yet been collected for completion or failure.
        @GuardedBy("stateLock")
        private int pendingOperationCount;
        // NOTE(review): not used by any method decompiled in this file; presumably consulted by commit() - confirm.
        private final MetadataCheckpointPolicy checkpointPolicy;

        // Sealed frames awaiting commit/failure, in the order they were sealed (oldest first).
        @GuardedBy("stateLock")
        private final ArrayDeque<DataFrameBuilder.CommitArgs> metadataTransactions;

        // Initialized to -1 in the constructor. NOTE(review): presumably updated by commit() - confirm.
        @GuardedBy("stateLock")
        private long highestCommittedDataFrame;
        // Desugared form of the Java 'assert' switch: true when assertions are disabled.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates an empty state: no buffered operations, no sealed frames, zero pending operations and
         * a highest-committed-frame marker of -1.
         *
         * @param metadataCheckpointPolicy The checkpoint policy to retain; must not be null.
         * @throws NullPointerException If metadataCheckpointPolicy is null.
         */
        private QueueProcessingState(MetadataCheckpointPolicy metadataCheckpointPolicy) {
            this.checkpointPolicy = Preconditions.checkNotNull(metadataCheckpointPolicy, "checkpointPolicy");
            this.metadataTransactions = new ArrayDeque<>();
            this.nextFrameOperations = new ArrayList<>();
            this.pendingOperationCount = 0;
            this.highestCommittedDataFrame = -1L;
        }

        /**
         * Registers an operation as pending: its cache length is added to the provider's pending bytes,
         * then (under stateLock) it is appended to the next frame's operation list and counted.
         *
         * @param completableOperation The operation to track.
         */
        void addPending(CompletableOperation completableOperation) {
            final CacheUtilizationProvider provider = OperationProcessor.this.cacheUtilizationProvider;
            final Operation pendingOperation = completableOperation.getOperation();
            provider.adjustPendingBytes(pendingOperation.getCacheLength());
            synchronized (OperationProcessor.this.stateLock) {
                this.nextFrameOperations.add(completableOperation);
                this.pendingOperationCount += 1;
            }
        }

        /**
         * Releases the pending bytes accounted for the operation wrapped by the given CompletableOperation.
         *
         * @param completableOperation The wrapper whose operation is no longer pending.
         */
        void notifyOperationCommitted(CompletableOperation completableOperation) {
            // Same accounting as the Operation overload: subtract the operation's cache length.
            final Operation finished = completableOperation.getOperation();
            notifyOperationCommitted(finished);
        }

        /**
         * Subtracts the given operation's cache length from the provider's pending bytes.
         *
         * @param operation The operation that is no longer pending.
         */
        void notifyOperationCommitted(Operation operation) {
            final CacheUtilizationProvider provider = OperationProcessor.this.cacheUtilizationProvider;
            provider.adjustPendingBytes(-operation.getCacheLength());
        }

        /**
         * Reads the pending operation counter while holding stateLock.
         *
         * @return The current value of pendingOperationCount.
         */
        int getPendingCount() {
            synchronized (OperationProcessor.this.stateLock) {
                return this.pendingOperationCount;
            }
        }

        /**
         * Callback registered with the DataFrameBuilder as its first (frame-sealed) handler.
         * Under stateLock it seals the current metadata transaction, attaches the resulting id and a
         * read-only view of the operations buffered so far to the CommitArgs, starts a new buffer,
         * and queues the CommitArgs at the tail of metadataTransactions.
         *
         * @param commitArgs The CommitArgs describing the sealed frame; updated in place.
         */
        void frameSealed(DataFrameBuilder.CommitArgs commitArgs) {
            synchronized (OperationProcessor.this.stateLock) {
                final OperationMetadataUpdater updater = OperationProcessor.this.metadataUpdater;
                commitArgs.setMetadataTransactionId(updater.sealTransaction());

                // Hand the buffered operations over to the CommitArgs, then start a fresh buffer.
                final List<CompletableOperation> sealedOperations = Collections.unmodifiableList(this.nextFrameOperations);
                commitArgs.setOperations(sealedOperations);
                this.nextFrameOperations = new ArrayList<>();

                this.metadataTransactions.addLast(commitArgs);
            }
        }

        // Callback registered with the DataFrameBuilder as its second (commit) handler; see the
        // OperationProcessor constructor.
        //
        // WARNING: the real body of this method was NOT recovered by the decompiler (see the JADX
        // error below). As written here it always throws UnsupportedOperationException.
        // NOTE(review): collectCompletionCandidates(), checkpointPolicy and highestCommittedDataFrame
        // have no other visible users in this class, so they are presumably used by the original
        // body - confirm against the upstream source before relying on this class.
        /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
            java.lang.NullPointerException: Cannot invoke "String.charAt(int)" because "obj" is null
            	at jadx.core.utils.Utils.cleanObjectName(Utils.java:38)
            	at jadx.core.dex.instructions.args.ArgType.object(ArgType.java:86)
            	at jadx.core.dex.info.ClassInfo.fromName(ClassInfo.java:42)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.convertToHandlers(AttachTryCatchVisitor.java:113)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.initTryCatches(AttachTryCatchVisitor.java:54)
            	at jadx.core.dex.visitors.AttachTryCatchVisitor.visit(AttachTryCatchVisitor.java:42)
            */
        void commit(io.pravega.segmentstore.server.logs.DataFrameBuilder.CommitArgs r9) {
            /*
                Method dump skipped, instructions count: 497
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: io.pravega.segmentstore.server.logs.OperationProcessor.QueueProcessingState.commit(io.pravega.segmentstore.server.logs.DataFrameBuilder$CommitArgs):void");
        }

        /**
         * Callback registered with the DataFrameBuilder as its failure handler; also invoked directly
         * with {@code commitArgs == null} to fail everything that is still pending.
         *
         * Under stateLock, collects the operations affected by the failure and reduces the pending
         * count accordingly. Each collected operation is then failed and its pending bytes released,
         * the failures are reported to metrics, and finally the processor's error handler is invoked.
         *
         * The notification step runs in a {@code finally} that guards only the locked section, so
         * every collected operation is notified exactly once. The previous shape repeated the
         * notification in a catch block, which would fail the operations and release their pending
         * bytes a second time if notification itself (or the error handler) threw.
         *
         * @param th         The cause of the failure.
         * @param commitArgs The CommitArgs of the failed frame, or null to fail all pending operations.
         */
        void fail(Throwable th, DataFrameBuilder.CommitArgs commitArgs) {
            List<CompletableOperation> toFail = null;
            try {
                synchronized (OperationProcessor.this.stateLock) {
                    toFail = collectFailureCandidates(commitArgs);
                    this.pendingOperationCount -= toFail.size();
                }
            } finally {
                // toFail stays null only if collection itself threw; then there is nothing to notify.
                if (toFail != null) {
                    toFail.forEach(failed -> {
                        failOperation(failed, th);
                        notifyOperationCommitted(failed);
                    });
                    OperationProcessor.this.metrics.operationsFailed(toFail);
                }
            }

            Callbacks.invokeSafely(OperationProcessor.this::errorHandler, th, (Consumer<Throwable>) null);
        }

        /**
         * Fails a single operation. If the service reports a stop exception (read under stateLock),
         * that exception is used instead of the supplied one.
         *
         * @param completableOperation The operation to fail.
         * @param th                   The exception to fail it with when no stop exception is set.
         */
        void failOperation(CompletableOperation completableOperation, Throwable th) {
            final Throwable stopCause;
            synchronized (OperationProcessor.this.stateLock) {
                stopCause = OperationProcessor.this.getStopException();
            }
            // The operation is completed outside the lock.
            completableOperation.fail(stopCause == null ? th : stopCause);
        }

        /**
         * Removes, from the head of metadataTransactions, every sealed frame whose metadata transaction
         * id is less than or equal to that of the given CommitArgs, and returns the non-empty operation
         * lists of those frames in queue order. pendingOperationCount is reduced by the number of
         * operations returned.
         *
         * If no removed frame carries exactly the given transaction id, a warning is logged (and, with
         * assertions enabled, the queue is required to be empty).
         *
         * @param commitArgs The CommitArgs of the frame that was committed.
         * @return The operation lists that may now be completed; possibly empty, never null.
         */
        @GuardedBy("stateLock")
        private List<List<CompletableOperation>> collectCompletionCandidates(DataFrameBuilder.CommitArgs commitArgs) {
            List<List<CompletableOperation>> toAck = new ArrayList<>();
            long transactionId = commitArgs.getMetadataTransactionId();
            boolean exactMatchFound = false;
            while (!this.metadataTransactions.isEmpty() && this.metadataTransactions.peekFirst().getMetadataTransactionId() <= transactionId) {
                DataFrameBuilder.CommitArgs sealed = this.metadataTransactions.pollFirst();
                exactMatchFound |= sealed.getMetadataTransactionId() == transactionId;
                if (sealed.getOperations().size() > 0) {
                    toAck.add(sealed.getOperations());
                    this.pendingOperationCount -= sealed.getOperations().size();
                }
            }
            if (!exactMatchFound) {
                // Argument order follows the placeholders: '{}' is the CommitArgs, Count={} is the queue size.
                OperationProcessor.log.warn("{}: No Metadata UpdateTransaction found for '{}' (Count={}). This is expected after a critical failure or when OperationProcessor is shutting down.", new Object[]{OperationProcessor.this.traceObjectId, commitArgs, Integer.valueOf(this.metadataTransactions.size())});
                if (!$assertionsDisabled && !this.metadataTransactions.isEmpty()) {
                    throw new AssertionError("No Metadata UpdateTransaction found for given CommitArgs, but there are still entries in metadataTransaction.");
                }
            }
            return toAck;
        }

        /**
         * Gathers the operations that must be failed and rolls back the matching metadata transactions.
         *
         * With a non-null CommitArgs: rolls back to its transaction id and removes, from the tail of
         * metadataTransactions, every frame whose id is greater than or equal to it. With null: takes
         * the operations of every queued frame, clears the queue and rolls back to id 0. In both cases
         * the not-yet-sealed operations in nextFrameOperations are appended and that buffer is cleared.
         *
         * @param commitArgs The CommitArgs of the failed frame, or null to collect everything.
         * @return The operations to fail; possibly empty, never null.
         */
        @GuardedBy("stateLock")
        private List<CompletableOperation> collectFailureCandidates(DataFrameBuilder.CommitArgs commitArgs) {
            final List<CompletableOperation> candidates = new ArrayList<>();
            if (commitArgs == null) {
                for (DataFrameBuilder.CommitArgs queued : this.metadataTransactions) {
                    candidates.addAll(queued.getOperations());
                }
                this.metadataTransactions.clear();
                OperationProcessor.this.metadataUpdater.rollback(0L);
            } else {
                OperationProcessor.this.metadataUpdater.rollback(commitArgs.getMetadataTransactionId());
                while (!this.metadataTransactions.isEmpty()
                        && this.metadataTransactions.peekLast().getMetadataTransactionId() >= commitArgs.getMetadataTransactionId()) {
                    DataFrameBuilder.CommitArgs newest = this.metadataTransactions.pollLast();
                    candidates.addAll(newest.getOperations());
                }
            }

            // Operations appended after the last sealed frame are affected too.
            candidates.addAll(this.nextFrameOperations);
            this.nextFrameOperations.clear();
            return candidates;
        }

        // Desugared 'assert' support: caches whether assertions are disabled for OperationProcessor.
        // NOTE(review): decompiler output; in source form this would normally be implied by 'assert' statements.
        static {
            $assertionsDisabled = !OperationProcessor.class.desiredAssertionStatus();
        }
    }

    /**
     * Creates a new OperationProcessor.
     *
     * Wires the processing state's frameSealed/commit/fail callbacks into a DataFrameBuilder over the
     * given log, builds a Throttler from cache-utilization and log-queue statistics, and registers
     * that throttler as a cache cleanup listener and as a queue state change listener on the log.
     *
     * @param updateableContainerMetadata The container metadata; supplies the container id.
     * @param memoryStateUpdater          Receives committed operations; also supplies the CacheUtilizationProvider.
     * @param durableDataLog              The log the DataFrameBuilder writes to.
     * @param metadataCheckpointPolicy    Checkpoint policy handed to the processing state.
     * @param scheduledExecutorService    Executor used for all asynchronous work.
     * @throws NullPointerException If any argument is null.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public OperationProcessor(UpdateableContainerMetadata updateableContainerMetadata, MemoryStateUpdater memoryStateUpdater, DurableDataLog durableDataLog, MetadataCheckpointPolicy metadataCheckpointPolicy, ScheduledExecutorService scheduledExecutorService) {
        super(String.format("OperationProcessor[%d]", updateableContainerMetadata.getContainerId()), scheduledExecutorService);
        this.stateLock = new Object();
        Preconditions.checkNotNull(durableDataLog, "durableDataLog");
        this.metadata = updateableContainerMetadata;
        this.stateUpdater = Preconditions.checkNotNull(memoryStateUpdater, "stateUpdater");
        this.metadataUpdater = new OperationMetadataUpdater(this.metadata);
        this.operationQueue = new BlockingDrainingQueue<>();
        this.commitQueue = new BlockingDrainingQueue<>();
        this.state = new QueueProcessingState(metadataCheckpointPolicy);

        // Method references are passed directly so that they are typed by the Args constructor's parameters.
        DataFrameBuilder.Args builderArgs = new DataFrameBuilder.Args(this.state::frameSealed, this.state::commit, this.state::fail, this.executor);
        this.dataFrameBuilder = new DataFrameBuilder<>(durableDataLog, OperationSerializer.DEFAULT, builderArgs);
        this.metrics = new SegmentStoreMetrics.OperationProcessor(this.metadata.getContainerId());
        this.cacheUtilizationProvider = memoryStateUpdater.getCacheUtilizationProvider();

        this.throttler = new Throttler(
                this.metadata.getContainerId(),
                ThrottlerCalculator.builder()
                        .cacheThrottler(this.cacheUtilizationProvider::getCacheUtilization,
                                this.cacheUtilizationProvider.getCacheTargetUtilization(),
                                this.cacheUtilizationProvider.getCacheMaxUtilization())
                        .batchingThrottler(durableDataLog::getQueueStatistics)
                        .durableDataLogThrottler(durableDataLog.getWriteSettings(), durableDataLog::getQueueStatistics)
                        .build(),
                scheduledExecutorService,
                this.metrics);
        this.cacheUtilizationProvider.registerCleanupListener(this.throttler);
        durableDataLog.registerQueueStateChangeListener(this.throttler);
    }

    /**
     * Returns the fixed shutdown timeout configured for this processor.
     *
     * @return SHUTDOWN_TIMEOUT.
     */
    @Override
    protected Duration getShutdownTimeout() {
        return SHUTDOWN_TIMEOUT;
    }

    /**
     * Starts the two asynchronous loops of this processor and returns a future for both.
     *
     * The first loop runs while the service is running: wait on the throttler, take up to
     * getFetchCount() operations from operationQueue, then process them. The second loop runs while
     * the service is running or commitQueue is non-empty: take up to MAX_COMMIT_QUEUE_SIZE batches and
     * apply them via processCommits(). When the second loop ends, commitQueue is closed and the
     * pending bytes of any operations still in it are released.
     *
     * @return A future that completes when both loops end; failures are routed through iterationErrorHandler.
     */
    @Override
    protected CompletableFuture<Void> doRun() {
        CompletableFuture<Void> operationLoop = Futures.loop(
                this::isRunning,
                () -> this.throttler.throttle()
                        .thenComposeAsync(ignored -> this.operationQueue.take(getFetchCount()), this.executor)
                        .thenAcceptAsync(this::processOperations, this.executor),
                this.executor);

        CompletableFuture<Void> commitLoop = Futures.loop(
                () -> isRunning() || this.commitQueue.size() > 0,
                () -> this.commitQueue.take(MAX_COMMIT_QUEUE_SIZE)
                        .thenAcceptAsync(this::processCommits, this.executor),
                this.executor)
                .whenComplete((ignored, failure) -> {
                    // Whatever is left in the commit queue will never be applied: release its pending bytes.
                    Stream<CompletableOperation> leftovers = this.commitQueue.close().stream().flatMap(List::stream);
                    leftovers.forEach(this.state::notifyOperationCommitted);
                    if (failure != null) {
                        throw new CompletionException(failure);
                    }
                });

        return CompletableFuture.allOf(operationLoop, commitLoop).exceptionally(this::iterationErrorHandler);
    }

    /**
     * Stops the processor. In order: closes the operation queue (failing whatever was still queued
     * with a CancellationException), closes the DataFrameBuilder under stateLock, fails every
     * operation that is still pending, closes the throttler and the metrics, then delegates to the
     * superclass.
     */
    @Override
    protected void doStop() {
        CancellationException shutdownCause = new CancellationException("OperationProcessor is shutting down.");
        closeQueue(shutdownCause);
        synchronized (this.stateLock) {
            this.dataFrameBuilder.close();
        }
        // A null CommitArgs means "fail everything that has not been acknowledged".
        this.state.fail(shutdownCause, null);
        this.throttler.close();
        this.metrics.close();
        super.doStop();
    }

    /**
     * Handles a failure: unwraps it and closes the operation queue with it. Unless the unwrapped
     * cause is a shutdown exception (see isShutdownException), it is also passed to the superclass
     * handler and an asynchronous stop is requested.
     *
     * @param th The failure to handle; may be a wrapper around the real cause.
     */
    @Override
    protected void errorHandler(Throwable th) {
        Throwable cause = Exceptions.unwrap(th);
        closeQueue(cause);
        if (!isShutdownException(cause)) {
            super.errorHandler(cause);
            stopAsync();
        }
    }

    /**
     * Failure handler attached to the combined loop future in doRun().
     *
     * A shutdown exception observed while the service is STOPPING, TERMINATED or FAILED is swallowed.
     * Anything else is propagated: unchecked exceptions and errors are rethrown unchanged, while a
     * checked cause is wrapped in a CompletionException because this method declares no checked
     * exceptions.
     *
     * @param th The failure reported by the loop future.
     * @return Always null (when the failure is swallowed).
     */
    private Void iterationErrorHandler(Throwable th) {
        Throwable cause = Exceptions.unwrap(th);
        Service.State current = state();
        boolean shuttingDown = current == Service.State.STOPPING
                || current == Service.State.TERMINATED
                || current == Service.State.FAILED;
        if (shuttingDown && isShutdownException(cause)) {
            return null;
        }

        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        throw new CompletionException(cause);
    }

    /**
     * Tells whether the given exception is one of the two types treated as a shutdown signal.
     *
     * @param th The exception to inspect.
     * @return True for ObjectClosedException or CancellationException instances, false otherwise.
     */
    private boolean isShutdownException(Throwable th) {
        if (th instanceof ObjectClosedException) {
            return true;
        }
        return th instanceof CancellationException;
    }

    /**
     * Queues an operation for processing.
     *
     * @param operation The operation to queue.
     * @return A future tied to the queued operation. It is completed exceptionally right away with an
     * IllegalContainerStateException if the processor is not running, or with the enqueue failure if
     * adding to the queue throws an exception that does not have to be rethrown.
     */
    public CompletableFuture<Void> process(Operation operation) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (!isRunning()) {
            result.completeExceptionally(new IllegalContainerStateException("OperationProcessor is not running."));
            return result;
        }

        log.debug("{}: process {}.", this.traceObjectId, operation);
        try {
            this.operationQueue.add(new CompletableOperation(operation, result));
        } catch (Throwable ex) {
            if (Exceptions.mustRethrow(ex)) {
                throw ex;
            }
            result.completeExceptionally(ex);
        }
        return result;
    }

    /**
     * Computes how many operations to request from the operation queue in one go: the cache
     * insertion capacity scaled by MAX_READ_AT_ONCE, truncated to an int, but never less than 1.
     *
     * @return The batch size, at least 1.
     */
    private int getFetchCount() {
        final double capacity = this.cacheUtilizationProvider.getCacheInsertionCapacity();
        final int scaled = (int) (capacity * MAX_READ_AT_ONCE);
        return Math.max(1, scaled);
    }

    /**
     * Processes a batch of operations and keeps pulling further batches from operationQueue for as
     * long as more are immediately available and throttling is not required. When no more operations
     * are available, the DataFrameBuilder is flushed.
     *
     * NOTE(review): the exception handlers in this method were lost by the decompiler (it emitted
     * empty finally blocks reading uninitialized locals). They are reconstructed here from the
     * helpers available in this class; confirm against the upstream source.
     *
     * Reconstructed failure handling:
     * - a failure while processing one operation fails that operation only, unless the cause is
     *   fatal (see isFatalException), in which case it is escalated;
     * - any failure reaching the batch level fails all pending operations; if it is fatal, the
     *   operations still in the local queue are cancelled and the cause is rethrown.
     *
     * @param queue The first batch of operations to process.
     */
    private void processOperations(Queue<CompletableOperation> queue) {
        log.debug("{}: processOperations (OperationCount = {}).", this.traceObjectId, Integer.valueOf(queue.size()));
        Timer timer = new Timer();
        int processedCount = 0;
        while (!queue.isEmpty()) {
            try {
                while (!queue.isEmpty()) {
                    CompletableOperation current = queue.poll();
                    this.metrics.operationQueueWaitTime(current.getTimer().getElapsedMillis());
                    try {
                        processOperation(current);
                        this.state.addPending(current);
                        processedCount++;
                    } catch (Throwable ex) {
                        Throwable cause = Exceptions.unwrap(ex);
                        this.state.failOperation(current, cause);
                        if (isFatalException(cause)) {
                            // Unrecoverable: let the batch-level handler below deal with it.
                            throw Exceptions.sneakyThrow(cause);
                        }
                    }
                }

                if (queue.isEmpty()) {
                    // The whole batch has been handled: report metrics and restart the counters.
                    this.metrics.currentState(this.operationQueue.size() + processedCount, this.state.getPendingCount());
                    this.metrics.processOperations(processedCount, timer.getElapsedMillis());
                    timer = new Timer();
                    processedCount = 0;
                    if (!this.throttler.isThrottlingRequired()) {
                        // Only pull more work when not throttled; otherwise control returns to the caller.
                        queue = this.operationQueue.poll(getFetchCount());
                    }
                    if (queue.isEmpty()) {
                        log.debug("{}: processOperations (Flush).", this.traceObjectId);
                        synchronized (this.stateLock) {
                            this.dataFrameBuilder.flush();
                        }
                    } else {
                        log.debug("{}: processOperations (Add OperationCount = {}).", this.traceObjectId, Integer.valueOf(queue.size()));
                    }
                }
            } catch (Throwable ex) {
                Throwable cause = Exceptions.unwrap(ex);
                // Fail everything that has been appended but not acknowledged yet.
                this.state.fail(cause, null);
                if (isFatalException(cause)) {
                    // Fail the operations that were never reached, then propagate.
                    cancelIncompleteOperations(queue, cause);
                    throw Exceptions.sneakyThrow(cause);
                }
            }
        }
    }

    /**
     * Processes one operation. Under stateLock, in this order: pre-process it against the metadata
     * updater, assign it the next sequence number, append it to the DataFrameBuilder, and accept it
     * in the metadata updater.
     *
     * @param completableOperation The operation to process; must not be done already.
     * @throws IllegalStateException If the operation is already done.
     * @throws Exception             If any of the processing steps fails.
     */
    private void processOperation(CompletableOperation completableOperation) throws Exception {
        final boolean notYetDone = !completableOperation.isDone();
        Preconditions.checkState(notYetDone, "The Operation has already been processed.");

        final Operation op = completableOperation.getOperation();
        synchronized (this.stateLock) {
            final OperationMetadataUpdater updater = this.metadataUpdater;
            updater.preProcessOperation(op);
            op.setSequenceNumber(updater.nextOperationSequenceNumber());
            this.dataFrameBuilder.append(op);
            updater.acceptOperation(op);
        }
        log.trace("{}: DataFrameBuilder.Append {}.", this.traceObjectId, op);
    }

    /**
     * Closes the operation queue, cancels whatever was still queued in it, and cancels any pending
     * take on the commit queue.
     *
     * @param th The exception to cancel queued operations with; a new CancellationException is used if null.
     */
    private void closeQueue(Throwable th) {
        final Queue<CompletableOperation> drained = this.operationQueue.close();
        final boolean hasLeftovers = drained != null && !drained.isEmpty();
        if (hasLeftovers) {
            final Throwable reason = (th == null) ? new CancellationException() : th;
            cancelIncompleteOperations(drained, reason);
        }
        this.commitQueue.cancelPendingTake();
    }

    /**
     * Fails every operation in the given sequence that is not done yet, and logs how many were failed.
     *
     * @param iterable The operations to inspect.
     * @param th       The exception to fail them with; must not be null.
     */
    private void cancelIncompleteOperations(Iterable<CompletableOperation> iterable, Throwable th) {
        if (!$assertionsDisabled && th == null) {
            throw new AssertionError("no exception to set");
        }

        int cancelledCount = 0;
        final Iterator<CompletableOperation> remaining = iterable.iterator();
        while (remaining.hasNext()) {
            final CompletableOperation candidate = remaining.next();
            if (candidate.isDone()) {
                continue;
            }
            this.state.failOperation(candidate, th);
            cancelledCount++;
        }
        log.warn("{}: Cancelling {} operations with exception: {}.", this.traceObjectId, Integer.valueOf(cancelledCount), th.toString());
    }

    /**
     * Tells whether the given exception belongs to one of the types this processor treats as fatal.
     *
     * @param th The exception to inspect.
     * @return True for DataCorruptionException, DataLogWriterNotPrimaryException, ObjectClosedException
     * or CacheFullException instances; false otherwise.
     */
    private static boolean isFatalException(Throwable th) {
        if (th instanceof DataCorruptionException) {
            return true;
        }
        if (th instanceof DataLogWriterNotPrimaryException) {
            return true;
        }
        if (th instanceof ObjectClosedException) {
            return true;
        }
        return th instanceof CacheFullException;
    }

    /**
     * Applies batches of committed operations to the MemoryStateUpdater, then keeps polling the commit
     * queue (up to MAX_COMMIT_QUEUE_SIZE batches at a time) until a poll comes back empty.
     *
     * Any failure is logged and ends the loop; a fatal one (see isFatalException) is additionally
     * passed to errorHandler.
     *
     * @param collection The first set of operation batches to apply.
     */
    private void processCommits(Collection<List<CompletableOperation>> collection) {
        try {
            do {
                final Timer commitTimer = new Timer();
                final Iterator<Operation> operations = collection.stream()
                        .flatMap(List::stream)
                        .map(CompletableOperation::getOperation)
                        .iterator();
                // Each applied operation is reported back so its pending bytes get released.
                this.stateUpdater.process(operations, this.state::notifyOperationCommitted);
                this.metrics.memoryCommit(collection.size(), commitTimer.getElapsed());
                collection = this.commitQueue.poll(MAX_COMMIT_QUEUE_SIZE);
            } while (!collection.isEmpty());
        } catch (Throwable ex) {
            log.error("{}: MemoryStateUpdater.process failure.", this.traceObjectId, ex);
            if (isFatalException(ex)) {
                Callbacks.invokeSafely(this::errorHandler, ex, (Consumer) null);
            }
        }
    }

    /**
     * Returns the metrics reporter held by this processor.
     *
     * @return The value of the metrics field.
     */
    @SuppressFBWarnings(justification = "generated code")
    public SegmentStoreMetrics.OperationProcessor getMetrics() {
        return this.metrics;
    }

    // The access$NNN methods below are synthetic static accessors: each one simply returns a single
    // private member of the given OperationProcessor (or the static logger).
    // NOTE(review): presumably emitted by the compiler so the nested QueueProcessingState class can
    // reach private members of its enclosing instance; nothing in this file calls them by name.

    // Returns the stateLock monitor object.
    static /* synthetic */ Object access$200(OperationProcessor operationProcessor) {
        return operationProcessor.stateLock;
    }

    // Returns the metadataUpdater.
    static /* synthetic */ OperationMetadataUpdater access$300(OperationProcessor operationProcessor) {
        return operationProcessor.metadataUpdater;
    }

    // Returns the traceObjectId used as the log prefix.
    static /* synthetic */ String access$400(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the class logger.
    static /* synthetic */ Logger access$500() {
        return log;
    }

    // Returns the container metadata.
    static /* synthetic */ UpdateableContainerMetadata access$600(OperationProcessor operationProcessor) {
        return operationProcessor.metadata;
    }

    // Returns the traceObjectId (same value as access$400).
    static /* synthetic */ String access$700(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the traceObjectId (same value as access$400).
    static /* synthetic */ String access$800(OperationProcessor operationProcessor) {
        return operationProcessor.traceObjectId;
    }

    // Returns the commitQueue.
    static /* synthetic */ BlockingDrainingQueue access$900(OperationProcessor operationProcessor) {
        return operationProcessor.commitQueue;
    }

    // Returns the metrics reporter.
    static /* synthetic */ SegmentStoreMetrics.OperationProcessor access$1000(OperationProcessor operationProcessor) {
        return operationProcessor.metrics;
    }

    // Class initializer: caches the assertion status, creates the logger, and sets the shutdown
    // timeout to 10 seconds.
    static {
        $assertionsDisabled = !OperationProcessor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(OperationProcessor.class);
        SHUTDOWN_TIMEOUT = Duration.ofSeconds(10L);
    }
}
