package org.elasticsearch.xpack.esql.plugin;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.search.TransportSearchShardsAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;

/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService.class */
public class ComputeService {
    // Class logger. Declared blank here; its assignment is not in this part of the file.
    private static final Logger LOGGER;
    // Collaborator handed to the constructor and stored as-is.
    private final SearchService searchService;
    // The constructor stores bigArrays.withCircuitBreaking(), not the instance it was given.
    private final BigArrays bigArrays;
    // Collaborator handed to the constructor and stored as-is.
    private final BlockFactory blockFactory;
    // Transport layer; the constructor registers both request handlers of this class on it.
    private final TransportService transportService;
    // Executor of the "search" thread pool; shared by both transport handlers and the driver runner.
    private final Executor esqlExecutor;
    // Built in the constructor from the transport service and esqlExecutor.
    private final DriverTaskRunner driverRunner;
    // Collaborator handed to the constructor; used by the nested classes to finish sink handlers by session id.
    private final ExchangeService exchangeService;
    // Collaborator handed to the constructor and stored as-is.
    private final EnrichLookupService enrichLookupService;
    // Collaborator handed to the constructor and stored as-is.
    private final ClusterService clusterService;
    // Transport action name under which DataNodeRequestHandler is registered.
    public static final String DATA_ACTION_NAME = "indices:data/read/esql/data";
    // Transport action name under which ClusterRequestHandler is registered.
    public static final String CLUSTER_ACTION_NAME = "indices:data/read/esql/cluster";
    // Compiler-generated flag backing `assert` statements (true when assertions are off).
    // NOTE(review): its static initializer is not in this part of the file.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$ClusterRequestHandler.class */
    private class ClusterRequestHandler implements TransportRequestHandler<ClusterComputeRequest> {
        private ClusterRequestHandler() {
        }

