package org.elasticsearch.compute.operator;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

/* loaded from: input_file:org/elasticsearch/compute/operator/AsyncOperator.class */
public abstract class AsyncOperator implements Operator {
    // Listener handed out by isBlocked() while the operator is waiting for a response. It is only ever
    // assigned inside synchronized (this) blocks; notifyIfBlocked() clears it and then completes it.
    private volatile SubscribableListener<Void> blockedFuture;
    // Notified whenever an async action is started (addInput) and when it ends.
    private final DriverContext driverContext;
    // Bound used by needsInput() on the gap between the max sequence number and the persisted checkpoint.
    private final int maxOutstandingRequests;
    // Output pages of completed requests, keyed by the sequence number assigned to the request in addInput().
    private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
    // The failure reported by checkFailure(); see onFailure() for how competing failures are merged.
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    // Accumulated wall-clock time between starting a request and the completion of its listener.
    private final LongAdder totalTimeInNanos = new LongAdder();
    // Set by finish(); there is no code in this class that resets it.
    private boolean finished = false;
    // Set by close(); read by onSeqNoCompleted() so that late responses are discarded.
    private volatile boolean closed = false;
    // Tracks requests by sequence number: generateSeqNo() tags each input page, "processed" is marked when the
    // request completes (onSeqNoCompleted) and "persisted" is marked once the output page has been handed out by
    // getOutput() or dropped by discardPages().
    private final LocalCheckpointTracker checkpoint = new LocalCheckpointTracker(-1, -1);

    /**
     * Immutable progress snapshot of an {@link AsyncOperator}: the page counters and the accumulated time,
     * serializable to the transport stream and renderable as XContent.
     */
    public static class Status implements Operator.Status {
        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
            Operator.Status.class,
            "async_operator",
            Status::new
        );

        final long receivedPages;
        final long completedPages;
        final long totalTimeInMillis;

        /**
         * @param receivedPages     value reported as {@code received_pages}
         * @param completedPages    value reported as {@code completed_pages}
         * @param totalTimeInMillis value reported as {@code total_time_in_millis}
         */
        protected Status(long receivedPages, long completedPages, long totalTimeInMillis) {
            this.receivedPages = receivedPages;
            this.completedPages = completedPages;
            this.totalTimeInMillis = totalTimeInMillis;
        }

        /**
         * Reads a status from the stream; the three values are read in the order {@link #writeTo} writes them.
         */
        protected Status(StreamInput in) throws IOException {
            // Java evaluates arguments left to right, so the read order matches the write order.
            this(in.readVLong(), in.readVLong(), in.readVLong());
        }

        /**
         * Writes the three counters as variable-length longs: received pages, completed pages, total time.
         */
        public void writeTo(StreamOutput out) throws IOException {
            out.writeVLong(receivedPages);
            out.writeVLong(completedPages);
            out.writeVLong(totalTimeInMillis);
        }

        public long receivedPages() {
            return receivedPages;
        }

        public long completedPages() {
            return completedPages;
        }

        public long totalTimeInMillis() {
            return totalTimeInMillis;
        }

        /**
         * @return the name under which {@link #ENTRY} is registered
         */
        public String getWriteableName() {
            return ENTRY.name;
        }

