package org.elasticsearch.compute.operator.exchange;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.IsBlockedResult;
import org.elasticsearch.core.Releasable;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.class */
public final class ExchangeSourceHandler {
    private final ExchangeBuffer buffer;
    private final Executor fetchExecutor;
    private final FailureCollector failure = new FailureCollector();
    private final PendingInstances outstandingSinks = new PendingInstances(() -> {
        this.buffer.finish(false);
    });
    private final PendingInstances outstandingSources = new PendingInstances(() -> {
        this.buffer.finish(true);
    });

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$ExchangeSourceImpl.class */
    private class ExchangeSourceImpl implements ExchangeSource {
        private boolean finished;

        ExchangeSourceImpl() {
            ExchangeSourceHandler.this.outstandingSources.trackNewInstance();
        }

        private void checkFailure() {
            Exception failure = ExchangeSourceHandler.this.failure.getFailure();
            if (failure != null) {
                throw ExceptionsHelper.convertToRuntime(failure);
            }
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public Page pollPage() {
            checkFailure();
            return ExchangeSourceHandler.this.buffer.pollPage();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public boolean isFinished() {
            checkFailure();
            return this.finished || ExchangeSourceHandler.this.buffer.isFinished();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public IsBlockedResult waitForReading() {
            return ExchangeSourceHandler.this.buffer.waitForReading();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public void finish() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            ExchangeSourceHandler.this.outstandingSources.finishInstance();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public int bufferSize() {
            return ExchangeSourceHandler.this.buffer.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$LoopControl.class */
    public static class LoopControl {
        private Status status = Status.RUNNING;
        private final Thread startedThread = Thread.currentThread();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$LoopControl$Status.class */
        public enum Status {
            RUNNING,
            EXITING,
            EXITED
        }

        LoopControl() {
        }

        boolean isRunning() {
            return this.status == Status.RUNNING;
        }

        boolean tryResume() {
            if (this.startedThread != Thread.currentThread() || this.status == Status.EXITED) {
                return false;
            }
            this.status = Status.RUNNING;
            return true;
        }

        void exiting() {
            this.status = Status.EXITING;
        }

        void exited() {
            this.status = Status.EXITED;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$PendingInstances.class */
    public static class PendingInstances {
        private final AtomicInteger instances = new AtomicInteger();
        private final SubscribableListener<Void> completion = new SubscribableListener<>();
        static final /* synthetic */ boolean $assertionsDisabled;

        PendingInstances(Runnable runnable) {
            this.completion.addListener(ActionListener.running(runnable));
        }

        void trackNewInstance() {
            int incrementAndGet = this.instances.incrementAndGet();
            if (!$assertionsDisabled && incrementAndGet <= 0) {
                throw new AssertionError();
            }
        }

        void finishInstance() {
            int decrementAndGet = this.instances.decrementAndGet();
            if (!$assertionsDisabled && decrementAndGet < 0) {
                throw new AssertionError();
            }
            if (decrementAndGet == 0) {
                this.completion.onResponse((Object) null);
            }
        }

        static {
            $assertionsDisabled = !ExchangeSourceHandler.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$RemoteSinkFetcher.class */
    private final class RemoteSinkFetcher {
        private volatile boolean finished = false;
        private final RemoteSink remoteSink;

        RemoteSinkFetcher(RemoteSink remoteSink) {
            ExchangeSourceHandler.this.outstandingSinks.trackNewInstance();
            this.remoteSink = remoteSink;
        }

        void fetchPage() {
            LoopControl loopControl = new LoopControl();
            while (loopControl.isRunning()) {
                loopControl.exiting();
                this.remoteSink.fetchPageAsync(ExchangeSourceHandler.this.buffer.noMoreInputs() || ExchangeSourceHandler.this.failure.hasFailure(), ActionListener.wrap(exchangeResponse -> {
                    Page takePage = exchangeResponse.takePage();
                    if (takePage != null) {
                        ExchangeSourceHandler.this.buffer.addPage(takePage);
                    }
                    if (exchangeResponse.finished()) {
                        onSinkComplete();
                        return;
                    }
                    IsBlockedResult waitForWriting = ExchangeSourceHandler.this.buffer.waitForWriting();
                    if (!waitForWriting.listener().isDone()) {
                        waitForWriting.listener().addListener(ActionListener.wrap(r4 -> {
                            if (loopControl.tryResume()) {
                                return;
                            }
                            fetchPage();
                        }, this::onSinkFailed));
                    } else {
                        if (loopControl.tryResume()) {
                            return;
                        }
                        fetchPage();
                    }
                }, this::onSinkFailed));
            }
            loopControl.exited();
        }

        void onSinkFailed(Exception exc) {
            ExchangeSourceHandler.this.failure.unwrapAndCollect(exc);
            ExchangeSourceHandler.this.buffer.waitForReading().listener().onResponse((Object) null);
            onSinkComplete();
        }

        void onSinkComplete() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            ExchangeSourceHandler.this.outstandingSinks.finishInstance();
        }
    }

    public ExchangeSourceHandler(int i, Executor executor) {
        this.buffer = new ExchangeBuffer(i);
        this.fetchExecutor = executor;
    }

    public void addCompletionListener(ActionListener<Void> actionListener) {
        this.buffer.addCompletionListener(ActionListener.running(() -> {
            RefCountingListener refCountingListener = new RefCountingListener(actionListener);
            try {
                for (PendingInstances pendingInstances : List.of(this.outstandingSinks, this.outstandingSources)) {
                    pendingInstances.trackNewInstance();
                    pendingInstances.completion.addListener(refCountingListener.acquire());
                    pendingInstances.finishInstance();
                }
                refCountingListener.close();
            } catch (Throwable th) {
                try {
                    refCountingListener.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }));
    }

    public ExchangeSource createExchangeSource() {
        return new ExchangeSourceImpl();
    }

    public void addRemoteSink(RemoteSink remoteSink, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            final RemoteSinkFetcher remoteSinkFetcher = new RemoteSinkFetcher(remoteSink);
            this.fetchExecutor.execute(new AbstractRunnable() { // from class: org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler.1
                public void onFailure(Exception exc) {
                    remoteSinkFetcher.onSinkFailed(exc);
                }

                protected void doRun() {
                    remoteSinkFetcher.fetchPage();
                }
            });
        }
    }

    public Releasable addEmptySink() {
        this.outstandingSinks.trackNewInstance();
        PendingInstances pendingInstances = this.outstandingSinks;
        Objects.requireNonNull(pendingInstances);
        return pendingInstances::finishInstance;
    }
}
