package org.voltdb.stream.execution;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.stream.api.Committer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.VoltEnvironment;
import org.voltdb.stream.api.extension.CommitResult;
import org.voltdb.stream.api.extension.CompletableCommitResult;
import org.voltdb.stream.api.extension.Operator;
import org.voltdb.stream.api.pipeline.ExceptionHandler;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.api.pipeline.VoltStreamSource;
import org.voltdb.stream.execution.util.VisibleForTesting;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/voltdb/stream/execution/BatchAsyncCommitter.class */
/**
 * Drives the commit of one batch through three consecutive stages: the main sink, the
 * additional ("alt") sinks exposed by the {@link CompoundSink}, and finally the source.
 *
 * <p>Each stage returns one or more {@link CommitResult}s. A result that is already done is
 * processed inline; otherwise a callback is registered through
 * {@link CommitResult#handleAsynchronously}. The current stage is tracked in {@link Progress},
 * so a later call to {@link #commit(long, List)} resumes from the stage that has not finished.
 *
 * <p>NOTE(review): this class uses no synchronization; which thread invokes the asynchronous
 * callbacks is not visible here - confirm the threading model with the {@link CommitResult}
 * implementations.
 */
public class BatchAsyncCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(BatchAsyncCommitter.class);

    /** Substituted when the compound sink reports no results, so the alt-sink stage still advances. */
    private static final List<CommitResult> ALL_COMMITTED_RESULTS = List.of(CommitResult.COMMITTED);

    private final ExecutionContext executionContext;
    private final VoltEnvironmentInternal environment;
    private final ExceptionHandler exceptionHandler;
    private final VoltStreamSource<Object> source;
    private final VoltStreamSink<Object> sink;
    private final CompoundSink compoundSink;

    /** Batch id passed to the most recent commit cycle that was actually started. */
    private long batchId;

    /** Messages of the current batch; used as the affected messages when a sink commit times out without naming any. */
    private List<Object> consumed;

    /** Failure recorded by the current commit cycle, or {@code null} when none was recorded. */
    private Throwable commitFailure;

    /** One committer per operator, created lazily by {@link #createCommitterFor(Operator)}. */
    private final Map<Operator, Committer<Object>> commiters = new HashMap<>();

    /** One-shot listeners; run and then discarded by {@link #notifyCommiterFinished()}. */
    private final List<Runnable> commitListeners = new ArrayList<>();

    private boolean inShortCircuitedMode = false;

    /** Results of the stage currently in flight; empty when nothing is pending. */
    private List<CommitResult> uncommittedResults = List.of();

    /** Number of alt-sink results (successful or failed) received in the current cycle. */
    private int resultCounter = 0;

    private Progress progress = Progress.COMMITTED;

    /** Stage of the commit cycle this committer is currently in. */
    public enum Progress {
        /** The main sink commit has been started and has not yet moved on to the alt sinks. */
        AWAITING_MAIN_SINK_COMMIT,
        /** The alt-sink commits have been started and the source commit has not yet begun. */
        AWAITING_ALT_SINK_COMMIT,
        /** The source commit has been started and has not completed successfully. */
        AWAITING_SOURCE_COMMIT,
        /** No commit is in progress; this is also the initial state. */
        COMMITTED
    }

    /**
     * Creates a committer for one source/sink pipeline.
     *
     * @param executionContext context handed to every commit call and to the exception handler
     * @param voltEnvironmentInternal environment used to crash on unrecoverable sink errors
     * @param exceptionHandler receives the messages affected by a recoverable main-sink failure
     * @param voltStreamSource source committed last, after all sinks
     * @param voltStreamSink main sink, committed first
     * @param compoundSink provider of the additional sink commit results
     */
    public BatchAsyncCommitter(ExecutionContext executionContext, VoltEnvironmentInternal voltEnvironmentInternal, ExceptionHandler exceptionHandler, VoltStreamSource<Object> voltStreamSource, VoltStreamSink<Object> voltStreamSink, CompoundSink compoundSink) {
        this.executionContext = executionContext;
        this.environment = voltEnvironmentInternal;
        this.exceptionHandler = exceptionHandler;
        this.sink = voltStreamSink;
        this.compoundSink = compoundSink;
        this.source = voltStreamSource;
    }

    /**
     * Registers a listener to be run the next time a commit cycle finishes or fails.
     * Listeners are discarded after they have been run once.
     *
     * @param runnable the listener to register
     */
    public void addListener(Runnable runnable) {
        this.commitListeners.add(runnable);
    }

    /**
     * @return {@code true} when the progress is {@link Progress#COMMITTED}
     */
    public boolean hasCommitted() {
        return this.progress == Progress.COMMITTED;
    }

    /**
     * @return {@code true} when the progress is anything other than {@link Progress#COMMITTED}
     */
    public boolean commitInProgress() {
        return !hasCommitted();
    }

    /**
     * @return the failure recorded by the current commit cycle, or {@code null} if none
     */
    Throwable getCommitFailure() {
        return this.commitFailure;
    }

    /**
     * Switches to short-circuited mode. While in this mode every cycle started by
     * {@link #commit(long, List)} is failed right after it starts, and main-sink failures are
     * recorded instead of being passed to the exception handler.
     */
    public void enterShortCircuitedMode() {
        this.inShortCircuitedMode = true;
    }

    /**
     * Returns the committer to its idle state: progress {@link Progress#COMMITTED}, no pending
     * results, short-circuited mode off. The batch id, the recorded failure and the registered
     * listeners are left untouched.
     */
    public void resetCommitterState() {
        this.progress = Progress.COMMITTED;
        this.uncommittedResults = List.of();
        this.inShortCircuitedMode = false;
    }

    /**
     * Returns the committer associated with the given operator, creating it on first use.
     *
     * @param operator the operator the committer belongs to
     * @param <T> element type expected by the caller
     * @return the cached committer for {@code operator}
     */
    public <T> Committer<T> createCommitterFor(Operator operator) {
        Committer<Object> committer = this.commiters.computeIfAbsent(
                operator, ignored -> BatchCountingCommitter.create(this.executionContext));
        // Committers are stored as Committer<Object>; the element type is chosen by the caller
        // and cannot be verified here, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Committer<T> typed = (Committer<T>) (Committer<?>) committer;
        return typed;
    }

    /**
     * Starts a commit cycle for the given batch, or resumes it from the stage that did not
     * finish. Does nothing while a result of the previous cycle is still reported as async.
     *
     * @param j id of the batch to commit
     * @param list messages consumed in this batch
     * @throws IllegalStateException if the progress has an unknown value, or (from the main
     *         sink stage) if results are still pending when the stage is entered
     */
    public void commit(long j, List<Object> list) {
        if (!canRunNextCommitCycle()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("For batch {} still awaiting for operators to complete commit asynchronously, progress is {}", Long.valueOf(j), this.progress);
            }
            return;
        }
        this.batchId = j;
        this.consumed = list;
        this.resultCounter = 0;
        this.commitFailure = null;
        this.uncommittedResults = List.of();
        switch (this.progress) {
            case AWAITING_MAIN_SINK_COMMIT:
            case COMMITTED:
                commitSink();
                break;
            case AWAITING_ALT_SINK_COMMIT:
                commitAltSinks();
                break;
            case AWAITING_SOURCE_COMMIT:
                commitSource();
                break;
            default:
                throw new IllegalStateException("Unknown progress: " + this.progress);
        }
        if (this.inShortCircuitedMode) {
            // The stage has been started; fail whatever it left pending.
            failCommit(new VoltEnvironment.OperatorNotReadyException("Circuit breaker is opened."));
        }
    }

    /**
     * @return the stage the committer is currently in
     */
    public Progress getProgress() {
        return this.progress;
    }

    /**
     * Completes every pending {@link CompletableCommitResult} exceptionally with the given
     * cause and then clears the pending results. Pending results of other types are dropped
     * without being completed.
     *
     * @param runtimeException the cause to complete the pending results with
     */
    public void failCommit(RuntimeException runtimeException) {
        // The loop iterates the list captured here, so callbacks that reassign the field
        // while a result is being completed do not disturb the iteration.
        for (CommitResult pending : this.uncommittedResults) {
            if (pending instanceof CompletableCommitResult) {
                ((CompletableCommitResult) pending).completeExceptionally(this.batchId, List.of(), runtimeException);
            }
        }
        this.uncommittedResults = List.of();
    }

    /**
     * @return {@code false} if any pending result reports {@code isAsync()}, {@code true} otherwise
     */
    @VisibleForTesting
    boolean canRunNextCommitCycle() {
        for (CommitResult pending : this.uncommittedResults) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Committer's progress {}; State of the previous commit {}", this.progress, pending);
            }
            if (pending.isAsync()) {
                return false;
            }
        }
        return true;
    }

    /** Stage 1: commits the main sink and continues with the alt sinks once it completes. */
    private void commitSink() {
        if (!this.uncommittedResults.isEmpty()) {
            throw new IllegalStateException("Entered commit cycle without finishing pending commitResults");
        }
        this.progress = Progress.AWAITING_MAIN_SINK_COMMIT;
        CommitResult sinkResult;
        try {
            sinkResult = this.sink.commit(this.batchId, this.executionContext);
        } catch (Exception e) {
            LOG.error("Error committing sink", e);
            this.environment.crash("Unrecoverable error while committing sink, cause is '%s'", e.getMessage());
            // Stop here: falling through would report a second, misleading
            // "must always return CommitResult" crash for the same failure.
            return;
        }
        if (sinkResult == null) {
            this.environment.crash("Sink must always return CommitResult", new Object[0]);
            return;
        }
        this.uncommittedResults = List.of(sinkResult);
        if (sinkResult.isDone()) {
            sinkCommitted(sinkResult);
        } else {
            sinkResult.handleAsynchronously(this::sinkCommitted);
        }
    }

    /**
     * Handles the main-sink result. In short-circuited mode, or when the cause is an
     * {@code OperatorNotReadyException}, the failure is recorded and the cycle stops. Any other
     * failure is passed to the exception handler and the cycle continues with the alt sinks.
     */
    private void sinkCommitted(CommitResult commitResult) {
        Throwable cause = commitResult.getCause();
        if (cause != null) {
            List<Object> affectedMessages = commitResult.getAffectedMessages();
            if ((cause instanceof CommitResult.CommitTimeoutException) && affectedMessages.isEmpty()) {
                // A timeout that names no messages is attributed to the whole batch.
                affectedMessages = this.consumed;
            }
            if (this.inShortCircuitedMode || (cause instanceof VoltEnvironment.OperatorNotReadyException)) {
                this.commitFailure = cause;
                notifyCommiterFinished();
                return;
            }
            LOG.warn("Got failures while committing sink - affected messages {}, cause is {}", affectedMessages, cause);
            this.exceptionHandler.handle(affectedMessages, this.executionContext, cause);
            this.commitFailure = null;
        }
        commitAltSinks();
    }

    /** Stage 2: commits the additional sinks; the source is committed once all of them report back. */
    private void commitAltSinks() {
        this.progress = Progress.AWAITING_ALT_SINK_COMMIT;
        this.uncommittedResults = this.compoundSink.onCommit(this.batchId, this.executionContext);
        if (this.uncommittedResults == null || this.uncommittedResults.isEmpty()) {
            this.uncommittedResults = ALL_COMMITTED_RESULTS;
        }
        for (CommitResult altResult : this.uncommittedResults) {
            if (altResult.isDone()) {
                tryCommitSource(altResult);
            } else {
                altResult.handleAsynchronously(this::tryCommitSource);
            }
        }
    }

    /**
     * Accounts for one alt-sink result. The source commit is started only after every alt sink
     * has reported and none of them failed in this cycle; a failure is recorded, listeners are
     * notified, and the progress stays at {@link Progress#AWAITING_ALT_SINK_COMMIT}.
     */
    private void tryCommitSource(CommitResult commitResult) {
        this.resultCounter++;
        Throwable cause = commitResult.getCause();
        if (cause != null) {
            this.commitFailure = cause;
            LOG.warn("Got failures while committing alt sink, cause is:", cause);
            notifyCommiterFinished();
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got result from {}/{} additional sinks", Integer.valueOf(this.resultCounter), Integer.valueOf(this.uncommittedResults.size()));
        }
        if (this.resultCounter != this.uncommittedResults.size()) {
            return;
        }
        if (this.commitFailure != null) {
            // Failed results are counted too, so the counter can reach the total even though
            // an earlier alt sink failed. A later success must neither erase that failure nor
            // let the source be committed.
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("For batch {}, the source can now be committed", Long.valueOf(this.batchId));
        }
        commitSource();
    }

    /** Stage 3: commits the source. */
    private void commitSource() {
        this.progress = Progress.AWAITING_SOURCE_COMMIT;
        // NOTE(review): unlike commitSink(), an exception or a null result from the source is
        // not handled here (List.of rejects null) - confirm whether that is intentional.
        CommitResult sourceResult = this.source.commit(this.batchId, this.executionContext);
        this.uncommittedResults = List.of(sourceResult);
        if (sourceResult.isDone()) {
            sourceCommitted(sourceResult);
        } else {
            sourceResult.handleAsynchronously(this::sourceCommitted);
        }
    }

    /**
     * Handles the source result and notifies the listeners. On success the pending results are
     * cleared and the progress becomes {@link Progress#COMMITTED}. On failure the cause is
     * recorded; only a {@code CommitTimeoutException} moves the progress to
     * {@link Progress#COMMITTED}, any other failure leaves it at
     * {@link Progress#AWAITING_SOURCE_COMMIT}.
     */
    private void sourceCommitted(CommitResult commitResult) {
        Throwable cause = commitResult.getCause();
        if (cause == null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("For batch {}, the source has been committed", Long.valueOf(this.batchId));
            }
            this.uncommittedResults = List.of();
            this.progress = Progress.COMMITTED;
        } else {
            LOG.error("Source has not been committed successfully, cause is", cause);
            this.commitFailure = cause;
            if (cause instanceof CommitResult.CommitTimeoutException) {
                this.progress = Progress.COMMITTED;
            }
        }
        notifyCommiterFinished();
    }

    /** Runs every registered listener in registration order, then discards them all. */
    private void notifyCommiterFinished() {
        for (Runnable listener : this.commitListeners) {
            listener.run();
        }
        this.commitListeners.clear();
    }
}
