package org.voltdb.stream.execution;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.voltdb.stream.api.Committer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.VoltEnvironment;
import org.voltdb.stream.api.extension.CommitResult;
import org.voltdb.stream.api.extension.CompletableCommitResult;
import org.voltdb.stream.execution.util.VisibleForTesting;

/* loaded from: input_file:org/voltdb/stream/execution/BatchCountingCommitter.class */
/**
 * A {@link Committer} that decides when a batch is finished by counting.
 *
 * <p>Every message handed to {@link #markConsumed(Object)} increments a "consumed" counter and
 * every success or failure callback increments a "received" counter. Once a commit has been
 * requested ({@link #tryCommit()} / {@link #tryCommitBefore(Duration)}) and both counters are
 * equal, the pending commit result is completed: normally when no failure was recorded,
 * exceptionally (with the affected messages) otherwise.
 *
 * <p>Callbacks carrying a handle that does not match the handle returned by the latest
 * {@link #trackResponsesFor(long)} call are ignored.
 *
 * @param <T> type of the messages being committed
 */
public class BatchCountingCommitter<T> implements Committer<T> {
    private final ExecutionContext context;
    private volatile CompletableCommitResult commitResult;
    private volatile Committer.CommitHandle commitHandle;
    // Written in recordFailure before receivedCounter is bumped and read in tryComplete after
    // the counters have been compared.
    private Throwable exception;
    private final Set<T> affectedMessages = ConcurrentHashMap.newKeySet();
    private final AtomicInteger consumedCounter = new AtomicInteger(0);
    private final AtomicInteger receivedCounter = new AtomicInteger(0);

    /**
     * Creates a committer bound to the given execution context.
     *
     * @param executionContext context used to obtain commit results
     * @param <T> type of the messages being committed
     * @return a new committer with no batch tracked yet
     */
    public static <T> BatchCountingCommitter<T> create(ExecutionContext executionContext) {
        return new BatchCountingCommitter<>(executionContext);
    }

    private BatchCountingCommitter(ExecutionContext executionContext) {
        this.context = executionContext;
    }

    /**
     * Returns the handle created by the most recent {@link #trackResponsesFor(long)} call.
     *
     * @return the current handle, or {@code null} if no batch has been tracked yet
     */
    @VisibleForTesting
    public Committer.CommitHandle getCommitHandle() {
        return this.commitHandle;
    }

    /**
     * Resets all per-batch state and starts tracking responses for the given batch.
     *
     * <p>When the batch id differs from the one currently tracked (or nothing is tracked yet) a
     * handle is built from the id alone; otherwise the new handle is derived from the previous
     * handle.
     *
     * @param j id of the batch to track
     * @return the handle callers must pass to the {@code onSuccess}/{@code onFailure} callbacks
     */
    public Committer.CommitHandle trackResponsesFor(long j) {
        this.commitResult = null;
        this.exception = null;
        this.consumedCounter.set(0);
        this.receivedCounter.set(0);
        this.affectedMessages.clear();

        // Single volatile read so the null check and the batch id comparison see the same handle.
        Committer.CommitHandle previous = this.commitHandle;
        Committer.CommitHandle next;
        if (previous == null || previous.getBatchId() != j) {
            next = new BatchCommitHandle(j);
        } else {
            next = new BatchCommitHandle(previous);
        }
        this.commitHandle = next;
        return next;
    }

    /**
     * Records that one message of the current batch has been consumed.
     *
     * @param t the consumed message; only counted, not stored
     */
    public void markConsumed(T t) {
        this.consumedCounter.incrementAndGet();
    }

    /**
     * Tells whether the given handle matches the currently tracked one (same batch id and same
     * handle value).
     *
     * @param commitHandle handle received with a callback
     * @return {@code true} if the handle belongs to the currently tracked batch
     * @throws NullPointerException if {@link #trackResponsesFor(long)} has not been called yet,
     *     or if {@code commitHandle} is {@code null}
     */
    public boolean canProcess(Committer.CommitHandle commitHandle) {
        Committer.CommitHandle current =
                Objects.requireNonNull(this.commitHandle, "Commit handle cannot be null, call #trackResponsesFor first");
        return current.getBatchId() == commitHandle.getBatchId()
                && current.getHandle() == commitHandle.getHandle();
    }

    /**
     * Tells whether the pending commit result is done.
     *
     * @return {@code true} once the commit result has been completed
     * @throws NullPointerException if no commit has been requested for the current batch
     */
    public boolean hasCommitted() {
        CompletableCommitResult result =
                Objects.requireNonNull(this.commitResult, "Commit result cannot be null, call #tryCommit first");
        return result.isDone();
    }

    /**
     * Requests a commit of the current batch. The returned result is completed immediately if
     * the consumed and received counters already match, otherwise by a later callback.
     *
     * @return the commit result for the current batch
     * @throws NullPointerException if {@link #trackResponsesFor(long)} has not been called yet
     */
    public CommitResult tryCommit() {
        CompletableCommitResult result = this.context.execution().nextCommitResult();
        this.commitResult = result;
        tryComplete(this.commitHandle);
        return result;
    }

    /**
     * Same as {@link #tryCommit()}, but the commit result is obtained from the execution with
     * the given duration.
     *
     * @param duration duration passed to the execution when creating the commit result
     * @return the commit result for the current batch
     * @throws NullPointerException if {@link #trackResponsesFor(long)} has not been called yet
     */
    public CommitResult tryCommitBefore(Duration duration) {
        CompletableCommitResult result = this.context.execution().nextCommitResult(duration);
        this.commitResult = result;
        tryComplete(this.commitHandle);
        return result;
    }

