package org.elasticsearch.xpack.esql.plugin;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.Result;

/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService.class */
public class ComputeService {
    // Class logger. Declared without an initializer, so it has to be assigned in a static initializer elsewhere in the class.
    private static final Logger LOGGER;
    private final SearchService searchService;
    // Circuit-breaking view of the BigArrays handed to the constructor (bigArrays.withCircuitBreaking()).
    private final BigArrays bigArrays;
    private final BlockFactory blockFactory;
    // Used to register the two transport handlers below and passed to the DriverTaskRunner and ComputeListener.
    private final TransportService transportService;
    // The "search" thread-pool executor; the transport handlers and the driver runner are created with it.
    private final Executor esqlExecutor;
    private final DriverTaskRunner driverRunner;
    // Used by DataNodeRequestExecutor to finish sink handlers by session id.
    private final ExchangeService exchangeService;
    private final EnrichLookupService enrichLookupService;
    private final ClusterService clusterService;
    // NOTE(review): presumably the counter behind newChildSession(...) - its use is outside this excerpt, confirm.
    private final AtomicLong childSessionIdGenerator = new AtomicLong();
    // Transport action whose requests are handled by DataNodeRequestHandler.
    public static final String DATA_ACTION_NAME = "indices:data/read/esql/data";
    // Transport action whose requests are handled by ClusterRequestHandler.
    public static final String CLUSTER_ACTION_NAME = "indices:data/read/esql/cluster";
    // Compiler-generated assertion switch; execute() reads it before throwing its AssertionErrors.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$ClusterRequestHandler.class */
    private class ClusterRequestHandler implements TransportRequestHandler<ClusterComputeRequest> {
        private ClusterRequestHandler() {
        }