        public void messageReceived(ClusterComputeRequest clusterComputeRequest, TransportChannel transportChannel, Task task) {
            ActionListener<ComputeResponse> channelActionListener = new ChannelActionListener<>(transportChannel);
            if (clusterComputeRequest.plan() instanceof ExchangeSinkExec) {
                ComputeService.this.runComputeOnRemoteCluster(clusterComputeRequest.clusterAlias(), clusterComputeRequest.sessionId(), (CancellableTask) task, clusterComputeRequest.configuration(), (ExchangeSinkExec) clusterComputeRequest.plan(), Set.of((Object[]) clusterComputeRequest.indices()), clusterComputeRequest.originalIndices(), channelActionListener);
            } else {
                channelActionListener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + clusterComputeRequest.plan()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$ComputeContext.class */
    /**
     * Immutable bundle of everything one {@code runCompute} invocation needs.
     *
     * <p>Either exchange handler may be {@code null}: callers in this class pass
     * {@code null} for the side of the exchange they do not use.
     *
     * @param sessionId      id of the ESQL session this computation belongs to
     * @param clusterAlias   alias of the cluster the computation runs for (callers in this class
     *                       pass {@code ""} on the coordinator paths)
     * @param searchContexts search contexts backing the computation; may be empty
     * @param configuration  the ESQL configuration of the query
     * @param exchangeSource handler the computation reads remote pages from, or {@code null}
     * @param exchangeSink   handler the computation writes its pages to, or {@code null}
     */
    public record ComputeContext(
        String sessionId,
        String clusterAlias,
        List<SearchContext> searchContexts,
        EsqlConfiguration configuration,
        ExchangeSourceHandler exchangeSource,
        ExchangeSinkHandler exchangeSink
    ) {
        /**
         * Returns the {@link SearchExecutionContext} of every search context, in the same
         * order as {@link #searchContexts()}. The returned list is unmodifiable
         * (it is produced by {@code Stream.toList()}).
         */
        public List<SearchExecutionContext> searchExecutionContexts() {
            return searchContexts.stream().map(SearchContext::getSearchExecutionContext).toList();
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNode.class */
    /**
     * Immutable triple of a transport connection, a list of shard ids and the alias
     * filters keyed by index.
     *
     * <p>NOTE(review): presumably describes one target data node and the shards to query
     * on it; the code that builds and consumes it is outside this part of the file.
     *
     * @param connection   transport connection associated with the shards
     * @param shardIds     ids of the shards
     * @param aliasFilters alias filter per index
     */
    record DataNode(Transport.Connection connection, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeRequestExecutor.class */
    /**
     * Runs the plan of one {@link DataNodeRequest} over the request's shards in consecutive
     * batches of at most {@code maxConcurrentShards} shards.
     *
     * <p>Each batch acquires search contexts for its slice of the shard list and runs the
     * request plan against the shared {@link ExchangeSinkHandler}. When a batch completes,
     * either the next batch is started or, if no shards remain or the sink handler has
     * already finished, the executor finishes up and notifies the listener. Any failure
     * finishes the sink handler registered under the request's session id with that
     * exception and fails the listener.
     */
    public class DataNodeRequestExecutor {
        private final DataNodeRequest request;
        private final CancellableTask parentTask;
        private final ExchangeSinkHandler exchangeSink;
        private final ActionListener<Void> listener;
        private final List<DriverProfile> driverProfiles;
        private final int maxConcurrentShards;
        // Created in the constructor and finished only after the last batch.
        // NOTE(review): presumably keeps the sink handler from completing between batches;
        // confirm against ExchangeSinkHandler.
        private final ExchangeSink blockingSink;

        /**
         * @param request             the data-node request to execute
         * @param parentTask          task the computation runs under
         * @param exchangeSink        sink handler every batch writes its pages to
         * @param maxConcurrentShards maximum number of shards processed in one batch
         * @param driverProfiles      list that collects driver profiles when profiling is enabled
         * @param listener            notified once all batches are done, or on the first failure
         */
        DataNodeRequestExecutor(
            DataNodeRequest request,
            CancellableTask parentTask,
            ExchangeSinkHandler exchangeSink,
            int maxConcurrentShards,
            List<DriverProfile> driverProfiles,
            ActionListener<Void> listener
        ) {
            this.request = request;
            this.parentTask = parentTask;
            this.exchangeSink = exchangeSink;
            this.listener = listener;
            this.driverProfiles = driverProfiles;
            this.maxConcurrentShards = maxConcurrentShards;
            this.blockingSink = exchangeSink.createExchangeSink();
        }

        /**
         * Registers a listener on the parent task that finishes the sink handler with a
         * {@link TaskCancelledException} carrying the task's cancellation reason, then
         * starts the first batch.
         */
        void start() {
            this.parentTask.addListener(() -> {
                ComputeService.this.exchangeService.finishSinkHandler(
                    this.request.sessionId(),
                    new TaskCancelledException(this.parentTask.getReasonCancelled())
                );
            });
            runBatch(0);
        }

        /**
         * Acquires search contexts for the shards in {@code [startIndex, endIndex)} and runs
         * the request plan on them, where {@code endIndex} is capped at the shard count.
         *
         * @param startIndex index into the request's shard list of the first shard of the batch
         */
        private void runBatch(int startIndex) {
            EsqlConfiguration configuration = this.request.configuration();
            String clusterAlias = this.request.clusterAlias();
            String sessionId = this.request.sessionId();
            int endIndex = Math.min(startIndex + this.maxConcurrentShards, this.request.shardIds().size());
            List<ShardId> batchShardIds = this.request.shardIds().subList(startIndex, endIndex);
            ComputeService.this.acquireSearchContexts(
                clusterAlias,
                batchShardIds,
                configuration,
                this.request.aliasFilters(),
                ActionListener.wrap(searchContexts -> {
                    assert ThreadPool.assertCurrentThreadPool(new String[]{"search", EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME});
                    // No exchange source: this context only writes to the shared sink handler.
                    ComputeContext computeContext = new ComputeContext(
                        sessionId,
                        clusterAlias,
                        searchContexts,
                        configuration,
                        null,
                        this.exchangeSink
                    );
                    ComputeService.this.runCompute(
                        this.parentTask,
                        computeContext,
                        this.request.plan(),
                        ActionListener.wrap(profiles -> {
                            onBatchCompleted(endIndex, profiles);
                        }, this::onFailure)
                    );
                }, this::onFailure)
            );
        }

        /**
         * Records the batch's profiles (only when profiling is enabled) and either starts
         * the next batch or finishes the execution.
         *
         * @param lastBatchIndex exclusive end index of the batch that just completed
         * @param batchProfiles  driver profiles produced by that batch
         */
        private void onBatchCompleted(int lastBatchIndex, List<DriverProfile> batchProfiles) {
            if (this.request.configuration().profile()) {
                this.driverProfiles.addAll(batchProfiles);
            }
            if (lastBatchIndex < this.request.shardIds().size() && !this.exchangeSink.isFinished()) {
                runBatch(lastBatchIndex);
            } else {
                this.blockingSink.finish();
                // Once the sink handler completes: first finish it in the exchange service
                // (without an error), then notify the listener; the thread context is preserved.
                this.exchangeSink.addCompletionListener(
                    ContextPreservingActionListener.wrapPreservingContext(
                        ActionListener.runBefore(this.listener, () -> {
                            ComputeService.this.exchangeService.finishSinkHandler(this.request.sessionId(), (Exception) null);
                        }),
                        ComputeService.this.transportService.getThreadPool().getThreadContext()
                    )
                );
            }
        }

        /**
         * Finishes the sink handler of this request's session with {@code exc} and fails the listener.
         *
         * @param exc the failure to propagate
         */
        private void onFailure(Exception exc) {
            ComputeService.this.exchangeService.finishSinkHandler(this.request.sessionId(), exc);
            this.listener.onFailure(exc);
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeRequestHandler.class */
    private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
        private DataNodeRequestHandler() {
        }

        public void messageReceived(DataNodeRequest dataNodeRequest, TransportChannel transportChannel, Task task) {
            ActionListener<ComputeResponse> channelActionListener = new ChannelActionListener<>(transportChannel);
            PhysicalPlan plan = dataNodeRequest.plan();
            if (!(plan instanceof ExchangeSinkExec)) {
                channelActionListener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + dataNodeRequest.plan()));
                return;
            }
            ExchangeSinkExec exchangeSinkExec = (ExchangeSinkExec) plan;
            Class<FragmentExec> cls = FragmentExec.class;
            Objects.requireNonNull(FragmentExec.class);
            List collectFirstChildren = exchangeSinkExec.collectFirstChildren((v1) -> {
                return r1.isInstance(v1);
            });
            if (collectFirstChildren.isEmpty()) {
                channelActionListener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + dataNodeRequest.plan()));
                return;
            }
            ExchangeSourceExec exchangeSourceExec = new ExchangeSourceExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg());
            FragmentExec fragmentExec = (FragmentExec) collectFirstChildren.get(0);
            ExchangeSinkExec exchangeSinkExec2 = new ExchangeSinkExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg(), fragmentExec.reducer() != null ? (PhysicalPlan) fragmentExec.reducer().replaceChildren(List.of(exchangeSourceExec)) : exchangeSourceExec);
            String sessionId = dataNodeRequest.sessionId();
            ComputeService.this.runComputeOnDataNode((CancellableTask) task, sessionId, exchangeSinkExec2, new DataNodeRequest(sessionId + "[n]", dataNodeRequest.configuration(), dataNodeRequest.clusterAlias(), dataNodeRequest.shardIds(), dataNodeRequest.aliasFilters(), dataNodeRequest.plan()), channelActionListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster.class */
    public static final class RemoteCluster extends Record {
        private final String clusterAlias;
        private final Transport.Connection connection;
        private final String[] concreteIndices;
        private final String[] originalIndices;

        RemoteCluster(String str, Transport.Connection connection, String[] strArr, String[] strArr2) {
            this.clusterAlias = str;
            this.connection = connection;
            this.concreteIndices = strArr;
            this.originalIndices = strArr2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RemoteCluster.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->originalIndices:[Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RemoteCluster.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->originalIndices:[Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RemoteCluster.class, Object.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster;->originalIndices:[Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String clusterAlias() {
            return this.clusterAlias;
        }

        public Transport.Connection connection() {
            return this.connection;
        }

        public String[] concreteIndices() {
            return this.concreteIndices;
        }

        public String[] originalIndices() {
            return this.originalIndices;
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$Result.class */
    /**
     * Outcome of {@code execute}: the output pages that were collected and the driver
     * profiles gathered along the way.
     *
     * <p>The lists are stored as given (no defensive copy).
     *
     * @param pages    output pages of the query
     * @param profiles driver profiles of the drivers that ran the query
     */
    public record Result(List<Page> pages, List<DriverProfile> profiles) {
    }

    public ComputeService(SearchService searchService, TransportService transportService, ExchangeService exchangeService, EnrichLookupService enrichLookupService, ClusterService clusterService, ThreadPool threadPool, BigArrays bigArrays, BlockFactory blockFactory) {
        this.searchService = searchService;
        this.transportService = transportService;
        this.bigArrays = bigArrays.withCircuitBreaking();
        this.blockFactory = blockFactory;
        this.esqlExecutor = threadPool.executor("search");
        transportService.registerRequestHandler(DATA_ACTION_NAME, this.esqlExecutor, DataNodeRequest::new, new DataNodeRequestHandler());
        transportService.registerRequestHandler(CLUSTER_ACTION_NAME, this.esqlExecutor, ClusterComputeRequest::new, new ClusterRequestHandler());
        this.driverRunner = new DriverTaskRunner(transportService, this.esqlExecutor);
        this.exchangeService = exchangeService;
        this.enrichLookupService = enrichLookupService;
        this.clusterService = clusterService;
    }

    public void execute(String str, CancellableTask cancellableTask, PhysicalPlan physicalPlan, EsqlConfiguration esqlConfiguration, ActionListener<Result> actionListener) {
        Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, esqlConfiguration);
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        ActionListener delegateResponse = actionListener.delegateResponse((actionListener2, exc) -> {
            synchronizedList.forEach(page -> {
                Objects.requireNonNull(page);
                Releasables.closeExpectNoException(page::releaseBlocks);
            });
            actionListener2.onFailure(exc);
        });
        PhysicalPlan physicalPlan2 = (PhysicalPlan) breakPlanBetweenCoordinatorAndDataNode.v1();
        Objects.requireNonNull(synchronizedList);
        OutputExec outputExec = new OutputExec(physicalPlan2, (v1) -> {
            r3.add(v1);
        });
        PhysicalPlan physicalPlan3 = (PhysicalPlan) breakPlanBetweenCoordinatorAndDataNode.v2();
        if (physicalPlan3 != null && !(physicalPlan3 instanceof ExchangeSinkExec)) {
            if (!$assertionsDisabled) {
                throw new AssertionError("expected data node plan starts with an ExchangeSink; got " + physicalPlan3);
            }
            delegateResponse.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + physicalPlan3));
            return;
        }
        Map<String, OriginalIndices> groupIndices = this.transportService.getRemoteClusterService().groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, (String[]) PlannerUtils.planConcreteIndices(physicalPlan).toArray(i -> {
            return new String[i];
        }));
        QueryPragmas pragmas = esqlConfiguration.pragmas();
        if (physicalPlan3 == null) {
            if (groupIndices.values().stream().allMatch(originalIndices -> {
                return originalIndices.indices().length == 0;
            })) {
                runCompute(cancellableTask, new ComputeContext(str, "", List.of(), esqlConfiguration, null, null), outputExec, delegateResponse.map(list -> {
                    return new Result(synchronizedList, list);
                }));
                return;
            }
            String str2 = "expected no concrete indices without data node plan; got " + groupIndices;
            if (!$assertionsDisabled) {
                throw new AssertionError(str2);
            }
            delegateResponse.onFailure(new IllegalStateException(str2));
            return;
        }
        if (groupIndices.values().stream().allMatch(originalIndices2 -> {
            return originalIndices2.indices().length == 0;
        })) {
            String str3 = "expected concrete indices with data node plan but got empty; data node plan " + physicalPlan3;
            if (!$assertionsDisabled) {
                throw new AssertionError(str3);
            }
            delegateResponse.onFailure(new IllegalStateException(str3));
            return;
        }
        Map<String, OriginalIndices> groupIndices2 = this.transportService.getRemoteClusterService().groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan));
        OriginalIndices remove = groupIndices2.remove("");
        OriginalIndices remove2 = groupIndices.remove("");
        ResponseHeadersCollector responseHeadersCollector = new ResponseHeadersCollector(this.transportService.getThreadPool().getThreadContext());
        Objects.requireNonNull(responseHeadersCollector);
        ActionListener runBefore = ActionListener.runBefore(delegateResponse, responseHeadersCollector::finish);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        List synchronizedList2 = esqlConfiguration.profile() ? Collections.synchronizedList(new ArrayList()) : List.of();
        ExchangeSourceHandler exchangeSourceHandler = new ExchangeSourceHandler(pragmas.exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
        Releasable addEmptySink = exchangeSourceHandler.addEmptySink();
        try {
            RefCountingListener refCountingListener = new RefCountingListener(runBefore.map(r7 -> {
                return new Result(synchronizedList, synchronizedList2);
            }));
            try {
                exchangeSourceHandler.addCompletionListener(refCountingListener.acquire());
                runCompute(cancellableTask, new ComputeContext(str, "", List.of(), esqlConfiguration, exchangeSourceHandler, null), outputExec, cancelOnFailure(cancellableTask, atomicBoolean, refCountingListener.acquire()).map(list2 -> {
                    responseHeadersCollector.collect();
                    if (!esqlConfiguration.profile()) {
                        return null;
                    }
                    synchronizedList2.addAll(list2);
                    return null;
                }));
                if (remove2 != null && remove2.indices().length > 0) {
                    startComputeOnDataNodes(str, "", cancellableTask, esqlConfiguration, physicalPlan3, Set.of((Object[]) remove2.indices()), remove.indices(), exchangeSourceHandler, ActionListener.releaseAfter(refCountingListener.acquire(), exchangeSourceHandler.addEmptySink()), () -> {
                        return cancelOnFailure(cancellableTask, atomicBoolean, refCountingListener.acquire()).map(computeResponse -> {
                            responseHeadersCollector.collect();
                            if (!esqlConfiguration.profile()) {
                                return null;
                            }
                            synchronizedList2.addAll(computeResponse.getProfiles());
                            return null;
                        });
                    });
                }
                startComputeOnRemoteClusters(str, cancellableTask, esqlConfiguration, physicalPlan3, exchangeSourceHandler, getRemoteClusters(groupIndices, groupIndices2), () -> {
                    return cancelOnFailure(cancellableTask, atomicBoolean, refCountingListener.acquire()).map(computeResponse -> {
                        responseHeadersCollector.collect();
                        if (!esqlConfiguration.profile()) {
                            return null;
                        }
                        synchronizedList2.addAll(computeResponse.getProfiles());
                        return null;
                    });
                });
                refCountingListener.close();
                if (addEmptySink != null) {
                    addEmptySink.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (addEmptySink != null) {
                try {
                    addEmptySink.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /**
     * Builds one {@link RemoteCluster} descriptor per cluster alias found in {@code map}.
     * Clusters whose entry in {@code map} has no indices are left out of the result.
     *
     * @param map  indices per cluster alias; drives the iteration (presumably the concrete indices - confirm at the caller)
     * @param map2 original indices per cluster alias; must contain every alias present in {@code map}
     * @return the remote clusters that have at least one index to query
     * @throws IllegalStateException if {@code map2} has no entry for an alias (an {@link AssertionError} when assertions are enabled)
     */
    private List<RemoteCluster> getRemoteClusters(Map<String, OriginalIndices> map, Map<String, OriginalIndices> map2) {
        final List<RemoteCluster> remoteClusters = new ArrayList<>(map.size());
        final RemoteClusterService remoteClusterService = this.transportService.getRemoteClusterService();
        for (Map.Entry<String, OriginalIndices> entry : map.entrySet()) {
            final String clusterAlias = entry.getKey();
            final OriginalIndices clusterIndices = entry.getValue();
            final OriginalIndices clusterOriginalIndices = map2.get(clusterAlias);
            if (clusterOriginalIndices == null) {
                final String message = "can't find original indices for cluster " + clusterAlias;
                if (!$assertionsDisabled) {
                    throw new AssertionError(message);
                }
                throw new IllegalStateException(message);
            }
            if (clusterIndices.indices().length == 0) {
                // Nothing to query on this cluster.
                continue;
            }
            remoteClusters.add(
                new RemoteCluster(
                    clusterAlias,
                    remoteClusterService.getConnection(clusterAlias),
                    clusterIndices.indices(),
                    clusterOriginalIndices.indices()
                )
            );
        }
        return remoteClusters;
    }

    /**
     * Resolves the data nodes for the given indices and starts the computation on each of them.
     * <p>
     * For every resolved node an exchange is opened first; once that succeeds the node's remote sink is
     * linked to {@code exchangeSourceHandler} and a {@link DataNodeRequest} is sent as a child request of
     * {@code cancellableTask}.
     *
     * @param str                   session id used for the exchange and the data-node request
     * @param str2                  cluster alias forwarded to the node lookup and the data-node request
     * @param cancellableTask       parent task of the child requests
     * @param esqlConfiguration     query configuration; its pragmas control node-level reduction and exchange sizing
     * @param physicalPlan          plan to ship to the data nodes
     * @param set                   index names whose shards should be targeted
     * @param strArr                index expressions passed to the shard lookup
     * @param exchangeSourceHandler source handler the remote sinks are attached to
     * @param actionListener        completed with {@code null} once every per-node listener has been released,
     *                              or failed if the node lookup fails
     * @param supplier              supplies a fresh response listener for each data node
     */
    private void startComputeOnDataNodes(String str, String str2, CancellableTask cancellableTask, EsqlConfiguration esqlConfiguration, PhysicalPlan physicalPlan, Set<String> set, String[] strArr, ExchangeSourceHandler exchangeSourceHandler, ActionListener<Void> actionListener, Supplier<ActionListener<ComputeResponse>> supplier) {
        final PhysicalPlan dataNodePlan;
        if (esqlConfiguration.pragmas().nodeLevelReduction()) {
            // Attach a reducer to every fragment for which a data-node reduction plan exists.
            dataNodePlan = (PhysicalPlan) physicalPlan.transformUp(FragmentExec.class, fragment -> {
                final PhysicalPlan reducer = PlannerUtils.dataNodeReductionPlan(fragment.fragment(), physicalPlan);
                if (reducer == null) {
                    return fragment;
                }
                return fragment.withReducer(reducer);
            });
        } else {
            dataNodePlan = physicalPlan;
        }
        final QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan, fieldAttribute -> true);
        lookupDataNodes(cancellableTask, str2, requestFilter, set, strArr, ActionListener.wrap(dataNodes -> {
            // The initial reference is dropped when this block exits; actionListener completes once
            // every per-node listener has released its own reference.
            try (RefCountingRunnable refs = new RefCountingRunnable(() -> actionListener.onResponse(null))) {
                for (DataNode node : dataNodes) {
                    final ActionListener<ComputeResponse> nodeListener = ActionListener.releaseAfter(supplier.get(), refs.acquire());
                    final QueryPragmas queryPragmas = esqlConfiguration.pragmas();
                    ExchangeService.openExchange(
                        this.transportService,
                        node.connection,
                        str,
                        queryPragmas.exchangeBufferSize(),
                        this.esqlExecutor,
                        nodeListener.delegateFailureAndWrap((delegate, unused) -> {
                            exchangeSourceHandler.addRemoteSink(
                                this.exchangeService.newRemoteSink(cancellableTask, str, this.transportService, node.connection),
                                queryPragmas.concurrentExchangeClients()
                            );
                            this.transportService.sendChildRequest(
                                node.connection,
                                DATA_ACTION_NAME,
                                new DataNodeRequest(str, esqlConfiguration, str2, node.shardIds, node.aliasFilters, dataNodePlan),
                                cancellableTask,
                                TransportRequestOptions.EMPTY,
                                new ActionListenerResponseHandler<>(delegate, ComputeResponse::new, this.esqlExecutor)
                            );
                        })
                    );
                }
            }
        }, actionListener::onFailure));
    }

    /**
     * Starts the computation on every remote cluster in {@code list}.
     * <p>
     * An empty sink is registered on {@code exchangeSourceHandler} up front and closed only after all
     * per-cluster listeners have released their references. For each cluster an exchange is opened; once
     * that succeeds the cluster's remote sink is linked to the source handler and a
     * {@link ClusterComputeRequest} is sent as a child request of {@code cancellableTask}.
     *
     * @param str                   session id used for the exchange and the cluster request
     * @param cancellableTask       parent task of the child requests
     * @param esqlConfiguration     query configuration; its pragmas size the exchange
     * @param physicalPlan          plan to ship to the remote clusters
     * @param exchangeSourceHandler source handler the remote sinks are attached to
     * @param list                  remote clusters to contact
     * @param supplier              supplies a fresh response listener for each remote cluster
     */
    private void startComputeOnRemoteClusters(String str, CancellableTask cancellableTask, EsqlConfiguration esqlConfiguration, PhysicalPlan physicalPlan, ExchangeSourceHandler exchangeSourceHandler, List<RemoteCluster> list, Supplier<ActionListener<ComputeResponse>> supplier) {
        final Releasable emptySink = exchangeSourceHandler.addEmptySink();
        // Evaluating the bound method reference fails fast with an NPE if emptySink is null.
        try (RefCountingRunnable refs = new RefCountingRunnable(emptySink::close)) {
            for (RemoteCluster cluster : list) {
                final ActionListener<ComputeResponse> clusterListener = ActionListener.releaseAfter(supplier.get(), refs.acquire());
                final QueryPragmas queryPragmas = esqlConfiguration.pragmas();
                ExchangeService.openExchange(
                    this.transportService,
                    cluster.connection,
                    str,
                    queryPragmas.exchangeBufferSize(),
                    this.esqlExecutor,
                    clusterListener.delegateFailureAndWrap((delegate, unused) -> {
                        exchangeSourceHandler.addRemoteSink(
                            this.exchangeService.newRemoteSink(cancellableTask, str, this.transportService, cluster.connection),
                            queryPragmas.concurrentExchangeClients()
                        );
                        this.transportService.sendChildRequest(
                            cluster.connection,
                            CLUSTER_ACTION_NAME,
                            new ClusterComputeRequest(cluster.clusterAlias, str, esqlConfiguration, physicalPlan, cluster.concreteIndices, cluster.originalIndices),
                            cancellableTask,
                            TransportRequestOptions.EMPTY,
                            new ActionListenerResponseHandler<>(delegate, ComputeResponse::new, this.esqlExecutor)
                        );
                    })
                );
            }
        }
    }

    /**
     * Wraps {@code actionListener} so that a failure is first forwarded to it and then, at most once per
     * {@code atomicBoolean}, triggers cancellation of {@code cancellableTask} and its descendants.
     *
     * @param cancellableTask task to cancel when a failure is observed
     * @param atomicBoolean   shared flag guarding against cancelling the task more than once
     * @param actionListener  listener to decorate
     * @return a listener that behaves like {@code actionListener} on success and adds cancellation on failure
     */
    private ActionListener<Void> cancelOnFailure(CancellableTask cancellableTask, AtomicBoolean atomicBoolean, ActionListener<Void> actionListener) {
        return actionListener.delegateResponse((delegate, failure) -> {
            // Report the failure before cancelling.
            delegate.onFailure(failure);
            final boolean firstFailure = atomicBoolean.compareAndSet(false, true);
            if (!firstFailure) {
                return;
            }
            LOGGER.debug("cancelling ESQL task {} on failure", cancellableTask);
            this.transportService.getTaskManager().cancelTaskAndDescendants(cancellableTask, "cancelled", false, ActionListener.noop());
        });
    }

    /**
     * Plans {@code physicalPlan} for local execution, creates the drivers and hands them to the driver runner.
     * <p>
     * The search contexts held by {@code computeContext} are released before {@code actionListener} is
     * notified. Any exception raised while planning or starting the drivers is reported through the listener.
     *
     * @param cancellableTask task the drivers run under
     * @param computeContext  session id, cluster alias, search contexts, configuration and exchange endpoints
     * @param physicalPlan    plan to execute locally
     * @param actionListener  receives the driver profiles when profiling is enabled, {@code null} otherwise
     */
    void runCompute(CancellableTask cancellableTask, ComputeContext computeContext, PhysicalPlan physicalPlan, ActionListener<List<DriverProfile>> actionListener) {
        final ActionListener<List<DriverProfile>> releasingListener = ActionListener.runBefore(
            actionListener,
            () -> Releasables.close(computeContext.searchContexts)
        );
        final int shardCount = computeContext.searchContexts.size();
        final List<EsPhysicalOperationProviders.ShardContext> shardContexts = new ArrayList<>(shardCount);
        for (int shardIndex = 0; shardIndex < computeContext.searchContexts.size(); shardIndex++) {
            final SearchContext searchContext = computeContext.searchContexts.get(shardIndex);
            shardContexts.add(
                new EsPhysicalOperationProviders.DefaultShardContext(
                    shardIndex,
                    searchContext.getSearchExecutionContext(),
                    searchContext.request().getAliasFilter()
                )
            );
        }
        try {
            final LocalExecutionPlanner planner = new LocalExecutionPlanner(
                computeContext.sessionId,
                computeContext.clusterAlias,
                cancellableTask,
                this.bigArrays,
                this.blockFactory,
                this.clusterService.getSettings(),
                computeContext.configuration,
                computeContext.exchangeSource(),
                computeContext.exchangeSink(),
                this.enrichLookupService,
                new EsPhysicalOperationProviders(shardContexts)
            );
            LOGGER.debug("Received physical plan:\n{}", physicalPlan);
            final PhysicalPlan localPlan = PlannerUtils.localPlan(computeContext.searchExecutionContexts(), computeContext.configuration, physicalPlan);
            final LocalExecutionPlanner.LocalExecutionPlan executionPlan = planner.plan(localPlan);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Local execution plan:\n{}", executionPlan.describe());
            }
            final List<Driver> drivers = executionPlan.createDrivers(computeContext.sessionId);
            if (drivers.isEmpty()) {
                throw new IllegalStateException("no drivers created");
            }
            LOGGER.debug("using {} drivers", drivers.size());
            final Executor workerExecutor = this.transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME);
            // Profiles are only gathered when requested; otherwise the listener is completed with null.
            final ActionListener<Void> profileListener = releasingListener.map(
                ignored -> computeContext.configuration.profile() ? drivers.stream().map(Driver::profile).toList() : null
            );
            final ActionListener<Void> driverListener = ActionListener.releaseAfter(profileListener, () -> Releasables.close(drivers));
            this.driverRunner.executeDrivers(cancellableTask, drivers, workerExecutor, driverListener);
        } catch (Exception e) {
            releasingListener.onFailure(e);
        }
    }

    /**
     * Creates and pre-processes one {@link SearchContext} per shard in {@code list} and passes them to
     * {@code actionListener}.
     * <p>
     * Each shard is first asked to become search-active. If any shard reports {@code true} from that
     * callback, the contexts are created on the ESQL executor; otherwise they are created on the thread
     * that releases the last reference. If creating or pre-processing a context fails, the contexts
     * created so far are closed.
     *
     * @param str               cluster alias passed to each {@link ShardSearchRequest}
     * @param list              shards to open search contexts for
     * @param esqlConfiguration supplies the absolute start time of the request
     * @param map               alias filters per index; indices without an entry use {@link AliasFilter#EMPTY}
     * @param actionListener    receives the search contexts, or the failure
     */
    private void acquireSearchContexts(String str, List<ShardId> list, EsqlConfiguration esqlConfiguration, Map<Index, AliasFilter> map, ActionListener<List<SearchContext>> actionListener) {
        final List<IndexShard> targetShards = new ArrayList<>();
        try {
            for (ShardId shardId : list) {
                targetShards.add(this.searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id()));
            }
            final ActionRunnable<List<SearchContext>> createContexts = ActionRunnable.supply(actionListener, () -> {
                final List<SearchContext> searchContexts = new ArrayList<>(targetShards.size());
                boolean success = false;
                try {
                    for (IndexShard shard : targetShards) {
                        final ShardSearchRequest shardRequest = new ShardSearchRequest(
                            shard.shardId(),
                            esqlConfiguration.absoluteStartedTimeInMillis(),
                            map.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY),
                            str
                        );
                        searchContexts.add(this.searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT));
                    }
                    for (SearchContext searchContext : searchContexts) {
                        searchContext.preProcess();
                    }
                    success = true;
                    return searchContexts;
                } finally {
                    if (!success) {
                        // Do not leak the contexts that were already opened.
                        IOUtils.close(searchContexts);
                    }
                }
            });
            final AtomicBoolean forkToExecutor = new AtomicBoolean();
            try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
                if (forkToExecutor.get()) {
                    this.esqlExecutor.execute(createContexts);
                } else {
                    createContexts.run();
                }
            })) {
                for (IndexShard shard : targetShards) {
                    final Releasable ref = refs.acquire();
                    shard.ensureShardSearchActive(flag -> {
                        try (ref) {
                            if (flag) {
                                forkToExecutor.set(true);
                            }
                        }
                    });
                }
            }
        } catch (Exception e) {
            actionListener.onFailure(e);
        }
    }

    /**
     * Resolves which node should serve each target shard by sending a search-shards request to the local
     * node, then groups the shards (and their alias filters) per node.
     * <p>
     * Skipped shard groups and shards whose index name is not in {@code set} are ignored. The first
     * allocated node of each group is chosen. The request is sent under a system thread context; the
     * previous context is restored afterwards and {@code actionListener} is wrapped so that it is
     * invoked with the caller's context.
     *
     * @param task           parent task of the search-shards request
     * @param str            cluster alias passed to the search-shards request
     * @param queryBuilder   filter passed to the search-shards request
     * @param set            index names whose shards should be kept
     * @param strArr         index expressions to resolve
     * @param actionListener receives one {@link DataNode} per node that holds at least one target shard;
     *                       fails with {@link ShardNotFoundException} if a non-skipped group has no allocated node
     */
    private void lookupDataNodes(Task task, String str, QueryBuilder queryBuilder, Set<String> set, String[] strArr, ActionListener<List<DataNode>> actionListener) {
        final ThreadContext threadContext = this.transportService.getThreadPool().getThreadContext();
        final ActionListener<SearchShardsResponse> shardsListener = ContextPreservingActionListener.wrapPreservingContext(actionListener.map(response -> {
            final Map<String, DiscoveryNode> nodesById = new HashMap<>();
            for (DiscoveryNode node : response.getNodes()) {
                nodesById.put(node.getId(), node);
            }
            final Map<String, List<ShardId>> shardsByNode = new HashMap<>();
            final Map<String, Map<Index, AliasFilter>> aliasFiltersByNode = new HashMap<>();
            for (SearchShardsGroup group : response.getGroups()) {
                final ShardId shardId = group.shardId();
                if (group.skipped()) {
                    continue;
                }
                if (group.allocatedNodes().isEmpty()) {
                    throw new ShardNotFoundException(group.shardId(), "no shard copies found {}", group.shardId());
                }
                if (!set.contains(shardId.getIndexName())) {
                    continue;
                }
                final String targetNodeId = group.allocatedNodes().get(0);
                shardsByNode.computeIfAbsent(targetNodeId, unused -> new ArrayList<>()).add(shardId);
                final AliasFilter aliasFilter = response.getAliasFilters().get(shardId.getIndex().getUUID());
                if (aliasFilter != null) {
                    aliasFiltersByNode.computeIfAbsent(targetNodeId, unused -> new HashMap<>()).put(shardId.getIndex(), aliasFilter);
                }
            }
            final List<DataNode> dataNodes = new ArrayList<>(shardsByNode.size());
            for (Map.Entry<String, List<ShardId>> nodeShards : shardsByNode.entrySet()) {
                final String nodeId = nodeShards.getKey();
                dataNodes.add(
                    new DataNode(
                        this.transportService.getConnection(nodesById.get(nodeId)),
                        nodeShards.getValue(),
                        aliasFiltersByNode.getOrDefault(nodeId, Map.of())
                    )
                );
            }
            return dataNodes;
        }), threadContext);
        // Closing the stored context restores the caller's thread context after the request has been sent.
        try (ThreadContext.StoredContext ignored = threadContext.newStoredContextPreservingResponseHeaders()) {
            threadContext.markAsSystemContext();
            final SearchShardsRequest searchShardsRequest = new SearchShardsRequest(strArr, SearchRequest.DEFAULT_INDICES_OPTIONS, queryBuilder, null, null, false, str);
            this.transportService.sendChildRequest(
                this.transportService.getLocalNode(),
                TransportSearchShardsAction.TYPE.name(),
                searchShardsRequest,
                task,
                TransportRequestOptions.EMPTY,
                new ActionListenerResponseHandler<>(shardsListener, SearchShardsResponse::new, this.esqlExecutor)
            );
        }
    }

    /**
     * Handles a data-node request: starts a {@link DataNodeRequestExecutor} for the request and runs
     * {@code physicalPlan} locally, feeding it from an internal sink handler and writing into the sink
     * handler registered under {@code str}.
     * <p>
     * {@code actionListener} is completed through a {@link RefCountingListener}, i.e. only after every
     * listener acquired below has completed. The response carries the collected driver profiles (an
     * empty list when profiling is disabled).
     *
     * @param cancellableTask task this computation runs under; it is cancelled on the first failure
     * @param str             id used to look up, and later finish, the externally visible sink handler
     * @param physicalPlan    plan passed to {@link #runCompute}
     * @param dataNodeRequest the incoming request (session id, cluster alias, configuration, pragmas)
     * @param actionListener  receives the {@link ComputeResponse} or the failure
     */
    private void runComputeOnDataNode(CancellableTask cancellableTask, String str, PhysicalPlan physicalPlan, DataNodeRequest dataNodeRequest, ActionListener<ComputeResponse> actionListener) {
        // Profiles are only collected when profiling is enabled; the list may be appended to from several listeners.
        List synchronizedList = dataNodeRequest.configuration().profile() ? Collections.synchronizedList(new ArrayList()) : List.of();
        ResponseHeadersCollector responseHeadersCollector = new ResponseHeadersCollector(this.transportService.getThreadPool().getThreadContext());
        ActionListener map = actionListener.map(r5 -> {
            return new ComputeResponse((List<DriverProfile>) synchronizedList);
        });
        Objects.requireNonNull(responseHeadersCollector);
        // responseHeadersCollector.finish() runs right before the final response/failure is delivered.
        RefCountingListener refCountingListener = new RefCountingListener(ActionListener.runBefore(map, responseHeadersCollector::finish));
        try {
            try {
                // Shared by both cancelOnFailure wrappers below so the task is cancelled at most once.
                AtomicBoolean atomicBoolean = new AtomicBoolean();
                // Internal sink handler, keyed by the request's session id.
                ExchangeSinkHandler createSinkHandler = this.exchangeService.createSinkHandler(dataNodeRequest.sessionId(), dataNodeRequest.pragmas().exchangeBufferSize());
                int maxConcurrentShardsPerNode = dataNodeRequest.configuration().pragmas().maxConcurrentShardsPerNode();
                ActionListener<Void> cancelOnFailure = cancelOnFailure(cancellableTask, atomicBoolean, refCountingListener.acquire());
                Objects.requireNonNull(responseHeadersCollector);
                new DataNodeRequestExecutor(dataNodeRequest, cancellableTask, createSinkHandler, maxConcurrentShardsPerNode, synchronizedList, ActionListener.runBefore(cancelOnFailure, responseHeadersCollector::collect)).start();
                // Externally visible sink handler registered under `str`.
                ExchangeSinkHandler sinkHandler = this.exchangeService.getSinkHandler(str);
                // NOTE(review): presumably invoked when the task is cancelled - confirm CancellableTask#addListener semantics.
                cancellableTask.addListener(() -> {
                    this.exchangeService.finishSinkHandler(str, new TaskCancelledException(cancellableTask.getReasonCancelled()));
                });
                // Source handler that pulls pages from the internal sink handler created above.
                ExchangeSourceHandler exchangeSourceHandler = new ExchangeSourceHandler(1, this.esqlExecutor);
                exchangeSourceHandler.addCompletionListener(refCountingListener.acquire());
                Objects.requireNonNull(createSinkHandler);
                exchangeSourceHandler.addRemoteSink(createSinkHandler::fetchPageAsync, 1);
                ActionListener<Void> cancelOnFailure2 = cancelOnFailure(cancellableTask, atomicBoolean, refCountingListener.acquire());
                runCompute(cancellableTask, new ComputeContext(dataNodeRequest.sessionId(), dataNodeRequest.clusterAlias(), List.of(), dataNodeRequest.configuration(), exchangeSourceHandler, sinkHandler), physicalPlan, ActionListener.wrap(list -> {
                    responseHeadersCollector.collect();
                    if (dataNodeRequest.configuration().profile()) {
                        synchronizedList.addAll(list);
                    }
                    // On success, wait for the outer sink handler to complete, finish it, then release this reference.
                    sinkHandler.addCompletionListener(ActionListener.runBefore(cancelOnFailure2, () -> {
                        this.exchangeService.finishSinkHandler(str, (Exception) null);
                    }));
                }, exc -> {
                    // On failure, finish the outer sink handler with the error and propagate it.
                    this.exchangeService.finishSinkHandler(str, exc);
                    cancelOnFailure2.onFailure(exc);
                }));
                // Drop the initial reference; the final listener fires once all acquired listeners complete.
                refCountingListener.close();
            } catch (Exception e) {
                // Setup failed: finish both sink handlers with the error and report it through the ref-counting listener.
                this.exchangeService.finishSinkHandler(str, e);
                this.exchangeService.finishSinkHandler(dataNodeRequest.sessionId(), e);
                refCountingListener.acquire().onFailure(e);
                refCountingListener.close();
            }
        } catch (Throwable th) {
            // Anything not handled above still has to drop the initial reference before propagating.
            refCountingListener.close();
            throw th;
        }
    }

    /**
     * Runs the remote-cluster part of a query: executes a local exchange-sink plan that reads from an
     * exchange source and starts the computation on this cluster's data nodes.
     * <p>
     * {@code actionListener} is completed through a {@link RefCountingListener}, i.e. only after the sink
     * handler, the exchange source, the local computation and every data-node computation have completed.
     * The response carries the collected driver profiles (an empty list when profiling is disabled).
     * <p>
     * Both the empty sink and the ref-counting listener are managed with try-with-resources so that they
     * are released on every exit path. Previously the ref-counting listener was not closed when the body
     * threw, which left its initial reference outstanding and prevented {@code actionListener} from ever
     * being completed.
     *
     * @param str               cluster alias; also the prefix of the local session id
     * @param str2              global session id; identifies the sink handler results are written to
     * @param cancellableTask   task this computation runs under; it is cancelled on the first failure
     * @param esqlConfiguration query configuration (profiling flag, pragmas)
     * @param exchangeSinkExec  plan received from the coordinating cluster
     * @param set               index names whose shards should be targeted
     * @param strArr            index expressions passed to the shard lookup
     * @param actionListener    receives the {@link ComputeResponse} or the failure
     */
    void runComputeOnRemoteCluster(String str, String str2, CancellableTask cancellableTask, EsqlConfiguration esqlConfiguration, ExchangeSinkExec exchangeSinkExec, Set<String> set, String[] strArr, ActionListener<ComputeResponse> actionListener) {
        final ExchangeSinkHandler sinkHandler = this.exchangeService.getSinkHandler(str2);
        // NOTE(review): presumably invoked when the task is cancelled - confirm CancellableTask#addListener semantics.
        cancellableTask.addListener(() -> {
            this.exchangeService.finishSinkHandler(str2, new TaskCancelledException(cancellableTask.getReasonCancelled()));
        });
        final ResponseHeadersCollector responseHeadersCollector = new ResponseHeadersCollector(this.transportService.getThreadPool().getThreadContext());
        // responseHeadersCollector.finish() runs right before the final response/failure is delivered.
        final ActionListener<ComputeResponse> responseListener = ActionListener.runBefore(actionListener, responseHeadersCollector::finish);
        // Shared by all cancelOnFailure wrappers below so the task is cancelled at most once.
        final AtomicBoolean cancelled = new AtomicBoolean();
        final List<DriverProfile> collectedProfiles = esqlConfiguration.profile() ? Collections.synchronizedList(new ArrayList<>()) : List.of();
        final String localSessionId = str + ":" + str2;
        final ExchangeSourceHandler exchangeSourceHandler = new ExchangeSourceHandler(esqlConfiguration.pragmas().exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
        // Resources are closed in reverse order: the ref-counting listener first, then the empty sink.
        try (
            Releasable ignored = exchangeSourceHandler.addEmptySink();
            RefCountingListener refs = new RefCountingListener(responseListener.map(unused -> new ComputeResponse(collectedProfiles)))
        ) {
            sinkHandler.addCompletionListener(refs.acquire());
            exchangeSourceHandler.addCompletionListener(refs.acquire());
            final ComputeContext computeContext = new ComputeContext(localSessionId, str, List.of(), esqlConfiguration, exchangeSourceHandler, sinkHandler);
            // Local plan: forward whatever arrives on the exchange source into the sink handler.
            final ExchangeSinkExec localPlan = new ExchangeSinkExec(
                exchangeSinkExec.source(),
                exchangeSinkExec.output(),
                exchangeSinkExec.isIntermediateAgg(),
                new ExchangeSourceExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg())
            );
            runCompute(cancellableTask, computeContext, localPlan, cancelOnFailure(cancellableTask, cancelled, refs.acquire()).map(driverProfiles -> {
                responseHeadersCollector.collect();
                if (esqlConfiguration.profile()) {
                    collectedProfiles.addAll(driverProfiles);
                }
                return null;
            }));
            startComputeOnDataNodes(
                localSessionId,
                str,
                cancellableTask,
                esqlConfiguration,
                exchangeSinkExec,
                set,
                strArr,
                exchangeSourceHandler,
                ActionListener.releaseAfter(refs.acquire(), exchangeSourceHandler.addEmptySink()),
                () -> cancelOnFailure(cancellableTask, cancelled, refs.acquire()).map(computeResponse -> {
                    responseHeadersCollector.collect();
                    if (esqlConfiguration.profile()) {
                        collectedProfiles.addAll(computeResponse.getProfiles());
                    }
                    return null;
                })
            );
        }
    }

    static {
        $assertionsDisabled = !ComputeService.class.desiredAssertionStatus();
        LOGGER = LogManager.getLogger(ComputeService.class);
    }
}