    /**
     * Records one successful response for the batch identified by the handle.
     *
     * @param commitHandle handle of the batch the response belongs to
     */
    public void onSuccess(Committer.CommitHandle commitHandle) {
        onSuccess(commitHandle, 1);
    }

    /**
     * Records {@code i} successful responses for the batch identified by the handle and
     * completes the commit if the batch is finished. Ignored for non-matching handles.
     *
     * @param commitHandle handle of the batch the responses belong to
     * @param i number of successful responses
     */
    public void onSuccess(Committer.CommitHandle commitHandle, int i) {
        if (canProcess(commitHandle)) {
            this.receivedCounter.addAndGet(i);
            tryComplete(commitHandle);
        }
    }

    /**
     * Records one successful response; the message itself is not stored.
     *
     * @param commitHandle handle of the batch the response belongs to
     * @param t the message that succeeded
     */
    public void onSuccess(Committer.CommitHandle commitHandle, T t) {
        onSuccess(commitHandle, 1);
    }

    /**
     * Records one successful response per array element.
     *
     * @param commitHandle handle of the batch the responses belong to
     * @param tArr the messages that succeeded
     */
    public void onSuccess(Committer.CommitHandle commitHandle, T[] tArr) {
        onSuccess(commitHandle, tArr.length);
    }

    /**
     * Records one successful response per list element.
     *
     * @param commitHandle handle of the batch the responses belong to
     * @param list the messages that succeeded
     */
    public void onSuccess(Committer.CommitHandle commitHandle, List<T> list) {
        onSuccess(commitHandle, list.size());
    }

    /**
     * Records a failed message for the batch identified by the handle. Ignored, including the
     * affected message, when the handle does not match the currently tracked batch.
     *
     * @param commitHandle handle of the batch the failure belongs to
     * @param th cause of the failure, may be {@code null}
     * @param t the message that failed
     */
    public void onFailure(Committer.CommitHandle commitHandle, Throwable th, T t) {
        // Guard first: a stale callback must not leak its messages into the current batch.
        if (canProcess(commitHandle)) {
            this.affectedMessages.add(t);
            recordFailure(commitHandle, th, 1);
        }
    }

    /**
     * Records failed messages for the batch identified by the handle. Ignored, including the
     * affected messages, when the handle does not match the currently tracked batch.
     *
     * @param commitHandle handle of the batch the failure belongs to
     * @param th cause of the failure, may be {@code null}
     * @param tArr the messages that failed
     */
    public void onFailure(Committer.CommitHandle commitHandle, Throwable th, T[] tArr) {
        if (canProcess(commitHandle)) {
            Collections.addAll(this.affectedMessages, tArr);
            recordFailure(commitHandle, th, tArr.length);
        }
    }

    /**
     * Records failed messages for the batch identified by the handle. Ignored, including the
     * affected messages, when the handle does not match the currently tracked batch.
     *
     * @param commitHandle handle of the batch the failure belongs to
     * @param th cause of the failure, may be {@code null}
     * @param list the messages that failed
     */
    public void onFailure(Committer.CommitHandle commitHandle, Throwable th, List<T> list) {
        if (canProcess(commitHandle)) {
            this.affectedMessages.addAll(list);
            recordFailure(commitHandle, th, list.size());
        }
    }

    /**
     * Returns the commit result of the current batch.
     *
     * @return the pending or completed result, or {@code null} if no commit was requested since
     *     the last {@link #trackResponsesFor(long)} call
     */
    @VisibleForTesting
    CommitResult getCommitResult() {
        return this.commitResult;
    }

    /**
     * Registers a failure of {@code count} messages on an already validated handle: creates the
     * batch exception on first failure, attaches the cause as suppressed, bumps the received
     * counter and attempts completion.
     */
    private void recordFailure(Committer.CommitHandle commitHandle, Throwable th, int count) {
        if (this.exception == null) {
            this.exception = new VoltEnvironment.VoltCommitException("Could not commit all messages");
        }
        if (th != null) {
            this.exception.addSuppressed(th);
        }
        this.receivedCounter.addAndGet(count);
        tryComplete(commitHandle);
    }

    /**
     * Completes the pending commit result if the handle matches the current batch, a commit has
     * been requested and is not done yet, and every consumed message has received a response.
     *
     * @param commitHandle handle of the batch to complete
     * @throws NullPointerException if {@link #trackResponsesFor(long)} has not been called yet
     */
    @VisibleForTesting
    void tryComplete(Committer.CommitHandle commitHandle) {
        if (!canProcess(commitHandle)) {
            return;
        }
        // Read the volatile field once: trackResponsesFor may reset it to null concurrently.
        CompletableCommitResult result = this.commitResult;
        if (result == null || result.isDone() || !batchFinished()) {
            return;
        }
        Throwable failure = this.exception;
        if (failure == null) {
            result.complete(commitHandle.getBatchId());
        } else {
            result.completeExceptionally(commitHandle.getBatchId(), List.copyOf(this.affectedMessages), failure);
        }
    }

    /** Returns {@code true} when the number of received responses equals the number of consumed messages. */
    private boolean batchFinished() {
        return this.consumedCounter.get() == this.receivedCounter.get();
    }
}