        public void messageReceived(ClusterComputeRequest clusterComputeRequest, TransportChannel transportChannel, Task task) {
            ChannelActionListener channelActionListener = new ChannelActionListener(transportChannel);
            RemoteClusterPlan remoteClusterPlan = clusterComputeRequest.remoteClusterPlan();
            PhysicalPlan plan = remoteClusterPlan.plan();
            if (!(plan instanceof ExchangeSinkExec)) {
                channelActionListener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + String.valueOf(plan)));
                return;
            }
            String clusterAlias = clusterComputeRequest.clusterAlias();
            EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
            esqlExecutionInfo.swapCluster(clusterAlias, (str, cluster) -> {
                return new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(clusterComputeRequest.indices()));
            });
            ComputeListener create = ComputeListener.create(clusterAlias, ComputeService.this.transportService, (CancellableTask) task, esqlExecutionInfo, channelActionListener);
            try {
                ComputeService.this.runComputeOnRemoteCluster(clusterAlias, clusterComputeRequest.sessionId(), (CancellableTask) task, clusterComputeRequest.configuration(), (ExchangeSinkExec) plan, Set.of((Object[]) remoteClusterPlan.targetIndices()), remoteClusterPlan.originalIndices(), esqlExecutionInfo, create);
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$ComputeContext.class */
    /**
     * Immutable bundle of the state handed to {@code runCompute} for one compute session.
     *
     * @param sessionId      id of the session this context belongs to
     * @param clusterAlias   alias of the cluster the compute runs for; the coordinator-only path in this class passes {@code ""}
     * @param searchContexts search contexts the compute runs against; may be empty
     * @param configuration  configuration of the query
     * @param exchangeSource exchange source handler; callers in this class pass {@code null} when there is none
     * @param exchangeSink   exchange sink handler; callers in this class pass {@code null} when there is none
     */
    public record ComputeContext(
        String sessionId,
        String clusterAlias,
        List<SearchContext> searchContexts,
        Configuration configuration,
        ExchangeSourceHandler exchangeSource,
        ExchangeSinkHandler exchangeSink
    ) {
        /**
         * Maps every search context to its {@link SearchExecutionContext}, preserving list order.
         *
         * @return an unmodifiable list with one entry per element of {@link #searchContexts()}
         */
        public List<SearchExecutionContext> searchExecutionContexts() {
            return searchContexts.stream().map(SearchContext::getSearchExecutionContext).toList();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNode.class */
    /**
     * A connection together with the shards and alias filters associated with it.
     *
     * @param connection   transport connection for this entry
     * @param shardIds     shard ids associated with the connection
     * @param aliasFilters alias filters keyed by index
     */
    public record DataNode(Transport.Connection connection, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeRequestExecutor.class */
    public class DataNodeRequestExecutor {
        private final DataNodeRequest request;
        private final CancellableTask parentTask;
        private final ExchangeSinkHandler exchangeSink;
        private final ComputeListener computeListener;
        private final int maxConcurrentShards;
        private final ExchangeSink blockingSink;
        static final /* synthetic */ boolean $assertionsDisabled;

        DataNodeRequestExecutor(DataNodeRequest dataNodeRequest, CancellableTask cancellableTask, ExchangeSinkHandler exchangeSinkHandler, int i, ComputeListener computeListener) {
            this.request = dataNodeRequest;
            this.parentTask = cancellableTask;
            this.exchangeSink = exchangeSinkHandler;
            this.computeListener = computeListener;
            this.maxConcurrentShards = i;
            this.blockingSink = exchangeSinkHandler.createExchangeSink();
        }

        void start() {
            this.parentTask.addListener(() -> {
                ComputeService.this.exchangeService.finishSinkHandler(this.request.sessionId(), new TaskCancelledException(this.parentTask.getReasonCancelled()));
            });
            runBatch(0);
        }

        private void runBatch(int i) {
            Configuration configuration = this.request.configuration();
            String clusterAlias = this.request.clusterAlias();
            String sessionId = this.request.sessionId();
            final int min = Math.min(i + this.maxConcurrentShards, this.request.shardIds().size());
            List<ShardId> subList = this.request.shardIds().subList(i, min);
            ActionListener<ComputeResponse> actionListener = new ActionListener<ComputeResponse>() { // from class: org.elasticsearch.xpack.esql.plugin.ComputeService.DataNodeRequestExecutor.1
                final ActionListener<ComputeResponse> ref;

                {
                    this.ref = DataNodeRequestExecutor.this.computeListener.acquireCompute();
                }

                public void onResponse(ComputeResponse computeResponse) {
                    try {
                        DataNodeRequestExecutor.this.onBatchCompleted(min);
                    } finally {
                        this.ref.onResponse(computeResponse);
                    }
                }

                public void onFailure(Exception exc) {
                    try {
                        ComputeService.this.exchangeService.finishSinkHandler(DataNodeRequestExecutor.this.request.sessionId(), exc);
                    } finally {
                        this.ref.onFailure(exc);
                    }
                }
            };
            ComputeService computeService = ComputeService.this;
            Map<Index, AliasFilter> aliasFilters = this.request.aliasFilters();
            CheckedConsumer checkedConsumer = list -> {
                if (!$assertionsDisabled && !ThreadPool.assertCurrentThreadPool(new String[]{"search", EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME})) {
                    throw new AssertionError();
                }
                ComputeService.this.runCompute(this.parentTask, new ComputeContext(sessionId, clusterAlias, list, configuration, null, this.exchangeSink), this.request.plan(), actionListener);
            };
            Objects.requireNonNull(actionListener);
            computeService.acquireSearchContexts(clusterAlias, subList, configuration, aliasFilters, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
        }

        private void onBatchCompleted(int i) {
            if (i < this.request.shardIds().size() && !this.exchangeSink.isFinished()) {
                runBatch(i);
                return;
            }
            this.exchangeSink.addCompletionListener(ActionListener.runAfter(this.computeListener.acquireAvoid(), () -> {
                ComputeService.this.exchangeService.finishSinkHandler(this.request.sessionId(), (Exception) null);
            }));
            this.blockingSink.finish();
        }

        static {
            $assertionsDisabled = !ComputeService.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeRequestHandler.class */
    private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
        private DataNodeRequestHandler() {
        }

        public void messageReceived(DataNodeRequest dataNodeRequest, TransportChannel transportChannel, Task task) {
            ChannelActionListener channelActionListener = new ChannelActionListener(transportChannel);
            PhysicalPlan plan = dataNodeRequest.plan();
            if (!(plan instanceof ExchangeSinkExec)) {
                channelActionListener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + String.valueOf(dataNodeRequest.plan())));
                return;
            }
            ExchangeSinkExec exchangeSinkExec = (ExchangeSinkExec) plan;
            Class<FragmentExec> cls = FragmentExec.class;
            Objects.requireNonNull(FragmentExec.class);
            List collectFirstChildren = exchangeSinkExec.collectFirstChildren((v1) -> {
                return r1.isInstance(v1);
            });
            if (collectFirstChildren.isEmpty()) {
                channelActionListener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + String.valueOf(dataNodeRequest.plan())));
                return;
            }
            ExchangeSourceExec exchangeSourceExec = new ExchangeSourceExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg());
            FragmentExec fragmentExec = (FragmentExec) collectFirstChildren.get(0);
            ExchangeSinkExec exchangeSinkExec2 = new ExchangeSinkExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg(), fragmentExec.reducer() != null ? (PhysicalPlan) fragmentExec.reducer().replaceChildren(List.of(exchangeSourceExec)) : exchangeSourceExec);
            String sessionId = dataNodeRequest.sessionId();
            DataNodeRequest dataNodeRequest2 = new DataNodeRequest(sessionId + "[n]", dataNodeRequest.configuration(), dataNodeRequest.clusterAlias(), dataNodeRequest.shardIds(), dataNodeRequest.aliasFilters(), dataNodeRequest.plan(), dataNodeRequest.indices(), dataNodeRequest.indicesOptions());
            ComputeListener create = ComputeListener.create(ComputeService.this.transportService, (CancellableTask) task, channelActionListener);
            try {
                ComputeService.this.runComputeOnDataNode((CancellableTask) task, sessionId, exchangeSinkExec2, dataNodeRequest2, create);
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult.class */
    public static final class DataNodeResult extends Record {
        private final List<DataNode> dataNodes;
        private final int totalShards;
        private final int skippedShards;

        DataNodeResult(List<DataNode> list, int i, int i2) {
            this.dataNodes = list;
            this.totalShards = i;
            this.skippedShards = i2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, DataNodeResult.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, DataNodeResult.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, DataNodeResult.class, Object.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ComputeService$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public List<DataNode> dataNodes() {
            return this.dataNodes;
        }

        public int totalShards() {
            return this.totalShards;
        }

        public int skippedShards() {
            return this.skippedShards;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService$RemoteCluster.class */
    /**
     * A remote cluster identified by its alias, with the connection and indices associated with it.
     *
     * <p>{@code concreteIndices} is an array, so the generated {@code equals} and {@code hashCode}
     * compare it by reference, not by content.
     *
     * @param clusterAlias    alias of the remote cluster
     * @param connection      transport connection for the cluster
     * @param concreteIndices concrete index names; the array is stored as given, without copying
     * @param originalIndices original indices for the cluster
     */
    public record RemoteCluster(String clusterAlias, Transport.Connection connection, String[] concreteIndices, OriginalIndices originalIndices) {
    }

    public ComputeService(SearchService searchService, TransportService transportService, ExchangeService exchangeService, EnrichLookupService enrichLookupService, ClusterService clusterService, ThreadPool threadPool, BigArrays bigArrays, BlockFactory blockFactory) {
        this.searchService = searchService;
        this.transportService = transportService;
        this.bigArrays = bigArrays.withCircuitBreaking();
        this.blockFactory = blockFactory;
        this.esqlExecutor = threadPool.executor("search");
        transportService.registerRequestHandler(DATA_ACTION_NAME, this.esqlExecutor, DataNodeRequest::new, new DataNodeRequestHandler());
        transportService.registerRequestHandler(CLUSTER_ACTION_NAME, this.esqlExecutor, ClusterComputeRequest::new, new ClusterRequestHandler());
        this.driverRunner = new DriverTaskRunner(transportService, this.esqlExecutor);
        this.exchangeService = exchangeService;
        this.enrichLookupService = enrichLookupService;
        this.clusterService = clusterService;
    }

    /**
     * Executes a physical plan and reports the collected pages through {@code actionListener}.
     *
     * <p>The plan is split by {@link PlannerUtils#breakPlanBetweenCoordinatorAndDataNode} into a coordinator part
     * and an optional data-node part. Without a data-node part the coordinator plan runs on its own in a new child
     * session. Otherwise the coordinator plan runs against an {@link ExchangeSourceHandler}, and the data-node part
     * is started for the indices grouped under the empty cluster alias and for every other cluster alias.
     *
     * <p>Plan/index mismatches trip an {@link AssertionError} when assertions are enabled and are otherwise
     * reported to the listener as an {@link IllegalStateException}.
     *
     * @param str                session id of the query
     * @param cancellableTask    task the computation is attached to
     * @param physicalPlan       plan to execute
     * @param configuration      query configuration (also supplies the pragmas)
     * @param esqlExecutionInfo  execution info updated when the query ends
     * @param actionListener     receives the {@link Result}, or the failure after collected pages were released
     */
    public void execute(String str, CancellableTask cancellableTask, PhysicalPlan physicalPlan, Configuration configuration, EsqlExecutionInfo esqlExecutionInfo, ActionListener<Result> actionListener) {
        // Alias this method uses for the indices that are not routed to a remote cluster.
        final String localCluster = "";
        final Tuple<PhysicalPlan, PhysicalPlan> splitPlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration);
        // Page is fully qualified because no import for it is visible in this part of the file.
        final List<org.elasticsearch.compute.data.Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
        // On failure, release every page collected so far before forwarding the error.
        final ActionListener<Result> listener = actionListener.delegateResponse((delegate, failure) -> {
            collectedPages.forEach(page -> Releasables.closeExpectNoException(page::releaseBlocks));
            delegate.onFailure(failure);
        });
        final OutputExec coordinatorPlan = new OutputExec(splitPlan.v1(), collectedPages::add);
        final PhysicalPlan dataNodePlan = splitPlan.v2();
        if (dataNodePlan != null && !(dataNodePlan instanceof ExchangeSinkExec)) {
            final String error = "expected data node plan starts with an ExchangeSink; got " + dataNodePlan;
            if (!$assertionsDisabled) {
                throw new AssertionError(error);
            }
            listener.onFailure(new IllegalStateException(error));
            return;
        }
        final Map<String, OriginalIndices> clusterToConcreteIndices = this.transportService.getRemoteClusterService()
            .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
        final QueryPragmas pragmas = configuration.pragmas();

        if (dataNodePlan == null) {
            // Coordinator-only query: there must be nothing to read from any cluster.
            final boolean noConcreteIndices = clusterToConcreteIndices.values().stream().allMatch(indices -> indices.indices().length == 0);
            if (!noConcreteIndices) {
                final String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
                if (!$assertionsDisabled) {
                    throw new AssertionError(error);
                }
                listener.onFailure(new IllegalStateException(error));
                return;
            }
            final ComputeContext computeContext = new ComputeContext(newChildSession(str), localCluster, List.of(), configuration, null, null);
            try (
                ComputeListener computeListener = ComputeListener.create(localCluster, this.transportService, cancellableTask, esqlExecutionInfo, listener.map(response -> {
                    updateExecutionInfoAfterCoordinatorOnlyQuery(esqlExecutionInfo);
                    return new Result(physicalPlan.output(), collectedPages, response.getProfiles(), esqlExecutionInfo);
                }))
            ) {
                runCompute(cancellableTask, computeContext, coordinatorPlan, computeListener.acquireCompute(localCluster));
            }
            return;
        }

        if (clusterToConcreteIndices.values().stream().allMatch(indices -> indices.indices().length == 0)) {
            final String error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
            if (!$assertionsDisabled) {
                throw new AssertionError(error);
            }
            listener.onFailure(new IllegalStateException(error));
            return;
        }

        final Map<String, OriginalIndices> clusterToOriginalIndices = this.transportService.getRemoteClusterService()
            .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan));
        // Pull the local entries out; what remains in both maps is handed to getRemoteClusters below.
        final OriginalIndices localOriginalIndices = clusterToOriginalIndices.remove(localCluster);
        final OriginalIndices localConcreteIndices = clusterToConcreteIndices.remove(localCluster);
        final ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(pragmas.exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
        final List<Attribute> outputAttributes = physicalPlan.output();
        // Resources are closed in reverse order: the compute listener first, then the empty sink.
        try (
            Releasable ignored = exchangeSource.addEmptySink();
            ComputeListener computeListener = ComputeListener.create(localCluster, this.transportService, cancellableTask, esqlExecutionInfo, listener.map(response -> {
                esqlExecutionInfo.markEndQuery();
                return new Result(outputAttributes, collectedPages, response.getProfiles(), esqlExecutionInfo);
            }))
        ) {
            // Coordinator side of the computation.
            exchangeSource.addCompletionListener(computeListener.acquireAvoid());
            runCompute(cancellableTask, new ComputeContext(str, localCluster, List.of(), configuration, exchangeSource, null), coordinatorPlan, computeListener.acquireCompute(localCluster));
            // Data nodes for the local entry, only when it actually names indices.
            if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
                startComputeOnDataNodes(str, localCluster, cancellableTask, configuration, dataNodePlan, Set.of(localConcreteIndices.indices()), localOriginalIndices, exchangeSource, esqlExecutionInfo, computeListener);
            }
            // Every remaining cluster alias.
            startComputeOnRemoteClusters(str, cancellableTask, configuration, dataNodePlan, exchangeSource, getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices), computeListener);
        }
    }

    /**
     * Marks the query as ended and, for a cross-cluster search, replaces every cluster entry whose status is not
     * {@code SKIPPED} with a copy that is {@code SUCCESSFUL}, carries the overall took time and has all shard
     * counters set to zero.
     *
     * @param esqlExecutionInfo execution info to update in place
     */
    private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo esqlExecutionInfo) {
        esqlExecutionInfo.markEndQuery();
        if (!esqlExecutionInfo.isCrossClusterSearch()) {
            return;
        }
        if (!$assertionsDisabled && esqlExecutionInfo.planningTookTime() == null) {
            throw new AssertionError("Planning took time should be set on EsqlExecutionInfo but is null");
        }
        for (String clusterAlias : esqlExecutionInfo.clusterAliases()) {
            if (esqlExecutionInfo.getCluster(clusterAlias).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
                // Skipped clusters keep their current entry.
                continue;
            }
            esqlExecutionInfo.swapCluster(
                clusterAlias,
                (alias, current) -> new EsqlExecutionInfo.Cluster.Builder(current)
                    .setTook(esqlExecutionInfo.overallTook())
                    .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
                    .setTotalShards(0)
                    .setSuccessfulShards(0)
                    .setSkippedShards(0)
                    .setFailedShards(0)
                    .build()
            );
        }
    }

    /**
     * Builds one {@link RemoteCluster} per cluster alias in {@code map} that has at least one concrete index.
     *
     * @param map  concrete indices keyed by cluster alias
     * @param map2 original indices keyed by cluster alias; must contain every alias present in {@code map}
     * @return the remote clusters to contact, in the iteration order of {@code map}
     * @throws IllegalStateException if {@code map2} has no entry for an alias (an {@link AssertionError} is thrown
     *                               instead when assertions are enabled)
     */
    private List<RemoteCluster> getRemoteClusters(Map<String, OriginalIndices> map, Map<String, OriginalIndices> map2) {
        final List<RemoteCluster> remoteClusters = new ArrayList<>(map.size());
        final RemoteClusterService remoteClusterService = this.transportService.getRemoteClusterService();
        for (String clusterAlias : map.keySet()) {
            final OriginalIndices concreteIndices = map.get(clusterAlias);
            final OriginalIndices originalIndices = map2.get(clusterAlias);
            if (originalIndices == null) {
                final String message = "can't find original indices for cluster " + clusterAlias;
                if (!$assertionsDisabled) {
                    throw new AssertionError(message);
                }
                throw new IllegalStateException(message);
            }
            // Aliases without concrete indices are left out entirely.
            if (concreteIndices.indices().length > 0) {
                remoteClusters.add(new RemoteCluster(clusterAlias, remoteClusterService.getConnection(clusterAlias), concreteIndices.indices(), originalIndices));
            }
        }
        return remoteClusters;
    }

    /**
     * Looks up the data nodes holding the target shards and starts the data-node plan on each of them.
     *
     * <p>When the {@code nodeLevelReduction} pragma is set, every {@link FragmentExec} in the plan is given the
     * reducer returned by {@link PlannerUtils#dataNodeReductionPlan} (fragments for which it returns {@code null}
     * are left untouched). Once the lookup answers, the shard counters of cluster {@code str2} are updated in
     * {@code esqlExecutionInfo}; then, per data node, an exchange is opened in a new child session, a remote sink
     * for it is added to {@code exchangeSourceHandler}, and a {@link DataNodeRequest} is sent.
     *
     * @param str                   parent session id used to derive child session ids
     * @param str2                  cluster alias the data nodes belong to
     * @param cancellableTask       parent task of the child requests
     * @param configuration         query configuration
     * @param physicalPlan          data-node plan
     * @param set                   concrete index names whose shards should be targeted
     * @param originalIndices       original indices and options sent with the lookup and the data-node requests
     * @param exchangeSourceHandler exchange source that receives the pages from the data nodes
     * @param esqlExecutionInfo     execution info whose shard counters are updated
     * @param computeListener       listener from which per-request listeners are acquired
     */
    private void startComputeOnDataNodes(String str, String str2, CancellableTask cancellableTask, Configuration configuration, PhysicalPlan physicalPlan, Set<String> set, OriginalIndices originalIndices, ExchangeSourceHandler exchangeSourceHandler, EsqlExecutionInfo esqlExecutionInfo, ComputeListener computeListener) {
        final PhysicalPlan planWithReducer;
        if (configuration.pragmas().nodeLevelReduction()) {
            planWithReducer = (PhysicalPlan) physicalPlan.transformUp(FragmentExec.class, fragmentExec -> {
                PhysicalPlan reducer = PlannerUtils.dataNodeReductionPlan(fragmentExec.fragment(), physicalPlan);
                return reducer == null ? fragmentExec : fragmentExec.withReducer(reducer);
            });
        } else {
            planWithReducer = physicalPlan;
        }
        final QueryBuilder requestFilter = PlannerUtils.requestFilter(planWithReducer, fieldAttribute -> true);
        // Completed once the lookup has failed or every exchange below has been opened; the empty sink added here
        // is released after that.
        final ActionListener<Void> lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSourceHandler.addEmptySink());
        lookupDataNodes(cancellableTask, str2, requestFilter, set, originalIndices, ActionListener.wrap(dataNodeResult -> {
            try (RefCountingListener refs = new RefCountingListener(lookupListener)) {
                esqlExecutionInfo.swapCluster(
                    str2,
                    (alias, cluster) -> new EsqlExecutionInfo.Cluster.Builder(cluster)
                        .setTotalShards(dataNodeResult.totalShards())
                        .setSuccessfulShards(dataNodeResult.totalShards())
                        .setSkippedShards(dataNodeResult.skippedShards())
                        .setFailedShards(0)
                        .build()
                );
                for (DataNode dataNode : dataNodeResult.dataNodes()) {
                    final QueryPragmas pragmas = configuration.pragmas();
                    final String childSessionId = newChildSession(str);
                    ExchangeService.openExchange(this.transportService, dataNode.connection, childSessionId, pragmas.exchangeBufferSize(), this.esqlExecutor, refs.acquire().delegateFailureAndWrap((exchangeListener, unused) -> {
                        // Exchange is open: link it to the source, then send the data-node request.
                        exchangeSourceHandler.addRemoteSink(this.exchangeService.newRemoteSink(cancellableTask, childSessionId, this.transportService, dataNode.connection), pragmas.concurrentExchangeClients());
                        final DataNodeRequest request = new DataNodeRequest(childSessionId, configuration, str2, dataNode.shardIds, dataNode.aliasFilters, planWithReducer, originalIndices.indices(), originalIndices.indicesOptions());
                        // The exchange listener is completed right before the compute response is delivered.
                        final ActionListener<ComputeResponse> responseListener = ActionListener.runBefore(computeListener.acquireCompute(str2), () -> exchangeListener.onResponse(null));
                        this.transportService.sendChildRequest(dataNode.connection, DATA_ACTION_NAME, request, cancellableTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(responseListener, ComputeResponse::new, this.esqlExecutor));
                    }));
                }
            }
        }, lookupListener::onFailure));
    }

    /**
     * Starts the data-node plan on every remote cluster in {@code list}.
     *
     * <p>For each cluster an exchange is opened in a new child session; once it is open, a remote sink for it is
     * added to {@code exchangeSourceHandler} and a {@link ClusterComputeRequest} carrying the plan and the
     * cluster's concrete and original indices is sent. The empty sink added to the exchange source at the start
     * is released after the ref-counting listener created here completes.
     *
     * @param str                   parent session id used to derive child session ids
     * @param cancellableTask       parent task of the child requests
     * @param configuration         query configuration (also supplies the pragmas)
     * @param physicalPlan          plan to run on the remote clusters
     * @param exchangeSourceHandler exchange source that receives the pages from the remote clusters
     * @param list                  remote clusters to contact
     * @param computeListener       listener from which per-request listeners are acquired
     */
    private void startComputeOnRemoteClusters(String str, CancellableTask cancellableTask, Configuration configuration, PhysicalPlan physicalPlan, ExchangeSourceHandler exchangeSourceHandler, List<RemoteCluster> list, ComputeListener computeListener) {
        final QueryPragmas pragmas = configuration.pragmas();
        final ActionListener<Void> linkExchangeListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSourceHandler.addEmptySink());
        try (RefCountingListener refs = new RefCountingListener(linkExchangeListener)) {
            for (RemoteCluster remoteCluster : list) {
                final String childSessionId = newChildSession(str);
                ExchangeService.openExchange(this.transportService, remoteCluster.connection, childSessionId, pragmas.exchangeBufferSize(), this.esqlExecutor, refs.acquire().delegateFailureAndWrap((exchangeListener, unused) -> {
                    // Exchange is open: link it to the source, then send the cluster request.
                    exchangeSourceHandler.addRemoteSink(this.exchangeService.newRemoteSink(cancellableTask, childSessionId, this.transportService, remoteCluster.connection), pragmas.concurrentExchangeClients());
                    final ClusterComputeRequest request = new ClusterComputeRequest(remoteCluster.clusterAlias, childSessionId, configuration, new RemoteClusterPlan(physicalPlan, remoteCluster.concreteIndices, remoteCluster.originalIndices));
                    // The exchange listener is completed right before the compute response is delivered.
                    final ActionListener<ComputeResponse> responseListener = ActionListener.runBefore(computeListener.acquireCompute(remoteCluster.clusterAlias()), () -> exchangeListener.onResponse(null));
                    this.transportService.sendChildRequest(remoteCluster.connection, CLUSTER_ACTION_NAME, request, cancellableTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(responseListener, ComputeResponse::new, this.esqlExecutor));
                }));
            }
        }
    }

    /**
     * Plans {@code physicalPlan} locally, creates its drivers and hands them to the driver runner.
     *
     * <p>The search contexts held by {@code computeContext} are closed before {@code actionListener} is notified,
     * and the drivers are closed after the driver listener has completed. Any {@link Exception} thrown inside the
     * try block is reported to the listener instead of being propagated.
     *
     * @param cancellableTask task the drivers run under
     * @param computeContext  session, configuration, exchanges and search contexts for this computation
     * @param physicalPlan    plan to execute
     * @param actionListener  receives the driver profiles when profiling is enabled, an empty response otherwise
     */
    void runCompute(CancellableTask cancellableTask, ComputeContext computeContext, PhysicalPlan physicalPlan, ActionListener<ComputeResponse> actionListener) {
        final ActionListener<ComputeResponse> listener = ActionListener.runBefore(actionListener, () -> Releasables.close(computeContext.searchContexts));
        // One shard context per search context, indexed by position.
        // Left raw: the element type expected by EsPhysicalOperationProviders is not visible from here.
        final List shardContexts = new ArrayList(computeContext.searchContexts.size());
        int shardIndex = 0;
        while (shardIndex < computeContext.searchContexts.size()) {
            final SearchContext searchContext = computeContext.searchContexts.get(shardIndex);
            shardContexts.add(new EsPhysicalOperationProviders.DefaultShardContext(shardIndex, searchContext.getSearchExecutionContext(), searchContext.request().getAliasFilter()));
            shardIndex++;
        }
        try {
            final LocalExecutionPlanner planner = new LocalExecutionPlanner(
                computeContext.sessionId,
                computeContext.clusterAlias,
                cancellableTask,
                this.bigArrays,
                this.blockFactory,
                this.clusterService.getSettings(),
                computeContext.configuration,
                computeContext.exchangeSource(),
                computeContext.exchangeSink(),
                this.enrichLookupService,
                new EsPhysicalOperationProviders(shardContexts)
            );
            LOGGER.debug("Received physical plan:\n{}", physicalPlan);
            final PhysicalPlan localPlan = PlannerUtils.localPlan(computeContext.searchExecutionContexts(), computeContext.configuration, physicalPlan);
            final LocalExecutionPlanner.LocalExecutionPlan executionPlan = planner.plan(localPlan);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Local execution plan:\n{}", executionPlan.describe());
            }
            final List<Driver> drivers = executionPlan.createDrivers(computeContext.sessionId);
            if (drivers.isEmpty()) {
                throw new IllegalStateException("no drivers created");
            }
            LOGGER.debug("using {} drivers", drivers.size());
            final Executor workerExecutor = this.transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME);
            final ActionListener<Void> driversDone = listener.map(ignored -> {
                if (computeContext.configuration.profile()) {
                    return new ComputeResponse(drivers.stream().map(Driver::profile).toList());
                }
                return new ComputeResponse(List.of());
            });
            this.driverRunner.executeDrivers(cancellableTask, drivers, workerExecutor, ActionListener.releaseAfter(driversDone, () -> Releasables.close(drivers)));
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /**
     * Resolves the shards for {@code list}, makes sure each of them is search-active and then creates and
     * pre-processes one search context per shard.
     *
     * <p>Context creation runs inline when no shard reported {@code true} to the
     * {@code ensureShardSearchActive} callback, and on {@code esqlExecutor} otherwise. If creating or
     * pre-processing any context fails, the contexts created so far are closed before the failure is rethrown.
     *
     * @param str            cluster alias put on every shard search request
     * @param list           shards to acquire contexts for
     * @param configuration  query configuration (supplies the absolute start time)
     * @param map            alias filter per index; indices without an entry use {@code AliasFilter.EMPTY}
     * @param actionListener receives the search contexts or the failure
     */
    private void acquireSearchContexts(String str, List<ShardId> list, Configuration configuration, Map<Index, AliasFilter> map, ActionListener<List<SearchContext>> actionListener) {
        final List<IndexShard> targetShards = new ArrayList<>();
        try {
            for (ShardId shardId : list) {
                targetShards.add(this.searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id()));
            }
            final ActionRunnable<List<SearchContext>> doAcquire = ActionRunnable.supply(actionListener, () -> {
                final List<SearchContext> searchContexts = new ArrayList<>(targetShards.size());
                try {
                    for (IndexShard shard : targetShards) {
                        searchContexts.add(this.searchService.createSearchContext(new ShardSearchRequest(shard.shardId(), configuration.absoluteStartedTimeInMillis(), map.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY), str), SearchService.NO_TIMEOUT));
                    }
                    for (SearchContext searchContext : searchContexts) {
                        searchContext.preProcess();
                    }
                    return searchContexts;
                } catch (Throwable failure) {
                    // Nothing is handed to the caller on failure, so release what was created.
                    IOUtils.close(searchContexts);
                    throw failure;
                }
            });
            final AtomicBoolean waitedForRefreshes = new AtomicBoolean();
            // Runs once every shard callback below has released its reference.
            try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
                if (waitedForRefreshes.get()) {
                    this.esqlExecutor.execute(doAcquire);
                } else {
                    doAcquire.run();
                }
            })) {
                for (IndexShard shard : targetShards) {
                    final Releasable ref = refs.acquire();
                    shard.ensureShardSearchActive(waited -> {
                        try (ref) {
                            if (waited.booleanValue()) {
                                waitedForRefreshes.set(true);
                            }
                        }
                    });
                }
            }
        } catch (Exception e) {
            actionListener.onFailure(e);
        }
    }

    /**
     * Sends a search-shards request to the local node and turns the response into a {@link DataNodeResult}.
     *
     * <p>Only shard groups whose index name is in {@code set} are counted. Skipped groups are counted but not
     * assigned; every other group is assigned to the first node in its allocated-node list. A group with no
     * allocated nodes fails the lookup with a {@link ShardNotFoundException}, whether or not its index is in
     * {@code set}.
     *
     * @param task            parent task of the child request
     * @param str             cluster alias passed to the search-shards request
     * @param queryBuilder    filter passed to the search-shards request
     * @param set             concrete index names to keep
     * @param originalIndices indices and indices options of the request
     * @param actionListener  receives the data nodes together with the total and skipped shard counts
     */
    private void lookupDataNodes(Task task, String str, QueryBuilder queryBuilder, Set<String> set, OriginalIndices originalIndices, ActionListener<DataNodeResult> actionListener) {
        final ActionListener<SearchShardsResponse> searchShardsListener = actionListener.map(response -> {
            final Map<String, DiscoveryNode> nodesById = new HashMap<>();
            for (DiscoveryNode node : response.getNodes()) {
                nodesById.put(node.getId(), node);
            }
            final Map<String, List<ShardId>> shardsByNode = new HashMap<>();
            final Map<String, Map<Index, AliasFilter>> aliasFiltersByNode = new HashMap<>();
            int totalShards = 0;
            int skippedShards = 0;
            for (SearchShardsGroup group : response.getGroups()) {
                final ShardId shardId = group.shardId();
                if (group.allocatedNodes().isEmpty()) {
                    throw new ShardNotFoundException(group.shardId(), "no shard copies found {}", group.shardId());
                }
                if (!set.contains(shardId.getIndexName())) {
                    continue;
                }
                totalShards++;
                if (group.skipped()) {
                    skippedShards++;
                    continue;
                }
                final String targetNode = group.allocatedNodes().get(0);
                shardsByNode.computeIfAbsent(targetNode, nodeId -> new ArrayList<>()).add(shardId);
                final AliasFilter aliasFilter = response.getAliasFilters().get(shardId.getIndex().getUUID());
                if (aliasFilter != null) {
                    aliasFiltersByNode.computeIfAbsent(targetNode, nodeId -> new HashMap<>()).put(shardId.getIndex(), aliasFilter);
                }
            }
            final List<DataNode> dataNodes = new ArrayList<>(shardsByNode.size());
            for (Map.Entry<String, List<ShardId>> entry : shardsByNode.entrySet()) {
                dataNodes.add(new DataNode(this.transportService.getConnection(nodesById.get(entry.getKey())), entry.getValue(), aliasFiltersByNode.getOrDefault(entry.getKey(), Map.of())));
            }
            return new DataNodeResult(dataNodes, totalShards, skippedShards);
        });
        final SearchShardsRequest searchShardsRequest = new SearchShardsRequest(originalIndices.indices(), originalIndices.indicesOptions(), queryBuilder, (String) null, (String) null, false, str);
        this.transportService.sendChildRequest(
            this.transportService.getLocalNode(),
            EsqlSearchShardsAction.TYPE.name(),
            searchShardsRequest,
            task,
            TransportRequestOptions.EMPTY,
            new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, this.esqlExecutor)
        );
    }

    /**
     * Runs a data-node request in two stages: a {@code DataNodeRequestExecutor} that writes into a sink created
     * for the request's session, and a compute of {@code physicalPlan} that reads from that sink and writes to
     * the sink registered under {@code str}.
     *
     * <p>On success the second stage waits for the sink registered under {@code str} to complete before
     * finishing it and forwarding the response. Any {@link Exception} thrown while setting this up finishes both
     * sink handlers with that exception and fails the listener acquired at the start.
     *
     * @param cancellableTask task of the request; cancelling it finishes the sink registered under {@code str}
     * @param str             id of the sink handler the results are written to
     * @param physicalPlan    plan for the second stage
     * @param dataNodeRequest the request being served
     * @param computeListener listener from which the per-stage listeners are acquired
     */
    private void runComputeOnDataNode(CancellableTask cancellableTask, String str, PhysicalPlan physicalPlan, DataNodeRequest dataNodeRequest, ComputeListener computeListener) {
        final ActionListener<Void> parentListener = computeListener.acquireAvoid();
        try {
            // Stage one: the request executor writes into a sink dedicated to this request's session.
            final ExchangeSinkHandler internalSink = this.exchangeService.createSinkHandler(dataNodeRequest.sessionId(), dataNodeRequest.pragmas().exchangeBufferSize());
            new DataNodeRequestExecutor(dataNodeRequest, cancellableTask, internalSink, dataNodeRequest.configuration().pragmas().maxConcurrentShardsPerNode(), computeListener).start();
            // Stage two: read from the internal sink and write to the sink registered under str.
            final ExchangeSinkHandler externalSink = this.exchangeService.getSinkHandler(str);
            cancellableTask.addListener(() -> this.exchangeService.finishSinkHandler(str, new TaskCancelledException(cancellableTask.getReasonCancelled())));
            final ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(1, this.esqlExecutor);
            exchangeSource.addCompletionListener(computeListener.acquireAvoid());
            exchangeSource.addRemoteSink(internalSink::fetchPageAsync, 1);
            final ActionListener<ComputeResponse> reductionListener = computeListener.acquireCompute();
            runCompute(cancellableTask, new ComputeContext(dataNodeRequest.sessionId(), dataNodeRequest.clusterAlias(), List.of(), dataNodeRequest.configuration(), exchangeSource, externalSink), physicalPlan, ActionListener.wrap(computeResponse -> {
                // Respond only after the external sink has completed.
                externalSink.addCompletionListener(ActionListener.running(() -> {
                    this.exchangeService.finishSinkHandler(str, (Exception) null);
                    reductionListener.onResponse(computeResponse);
                }));
            }, exc -> {
                this.exchangeService.finishSinkHandler(str, exc);
                reductionListener.onFailure(exc);
            }));
            parentListener.onResponse(null);
        } catch (Exception e) {
            this.exchangeService.finishSinkHandler(str, e);
            this.exchangeService.finishSinkHandler(dataNodeRequest.sessionId(), e);
            parentListener.onFailure(e);
        }
    }

    /**
     * Serves a cluster-level request: runs a plan that forwards pages from a new exchange source to the sink
     * registered under {@code str2}, then starts {@code exchangeSinkExec} on this cluster's data nodes, which
     * feed that exchange source.
     *
     * @param str               cluster alias
     * @param str2              id of the sink handler results are written to; also the suffix of the local
     *                          session id {@code str + ":" + str2}
     * @param cancellableTask   task of the request; cancelling it finishes the sink registered under {@code str2}
     * @param configuration     query configuration
     * @param exchangeSinkExec  plan to run on the data nodes
     * @param set               concrete index names to target
     * @param originalIndices   original indices and options
     * @param esqlExecutionInfo execution info passed on to the data-node start
     * @param computeListener   listener from which the per-stage listeners are acquired
     */
    void runComputeOnRemoteCluster(String str, String str2, CancellableTask cancellableTask, Configuration configuration, ExchangeSinkExec exchangeSinkExec, Set<String> set, OriginalIndices originalIndices, EsqlExecutionInfo esqlExecutionInfo, ComputeListener computeListener) {
        final ExchangeSinkHandler exchangeSink = this.exchangeService.getSinkHandler(str2);
        cancellableTask.addListener(() -> this.exchangeService.finishSinkHandler(str2, new TaskCancelledException(cancellableTask.getReasonCancelled())));
        final String localSessionId = str + ":" + str2;
        final ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(configuration.pragmas().exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
        try (Releasable ignored = exchangeSource.addEmptySink()) {
            exchangeSink.addCompletionListener(computeListener.acquireAvoid());
            exchangeSource.addCompletionListener(computeListener.acquireAvoid());
            final ComputeContext computeContext = new ComputeContext(localSessionId, str, List.of(), configuration, exchangeSource, exchangeSink);
            // Same output and intermediate-agg flag as the incoming sink, but reading from an exchange source.
            final ExchangeSourceExec sourceExec = new ExchangeSourceExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg());
            final ExchangeSinkExec coordinatorPlan = new ExchangeSinkExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg(), sourceExec);
            runCompute(cancellableTask, computeContext, coordinatorPlan, computeListener.acquireCompute(str));
            startComputeOnDataNodes(localSessionId, str, cancellableTask, configuration, exchangeSinkExec, set, originalIndices, exchangeSource, esqlExecutionInfo, computeListener);
        }
    }

    /**
     * Derives a child session id of the form {@code <parent>/<n>}, where {@code n} is the next value of the
     * child session id generator.
     *
     * @param str parent session id
     * @return the new child session id
     */
    private String newChildSession(String str) {
        final long childId = this.childSessionIdGenerator.incrementAndGet();
        return str + "/" + childId;
    }

    // Class initialisation: caches whether assertions are disabled for this class (read by the explicit
    // `$assertionsDisabled` checks in the methods above) and creates the class logger.
    static {
        $assertionsDisabled = !ComputeService.class.desiredAssertionStatus();
        LOGGER = LogManager.getLogger(ComputeService.class);
    }
}
