package org.elasticsearch.compute.operator.exchange;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService.class */
public final class ExchangeService extends AbstractLifecycleComponent {
    public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
    public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";
    private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
    private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";
    public static final String INACTIVE_SINKS_INTERVAL_SETTING = "esql.exchange.sink_inactive_interval";
    public static final TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT;
    private static final Logger LOGGER;
    private final ThreadPool threadPool;
    private final Executor executor;
    private final BlockFactory blockFactory;
    private final Map<String, ExchangeSinkHandler> sinks = ConcurrentCollections.newConcurrentMap();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$ExchangeTransportAction.class */
    private class ExchangeTransportAction implements TransportRequestHandler<ExchangeRequest> {
        private ExchangeTransportAction() {
        }

        public void messageReceived(ExchangeRequest exchangeRequest, TransportChannel transportChannel, Task task) {
            String exchangeId = exchangeRequest.exchangeId();
            ActionListener<ExchangeResponse> channelActionListener = new ChannelActionListener<>(transportChannel);
            ExchangeSinkHandler exchangeSinkHandler = ExchangeService.this.sinks.get(exchangeId);
            if (exchangeSinkHandler == null) {
                channelActionListener.onResponse(new ExchangeResponse(ExchangeService.this.blockFactory, null, true));
                return;
            }
            CancellableTask cancellableTask = (CancellableTask) task;
            cancellableTask.addListener(() -> {
                exchangeSinkHandler.onFailure(new TaskCancelledException("request cancelled " + cancellableTask.getReasonCancelled()));
            });
            exchangeSinkHandler.fetchPageAsync(exchangeRequest.sourcesFinished(), channelActionListener);
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$InactiveSinksReaper.class */
    private final class InactiveSinksReaper extends AbstractRunnable {
        private final Logger logger;
        private final TimeValue keepAlive;
        private final ThreadPool threadPool;
        static final /* synthetic */ boolean $assertionsDisabled;

        InactiveSinksReaper(Logger logger, ThreadPool threadPool, TimeValue timeValue) {
            this.logger = logger;
            this.keepAlive = timeValue;
            this.threadPool = threadPool;
        }

        public void onFailure(Exception exc) {
            this.logger.error("unexpected error when closing inactive sinks", exc);
            if (!$assertionsDisabled) {
                throw new AssertionError(exc);
            }
        }

        public void onRejection(Exception exc) {
            if ((exc instanceof EsRejectedExecutionException) && ((EsRejectedExecutionException) exc).isExecutorShutdown()) {
                this.logger.debug("rejected execution when closing inactive sinks");
            } else {
                onFailure(exc);
            }
        }

        public boolean isForceExecution() {
            return true;
        }

        protected void doRun() {
            if (!$assertionsDisabled && !Transports.assertNotTransportThread("reaping inactive exchanges can be expensive")) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !ThreadPool.assertNotScheduleThread("reaping inactive exchanges can be expensive")) {
                throw new AssertionError();
            }
            this.logger.debug("start removing inactive sinks");
            long relativeTimeInMillis = this.threadPool.relativeTimeInMillis();
            for (Map.Entry<String, ExchangeSinkHandler> entry : ExchangeService.this.sinks.entrySet()) {
                ExchangeSinkHandler value = entry.getValue();
                if (!value.hasData() || !value.hasListeners()) {
                    long lastUpdatedTimeInMillis = relativeTimeInMillis - value.lastUpdatedTimeInMillis();
                    if (lastUpdatedTimeInMillis > this.keepAlive.millis()) {
                        TimeValue timeValueMillis = TimeValue.timeValueMillis(lastUpdatedTimeInMillis);
                        this.logger.debug("removed sink {} inactive for {}", entry.getKey(), timeValueMillis);
                        ExchangeService.this.finishSinkHandler(entry.getKey(), new ElasticsearchTimeoutException("Exchange sink {} has been inactive for {}", new Object[]{entry.getKey(), timeValueMillis}));
                    }
                }
            }
        }

        static {
            $assertionsDisabled = !ExchangeService.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$OpenExchangeRequest.class */
    public static class OpenExchangeRequest extends TransportRequest {
        private final String sessionId;
        private final int exchangeBuffer;

        OpenExchangeRequest(String str, int i) {
            this.sessionId = str;
            this.exchangeBuffer = i;
        }

        OpenExchangeRequest(StreamInput streamInput) throws IOException {
            super(streamInput);
            this.sessionId = streamInput.readString();
            this.exchangeBuffer = streamInput.readVInt();
        }

        public void writeTo(StreamOutput streamOutput) throws IOException {
            super.writeTo(streamOutput);
            streamOutput.writeString(this.sessionId);
            streamOutput.writeVInt(this.exchangeBuffer);
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$OpenExchangeRequestHandler.class */
    private class OpenExchangeRequestHandler implements TransportRequestHandler<OpenExchangeRequest> {
        private OpenExchangeRequestHandler() {
        }

        public void messageReceived(OpenExchangeRequest openExchangeRequest, TransportChannel transportChannel, Task task) throws Exception {
            ExchangeService.this.createSinkHandler(openExchangeRequest.sessionId, openExchangeRequest.exchangeBuffer);
            transportChannel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink.class */
    static final class TransportRemoteSink implements RemoteSink {
        final TransportService transportService;
        final BlockFactory blockFactory;
        final Transport.Connection connection;
        final Task parentTask;
        final String exchangeId;
        final Executor responseExecutor;
        final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0);
        final AtomicBoolean finished = new AtomicBoolean(false);

        TransportRemoteSink(TransportService transportService, BlockFactory blockFactory, Transport.Connection connection, Task task, String str, Executor executor) {
            this.transportService = transportService;
            this.blockFactory = blockFactory;
            this.connection = connection;
            this.parentTask = task;
            this.exchangeId = str;
            this.responseExecutor = executor;
        }

        @Override // org.elasticsearch.compute.operator.exchange.RemoteSink
        public void fetchPageAsync(boolean z, ActionListener<ExchangeResponse> actionListener) {
            if (z) {
                close(actionListener.map(r7 -> {
                    return new ExchangeResponse(this.blockFactory, null, true);
                }));
            } else if (this.finished.get()) {
                actionListener.onResponse(new ExchangeResponse(this.blockFactory, null, true));
            } else {
                doFetchPageAsync(false, ActionListener.wrap(exchangeResponse -> {
                    if (exchangeResponse.finished()) {
                        this.finished.set(true);
                    }
                    actionListener.onResponse(exchangeResponse);
                }, exc -> {
                    close(ActionListener.running(() -> {
                        actionListener.onFailure(exc);
                    }));
                }));
            }
        }

        private void doFetchPageAsync(boolean z, ActionListener<ExchangeResponse> actionListener) {
            long j = z ? 0L : this.estimatedPageSizeInBytes.get();
            if (j > 0) {
                this.blockFactory.breaker().addEstimateBytesAndMaybeBreak(j, "fetch page");
                actionListener = ActionListener.runAfter(actionListener, () -> {
                    this.blockFactory.breaker().addWithoutBreaking(-j);
                });
            }
            this.transportService.sendChildRequest(this.connection, ExchangeService.EXCHANGE_ACTION_NAME, new ExchangeRequest(this.exchangeId, z), this.parentTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler(actionListener, streamInput -> {
                BlockStreamInput blockStreamInput = new BlockStreamInput(streamInput, this.blockFactory);
                try {
                    ExchangeResponse exchangeResponse = new ExchangeResponse(blockStreamInput);
                    long ramBytesUsedByPage = exchangeResponse.ramBytesUsedByPage();
                    this.estimatedPageSizeInBytes.getAndUpdate(j2 -> {
                        return Math.max(ramBytesUsedByPage, j2 / 2);
                    });
                    blockStreamInput.close();
                    return exchangeResponse;
                } catch (Throwable th) {
                    try {
                        blockStreamInput.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }, this.responseExecutor));
        }

        @Override // org.elasticsearch.compute.operator.exchange.RemoteSink
        public void close(ActionListener<Void> actionListener) {
            if (this.finished.compareAndSet(false, true)) {
                doFetchPageAsync(true, actionListener.delegateFailure((actionListener2, exchangeResponse) -> {
                    actionListener2.onResponse((Object) null);
                }));
            } else {
                actionListener.onResponse((Object) null);
            }
        }
    }

    public ExchangeService(Settings settings, ThreadPool threadPool, String str, BlockFactory blockFactory) {
        this.threadPool = threadPool;
        this.executor = threadPool.executor(str);
        this.blockFactory = blockFactory;
        TimeValue asTime = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, INACTIVE_SINKS_INTERVAL_DEFAULT);
        this.threadPool.scheduleWithFixedDelay(new InactiveSinksReaper(LOGGER, threadPool, asTime), TimeValue.timeValueMillis(Math.max(1L, asTime.millis() / 2)), this.executor);
    }

    public void registerTransportHandler(TransportService transportService) {
        transportService.registerRequestHandler(EXCHANGE_ACTION_NAME, this.executor, ExchangeRequest::new, new ExchangeTransportAction());
        transportService.registerRequestHandler(OPEN_EXCHANGE_ACTION_NAME, this.executor, OpenExchangeRequest::new, new OpenExchangeRequestHandler());
        transportService.registerRequestHandler(EXCHANGE_ACTION_NAME_FOR_CCS, this.executor, ExchangeRequest::new, new ExchangeTransportAction());
        transportService.registerRequestHandler(OPEN_EXCHANGE_ACTION_NAME_FOR_CCS, this.executor, OpenExchangeRequest::new, new OpenExchangeRequestHandler());
    }

    public ExchangeSinkHandler createSinkHandler(String str, int i) {
        ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(this.blockFactory, i, this.threadPool.relativeTimeInMillisSupplier());
        if (this.sinks.putIfAbsent(str, exchangeSinkHandler) != null) {
            throw new IllegalStateException("sink exchanger for id [" + str + "] already exists");
        }
        return exchangeSinkHandler;
    }

    public ExchangeSinkHandler getSinkHandler(String str) {
        ExchangeSinkHandler exchangeSinkHandler = this.sinks.get(str);
        if (exchangeSinkHandler == null) {
            throw new ResourceNotFoundException("sink exchanger for id [{}] doesn't exist", new Object[]{str});
        }
        return exchangeSinkHandler;
    }

    public void finishSinkHandler(String str, @Nullable Exception exc) {
        ExchangeSinkHandler remove = this.sinks.remove(str);
        if (remove != null) {
            if (exc != null) {
                remove.onFailure(exc);
            }
            if (!$assertionsDisabled && !remove.isFinished()) {
                throw new AssertionError("Exchange sink " + str + " wasn't finished yet");
            }
        }
    }

    public static void openExchange(TransportService transportService, Transport.Connection connection, String str, int i, Executor executor, ActionListener<Void> actionListener) {
        transportService.sendRequest(connection, OPEN_EXCHANGE_ACTION_NAME, new OpenExchangeRequest(str, i), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler(actionListener.map(empty -> {
            return null;
        }), streamInput -> {
            return TransportResponse.Empty.INSTANCE;
        }, executor));
    }

    public RemoteSink newRemoteSink(Task task, String str, TransportService transportService, Transport.Connection connection) {
        return new TransportRemoteSink(transportService, this.blockFactory, connection, task, str, this.executor);
    }

    public boolean isEmpty() {
        return this.sinks.isEmpty();
    }

    public Set<String> sinkKeys() {
        return this.sinks.keySet();
    }

    protected void doStart() {
    }

    protected void doStop() {
    }

    protected void doClose() {
        doStop();
    }

    public String toString() {
        return "ExchangeService{sinks=" + this.sinks.keySet() + "}";
    }

    static {
        $assertionsDisabled = !ExchangeService.class.desiredAssertionStatus();
        INACTIVE_SINKS_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5L);
        LOGGER = LogManager.getLogger(ExchangeService.class);
    }
}