        /**
         * Renders this status as a single object whose fields are produced by {@link #innerToXContent}.
         */
        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.startObject();
            innerToXContent(builder);
            return builder.endObject();
        }

        /**
         * Emits the status fields without opening or closing an object, so that callers can wrap them.
         * The {@code total_time} field is only emitted when the time is not negative.
         *
         * @return the builder that was passed in
         */
        protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
            builder.field("received_pages", receivedPages);
            builder.field("completed_pages", completedPages);
            builder.field("total_time_in_millis", totalTimeInMillis);
            if (totalTimeInMillis >= 0) {
                builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis));
            }
            return builder;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            // Exact class match: an instance of a subclass is never equal to a plain Status.
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            final Status that = (Status) other;
            return receivedPages == that.receivedPages
                && completedPages == that.completedPages
                && totalTimeInMillis == that.totalTimeInMillis;
        }

        @Override
        public int hashCode() {
            return Objects.hash(receivedPages, completedPages, totalTimeInMillis);
        }

        @Override
        public String toString() {
            return Strings.toString(this);
        }

        public TransportVersion getMinimalSupportedVersion() {
            return TransportVersions.ESQL_ENRICH_OPERATOR_STATUS;
        }
    }

    /**
     * Creates an operator bound to the given driver context.
     *
     * @param driverContext          context that is told when an async action starts and when it ends
     * @param maxOutstandingRequests bound applied by {@link #needsInput()} to the number of pages that were
     *                               accepted but whose output has not been taken or discarded yet
     */
    public AsyncOperator(DriverContext driverContext, int maxOutstandingRequests) {
        this.maxOutstandingRequests = maxOutstandingRequests;
        this.driverContext = driverContext;
    }

    /**
     * @return {@code true} while the number of accepted pages whose output has not been taken or discarded
     *         is below the configured maximum
     */
    @Override
    public boolean needsInput() {
        final long outstanding = checkpoint.getMaxSeqNo() - checkpoint.getPersistedCheckpoint();
        return outstanding < maxOutstandingRequests;
    }

    /**
     * Accepts an input page and starts its asynchronous processing through {@link #performAsync}.
     * <p>
     * If a failure has already been recorded the page is released immediately and nothing is started.
     * Otherwise the page gets the next sequence number and the driver context is told that an async action
     * is in flight. On success the response page is buffered under that sequence number; on failure the input
     * page is released and the exception is recorded. In both cases the sequence number is marked as completed,
     * after which the driver context is told the action ended and the elapsed time is accumulated.
     *
     * @param page the input page; ownership passes to this operator
     */
    @Override
    public void addInput(Page page) {
        if (failure.get() != null) {
            // An earlier request already failed, so no output will be produced for this page: free it now.
            page.releaseBlocks();
            return;
        }
        final long seqNo = checkpoint.generateSeqNo();
        driverContext.addAsyncAction();
        boolean started = false;
        try {
            final ActionListener<Page> listener = ActionListener.wrap(output -> {
                buffers.put(seqNo, output);
                onSeqNoCompleted(seqNo);
            }, e -> {
                // The listener may run on another thread, hence the thread-agnostic release.
                releasePageOnAnyThread(page);
                onFailure(e);
                onSeqNoCompleted(seqNo);
            });
            final long startNanos = System.nanoTime();
            performAsync(page, ActionListener.runAfter(listener, () -> {
                driverContext.removeAsyncAction();
                totalTimeInNanos.add(System.nanoTime() - startNanos);
            }));
            started = true;
        } finally {
            // If performAsync threw, the listener was never handed over successfully, so the async action
            // registered above has to be balanced here.
            if (started == false) {
                driverContext.removeAsyncAction();
            }
        }
    }

    /**
     * Releases the blocks of a page from a thread that may not be the one that owns it: the page is first
     * allowed to pass to a different driver and is then released.
     */
    private void releasePageOnAnyThread(Page toRelease) {
        toRelease.allowPassingToDifferentDriver();
        toRelease.releaseBlocks();
    }

    /**
     * Starts the asynchronous work for one input page.
     * <p>
     * Called from {@link #addInput(Page)}. The listener passed in buffers a successful response as the output
     * for this page; on failure it releases the input page and records the exception.
     *
     * @param page           the input page received by {@link #addInput(Page)}
     * @param actionListener listener to complete with the resulting page or with the failure
     */
    protected abstract void performAsync(Page page, ActionListener<Page> actionListener);

    /**
     * Subclass-specific cleanup. Invoked by {@link #close()} as its last step, after the operator has been
     * marked finished and closed and the buffered pages have been discarded.
     */
    protected abstract void doClose();

    /**
     * Records a failure, merging it with any failure recorded before:
     * <ul>
     *   <li>the first failure is stored as is;</li>
     *   <li>a failure that wraps a {@link TaskCancelledException} never replaces an existing one;</li>
     *   <li>an existing failure that wraps a {@link TaskCancelledException} is replaced by the new one;</li>
     *   <li>otherwise the existing failure is kept and the new one is added to it as suppressed, unless both
     *       unwrap to the same cause.</li>
     * </ul>
     */
    private void onFailure(Exception incoming) {
        failure.getAndUpdate(current -> {
            if (current == null) {
                return incoming;
            }
            // Cancellation is the least informative failure: never let it displace another failure ...
            if (ExceptionsHelper.unwrap(incoming, TaskCancelledException.class) != null) {
                return current;
            }
            // ... and let any other failure take its place.
            if (ExceptionsHelper.unwrap(current, TaskCancelledException.class) != null) {
                return incoming;
            }
            if (ExceptionsHelper.unwrapCause(current) != ExceptionsHelper.unwrapCause(incoming)) {
                current.addSuppressed(incoming);
            }
            return current;
        });
    }

    /**
     * Marks the request with the given sequence number as processed. Wakes up a blocked consumer when output
     * became available, and discards pending pages when the operator is closed or has failed.
     */
    private void onSeqNoCompleted(long seqNo) {
        checkpoint.markSeqNoAsProcessed(seqNo);
        final boolean outputAvailable = checkpoint.getPersistedCheckpoint() < checkpoint.getProcessedCheckpoint();
        if (outputAvailable) {
            notifyIfBlocked();
        }
        // Nobody will call getOutput() for these pages any more, so they have to be released here.
        final boolean outputUnwanted = closed || failure.get() != null;
        if (outputUnwanted) {
            discardPages();
        }
    }

    /**
     * Completes and clears the listener handed out by {@link #isBlocked()}, if there is one.
     * <p>
     * The field is swapped to {@code null} under the monitor, and the listener is completed after the monitor
     * has been released so that its callbacks do not run while the lock is held.
     */
    private void notifyIfBlocked() {
        if (blockedFuture == null) {
            return;
        }
        final SubscribableListener<Void> toNotify;
        synchronized (this) {
            toNotify = blockedFuture;
            blockedFuture = null;
        }
        // Another thread may have taken the listener between the unsynchronized check and the swap.
        if (toNotify != null) {
            toNotify.onResponse(null);
        }
    }

    /**
     * Throws the recorded failure, if any, converted with {@code ExceptionsHelper.convertToElastic}; pending
     * output pages are discarded before throwing. Does nothing when no failure has been recorded.
     */
    private void checkFailure() {
        final Exception recorded = failure.get();
        if (recorded == null) {
            return;
        }
        discardPages();
        throw ExceptionsHelper.convertToElastic(recorded);
    }

    /**
     * Drops every output page that is ready but has not been taken: walks the sequence numbers from just
     * after the persisted checkpoint up to the processed checkpoint, marks each one as persisted and releases
     * its buffered page when there is one.
     */
    private void discardPages() {
        // The persisted checkpoint is re-read on every pass rather than incremented locally.
        for (long next = checkpoint.getPersistedCheckpoint() + 1;
             next <= checkpoint.getProcessedCheckpoint();
             next = checkpoint.getPersistedCheckpoint() + 1) {
            final Page discarded = buffers.remove(next);
            checkpoint.markSeqNoAsPersisted(next);
            if (discarded != null) {
                releasePageOnAnyThread(discarded);
            }
        }
    }

    /**
     * Closes the operator: calls {@link #finish()}, flags the operator as closed so that responses arriving
     * later are discarded, drops the pages that are already buffered and finally delegates to
     * {@link #doClose()}.
     */
    @Override
    public final void close() {
        finish();
        closed = true;
        discardPages();
        doClose();
    }

    /**
     * Marks the operator as finished. Requests already in flight still complete; see {@link #isFinished()}.
     */
    @Override
    public void finish() {
        finished = true;
    }

    /**
     * @return {@code true} once {@link #finish()} has been called and the output of every accepted page has
     *         been taken or discarded
     * @throws RuntimeException the recorded failure, if the operator is otherwise done but a request failed
     */
    @Override
    public boolean isFinished() {
        final boolean drained = finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo();
        if (drained) {
            checkFailure();
        }
        return drained;
    }

    /**
     * Returns the output of the next request in sequence order, or {@code null} when that request has not
     * completed yet.
     *
     * @throws RuntimeException the recorded failure, if any request failed
     */
    @Override
    public Page getOutput() {
        checkFailure();
        final long nextSeqNo = checkpoint.getPersistedCheckpoint() + 1;
        if (nextSeqNo > checkpoint.getProcessedCheckpoint()) {
            return null;
        }
        final Page output = buffers.remove(nextSeqNo);
        checkpoint.markSeqNoAsPersisted(nextSeqNo);
        return output;
    }

    /**
     * Tells the caller whether it has to wait for a response.
     *
     * @return {@code Operator.NOT_BLOCKED} when the operator is finished, has nothing in flight, or has output
     *         ready; otherwise a listener that {@link #notifyIfBlocked()} completes
     */
    @Override
    public SubscribableListener<Void> isBlocked() {
        if (finished || hasNothingToWaitFor()) {
            return Operator.NOT_BLOCKED;
        }
        synchronized (this) {
            // Check again under the monitor: a response may have arrived since the check above.
            if (hasNothingToWaitFor()) {
                return Operator.NOT_BLOCKED;
            }
            SubscribableListener<Void> future = blockedFuture;
            if (future == null) {
                future = new SubscribableListener<>();
                blockedFuture = future;
            }
            return future;
        }
    }

    /**
     * @return {@code true} when no request is outstanding, or when the next output in sequence is ready
     */
    private boolean hasNothingToWaitFor() {
        final long persisted = checkpoint.getPersistedCheckpoint();
        return persisted == checkpoint.getMaxSeqNo() || persisted < checkpoint.getProcessedCheckpoint();
    }

    /**
     * Builds the current status of this operator.
     * <p>
     * Sequence numbers start at 0 (the tracker is created with {@code -1} for both the max sequence number and
     * the checkpoint), so the number of pages is the sequence number plus one. Reporting the raw sequence
     * number would show 0 pages after the first page was received.
     * <ul>
     *   <li>received pages: pages accepted by {@link #addInput(Page)};</li>
     *   <li>completed pages: pages whose request completed, counted up to the first request that is still
     *       outstanding;</li>
     *   <li>total time: accumulated request time, in milliseconds.</li>
     * </ul>
     */
    @Override
    public final Operator.Status status() {
        final long receivedPages = checkpoint.getMaxSeqNo() + 1;
        final long completedPages = checkpoint.getProcessedCheckpoint() + 1;
        final long totalTimeInMillis = TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis();
        return status(receivedPages, completedPages, totalTimeInMillis);
    }

    /**
     * Factory for the status object returned by {@link #status()}; subclasses may override it to return a
     * richer status.
     *
     * @param receivedPages     value for the received-pages counter
     * @param completedPages    value for the completed-pages counter
     * @param totalTimeInMillis accumulated request time in milliseconds
     */
    protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
        return new Status(receivedPages, completedPages, totalTimeInMillis);
    }
}
