package org.elasticsearch.xpack.esql.plugin;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.class */
/**
 * Transport handler for {@link DataNodeRequest}s.
 * <p>
 * The constructor registers this instance for {@code ComputeService.DATA_ACTION_NAME}. Incoming requests are handled
 * by {@code messageReceived}; {@code startComputeOnDataNodes} is the sending side, which resolves the nodes holding
 * the target shards and sends a {@link DataNodeRequest} to each of them.
 */
public final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
    // Runs the physical plans (per shard batch and for the node-level reduction) and creates child session ids.
    private final ComputeService computeService;
    // Used to resolve index shards and to create the search contexts the compute runs over.
    private final SearchService searchService;
    // Used to send child requests, obtain node connections, the thread pool and the local node.
    private final TransportService transportService;
    // Creates, looks up and finishes exchange sink handlers; also creates remote sinks.
    private final ExchangeService exchangeService;
    // Executor the request handler is registered with; also used for response handlers and exchanges.
    private final Executor esqlExecutor;

    /**
     * A node that a {@link DataNodeRequest} is sent to, together with the work assigned to it.
     * <p>
     * Declared as a record: the previous form ({@code extends Record} with explicit {@code ObjectMethods} bootstrap
     * calls) was decompiler output and is not valid Java source. Components, accessors and the
     * {@code equals}/{@code hashCode}/{@code toString} semantics are unchanged.
     *
     * @param connection   connection to the node
     * @param shardIds     shards to be processed on the node
     * @param aliasFilters alias filters, keyed by index, for the indices of {@code shardIds}
     */
    public record DataNode(Transport.Connection connection, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeRequestExecutor.class */
    public class DataNodeRequestExecutor {
        private final DataNodeRequest request;
        private final CancellableTask parentTask;
        private final ExchangeSinkHandler exchangeSink;
        private final ComputeListener computeListener;
        private final int maxConcurrentShards;
        private final ExchangeSink blockingSink;
        static final /* synthetic */ boolean $assertionsDisabled;

        DataNodeRequestExecutor(DataNodeRequest dataNodeRequest, CancellableTask cancellableTask, ExchangeSinkHandler exchangeSinkHandler, int i, ComputeListener computeListener) {
            this.request = dataNodeRequest;
            this.parentTask = cancellableTask;
            this.exchangeSink = exchangeSinkHandler;
            this.computeListener = computeListener;
            this.maxConcurrentShards = i;
            this.blockingSink = exchangeSinkHandler.createExchangeSink(() -> {
            });
        }

        void start() {
            this.parentTask.addListener(() -> {
                DataNodeComputeHandler.this.exchangeService.finishSinkHandler(this.request.sessionId(), new TaskCancelledException(this.parentTask.getReasonCancelled()));
            });
            runBatch(0);
        }

        private void runBatch(int i) {
            Configuration configuration = this.request.configuration();
            String clusterAlias = this.request.clusterAlias();
            String sessionId = this.request.sessionId();
            final int min = Math.min(i + this.maxConcurrentShards, this.request.shardIds().size());
            List<ShardId> subList = this.request.shardIds().subList(i, min);
            ActionListener<List<DriverProfile>> actionListener = new ActionListener<List<DriverProfile>>() { // from class: org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler.DataNodeRequestExecutor.1
                final ActionListener<List<DriverProfile>> ref;

                {
                    this.ref = DataNodeRequestExecutor.this.computeListener.acquireCompute();
                }

                public void onResponse(List<DriverProfile> list) {
                    try {
                        DataNodeRequestExecutor.this.onBatchCompleted(min);
                    } finally {
                        this.ref.onResponse(list);
                    }
                }

                public void onFailure(Exception exc) {
                    try {
                        DataNodeComputeHandler.this.exchangeService.finishSinkHandler(DataNodeRequestExecutor.this.request.sessionId(), exc);
                    } finally {
                        this.ref.onFailure(exc);
                    }
                }
            };
            DataNodeComputeHandler dataNodeComputeHandler = DataNodeComputeHandler.this;
            Map<Index, AliasFilter> aliasFilters = this.request.aliasFilters();
            CheckedConsumer checkedConsumer = list -> {
                if (!$assertionsDisabled && !ThreadPool.assertCurrentThreadPool(new String[]{"search", EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME})) {
                    throw new AssertionError();
                }
                DataNodeComputeHandler.this.computeService.runCompute(this.parentTask, new ComputeContext(sessionId, "data", clusterAlias, list, configuration, configuration.newFoldContext(), null, () -> {
                    return this.exchangeSink.createExchangeSink(() -> {
                    });
                }), this.request.plan(), actionListener);
            };
            Objects.requireNonNull(actionListener);
            dataNodeComputeHandler.acquireSearchContexts(clusterAlias, subList, configuration, aliasFilters, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
        }

        private void onBatchCompleted(int i) {
            if (i < this.request.shardIds().size() && !this.exchangeSink.isFinished()) {
                runBatch(i);
                return;
            }
            this.exchangeSink.addCompletionListener(ActionListener.runAfter(this.computeListener.acquireAvoid(), () -> {
                DataNodeComputeHandler.this.exchangeService.finishSinkHandler(this.request.sessionId(), (Exception) null);
            }));
            this.blockingSink.finish();
        }

        static {
            $assertionsDisabled = !DataNodeComputeHandler.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult.class */
    public static final class DataNodeResult extends Record {
        private final List<DataNode> dataNodes;
        private final int totalShards;
        private final int skippedShards;

        DataNodeResult(List<DataNode> list, int i, int i2) {
            this.dataNodes = list;
            this.totalShards = i;
            this.skippedShards = i2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, DataNodeResult.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, DataNodeResult.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, DataNodeResult.class, Object.class), DataNodeResult.class, "dataNodes;totalShards;skippedShards", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->dataNodes:Ljava/util/List;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->totalShards:I", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler$DataNodeResult;->skippedShards:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public List<DataNode> dataNodes() {
            return this.dataNodes;
        }

        public int totalShards() {
            return this.totalShards;
        }

        public int skippedShards() {
            return this.skippedShards;
        }
    }

    /**
     * Creates the handler and registers it with {@code transportService} as the request handler for
     * {@code ComputeService.DATA_ACTION_NAME}, executing on {@code executor}.
     *
     * @param computeService   service that runs the compute
     * @param searchService    service used to resolve shards and create search contexts
     * @param transportService transport service the handler is registered with and sends requests through
     * @param exchangeService  service managing exchange sinks and sources
     * @param executor         executor for handling requests and responses
     */
    public DataNodeComputeHandler(ComputeService computeService, SearchService searchService, TransportService transportService, ExchangeService exchangeService, Executor executor) {
        this.esqlExecutor = executor;
        this.exchangeService = exchangeService;
        this.transportService = transportService;
        this.searchService = searchService;
        this.computeService = computeService;
        // Registration comes last so that every field is assigned before this instance is published.
        transportService.registerRequestHandler(ComputeService.DATA_ACTION_NAME, executor, DataNodeRequest::new, this);
    }

    /**
     * Resolves the data nodes holding the target shards and starts the compute on each of them.
     * <p>
     * For every resolved {@link DataNode} this opens an exchange under a new child session, registers the node's
     * remote sink with {@code exchangeSource}, and sends a {@link DataNodeRequest} to the node. Node-level reduction
     * is requested only when the target node is not the local node and the pragmas enable it.
     * <p>
     * An empty sink is added to {@code exchangeSource} on entry and closed after {@code outListener} has been
     * notified.
     *
     * @param sessionId       session id from which the child session ids are derived
     * @param clusterAlias    cluster alias forwarded to the shard lookup and to every {@link DataNodeRequest}
     * @param parentTask      task under which the child requests are sent
     * @param configuration   query configuration; its pragmas control exchange buffer size, exchange concurrency and
     *                        node-level reduction
     * @param dataNodePlan    plan sent to the data nodes; also the source of the shard lookup filter
     * @param concreteIndices names of the indices whose shards are counted and targeted
     * @param originalIndices indices and indices options of the original request
     * @param exchangeSource  source handler that the remote sinks are added to
     * @param runOnFailure    passed to the {@link ComputeListener}; presumably run when a compute fails
     * @param outListener     notified with the combined response, or with the failure
     */
    public void startComputeOnDataNodes(
        String sessionId,
        String clusterAlias,
        CancellableTask parentTask,
        Configuration configuration,
        PhysicalPlan dataNodePlan,
        Set<String> concreteIndices,
        OriginalIndices originalIndices,
        ExchangeSourceHandler exchangeSource,
        Runnable runOnFailure,
        ActionListener<ComputeResponse> outListener
    ) {
        final QueryBuilder requestFilter = PlannerUtils.canMatchFilter(dataNodePlan);
        final Releasable emptySink = exchangeSource.addEmptySink();
        final ActionListener<ComputeResponse> listener = ActionListener.runAfter(outListener, emptySink::close);
        final long startTimeInNanos = System.nanoTime();
        lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
            // try-with-resources closes the compute listener exactly once, on both the normal and the failure path.
            try (
                ComputeListener computeListener = new ComputeListener(
                    transportService.getThreadPool(),
                    runOnFailure,
                    listener.map(profiles -> {
                        final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
                        // NOTE(review): totalShards is passed as both the 3rd and the 4th argument — confirm against
                        // the ComputeResponse constructor. Boxing is explicit so overload resolution is unchanged.
                        return new ComputeResponse(
                            profiles,
                            took,
                            Integer.valueOf(dataNodeResult.totalShards()),
                            Integer.valueOf(dataNodeResult.totalShards()),
                            Integer.valueOf(dataNodeResult.skippedShards()),
                            0
                        );
                    })
                )
            ) {
                for (DataNode node : dataNodeResult.dataNodes()) {
                    final QueryPragmas pragmas = configuration.pragmas();
                    final String childSessionId = computeService.newChildSession(sessionId);
                    final ActionListener<ComputeResponse> dataNodeListener = computeListener.acquireCompute()
                        .map(ComputeResponse::getProfiles);
                    ExchangeService.openExchange(
                        transportService,
                        node.connection(),
                        childSessionId,
                        pragmas.exchangeBufferSize(),
                        esqlExecutor,
                        dataNodeListener.delegateFailureAndWrap((delegate, unused) -> {
                            exchangeSource.addRemoteSink(
                                exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection()),
                                true,
                                () -> {},
                                pragmas.concurrentExchangeClients(),
                                computeListener.acquireAvoid()
                            );
                            final boolean sameNode = transportService.getLocalNode().getId().equals(node.connection().getNode().getId());
                            final DataNodeRequest dataNodeRequest = new DataNodeRequest(
                                childSessionId,
                                configuration,
                                clusterAlias,
                                node.shardIds(),
                                node.aliasFilters(),
                                dataNodePlan,
                                originalIndices.indices(),
                                originalIndices.indicesOptions(),
                                !sameNode && pragmas.nodeLevelReduction()
                            );
                            transportService.sendChildRequest(
                                node.connection(),
                                ComputeService.DATA_ACTION_NAME,
                                dataNodeRequest,
                                parentTask,
                                TransportRequestOptions.EMPTY,
                                new ActionListenerResponseHandler<>(dataNodeListener, ComputeResponse::new, esqlExecutor)
                            );
                        })
                    );
                }
            }
        }, listener::onFailure));
    }

    /**
     * Creates and pre-processes one {@link SearchContext} per shard and passes them to {@code listener}.
     * <p>
     * The contexts are created only after {@code ensureShardSearchActive} has called back for every shard. If any of
     * those callbacks reported {@code true}, context creation is dispatched to the ESQL executor; otherwise it runs on
     * the thread that released the last reference. If creating or pre-processing a context fails, the contexts
     * created so far are closed and the failure is reported to {@code listener}.
     * <p>
     * Any exception thrown synchronously by this method itself is reported to {@code listener} as well.
     *
     * @param clusterAlias  cluster alias set on each shard search request
     * @param shardIds      shards to create search contexts for
     * @param configuration query configuration; supplies the absolute start time of the request
     * @param aliasFilters  alias filters by index; {@code AliasFilter.EMPTY} is used for indices without an entry
     * @param listener      receives the search contexts, in the order of {@code shardIds}, or the failure
     */
    private void acquireSearchContexts(
        String clusterAlias,
        List<ShardId> shardIds,
        Configuration configuration,
        Map<Index, AliasFilter> aliasFilters,
        ActionListener<List<SearchContext>> listener
    ) {
        final List<IndexShard> targetShards = new ArrayList<>();
        try {
            for (ShardId shardId : shardIds) {
                targetShards.add(searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id()));
            }
            final Runnable doAcquire = ActionRunnable.supply(listener, () -> {
                final List<SearchContext> searchContexts = new ArrayList<>(targetShards.size());
                boolean success = false;
                try {
                    for (IndexShard shard : targetShards) {
                        final AliasFilter aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
                        final ShardSearchRequest shardRequest = new ShardSearchRequest(
                            shard.shardId(),
                            configuration.absoluteStartedTimeInMillis(),
                            aliasFilter,
                            clusterAlias
                        );
                        searchContexts.add(searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT));
                    }
                    for (SearchContext searchContext : searchContexts) {
                        searchContext.preProcess();
                    }
                    success = true;
                    return searchContexts;
                } finally {
                    if (!success) {
                        // Do not leak the contexts opened before the failure.
                        IOUtils.close(searchContexts);
                    }
                }
            });
            final AtomicBoolean waitedForRefreshes = new AtomicBoolean();
            // The runnable fires once the initial reference (released at the end of this try) and every per-shard
            // reference have been released. try-with-resources releases each reference exactly once.
            try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
                if (waitedForRefreshes.get()) {
                    esqlExecutor.execute(doAcquire);
                } else {
                    doAcquire.run();
                }
            })) {
                for (IndexShard targetShard : targetShards) {
                    final Releasable ref = refs.acquire();
                    targetShard.ensureShardSearchActive(await -> {
                        try (ref) {
                            if (await) {
                                waitedForRefreshes.set(true);
                            }
                        }
                    });
                }
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /**
     * Asks the local node for the search shards of {@code originalIndices} and groups them by target node.
     * <p>
     * Only shard groups whose index name is in {@code concreteIndices} are counted. A counted group that is marked as
     * skipped only increases the skipped count; every other counted group is assigned to the first of its allocated
     * nodes. A group without any allocated node fails the lookup with a {@link ShardNotFoundException}, whether or
     * not its index is in {@code concreteIndices}.
     *
     * @param parentTask      task under which the search-shards request is sent
     * @param clusterAlias    cluster alias passed to the search-shards request
     * @param filter          query passed to the search-shards request
     * @param concreteIndices names of the indices whose shards are of interest
     * @param originalIndices indices and indices options of the original request
     * @param listener        receives the data nodes with the total and skipped shard counts, or the failure
     */
    private void lookupDataNodes(
        Task parentTask,
        String clusterAlias,
        QueryBuilder filter,
        Set<String> concreteIndices,
        OriginalIndices originalIndices,
        ActionListener<DataNodeResult> listener
    ) {
        final ActionListener<SearchShardsResponse> searchShardsListener = listener.map(resp -> {
            final Map<String, DiscoveryNode> nodes = new HashMap<>();
            for (DiscoveryNode node : resp.getNodes()) {
                nodes.put(node.getId(), node);
            }
            final Map<String, List<ShardId>> nodeToShards = new HashMap<>();
            final Map<String, Map<Index, AliasFilter>> nodeToAliasFilters = new HashMap<>();
            int totalShards = 0;
            int skippedShards = 0;
            for (SearchShardsGroup group : resp.getGroups()) {
                final ShardId shardId = group.shardId();
                if (group.allocatedNodes().isEmpty()) {
                    throw new ShardNotFoundException(group.shardId(), "no shard copies found {}", new Object[] { group.shardId() });
                }
                if (!concreteIndices.contains(shardId.getIndexName())) {
                    continue;
                }
                totalShards++;
                if (group.skipped()) {
                    skippedShards++;
                    continue;
                }
                final String targetNode = group.allocatedNodes().get(0);
                nodeToShards.computeIfAbsent(targetNode, k -> new ArrayList<>()).add(shardId);
                // Alias filters are keyed by index UUID in the response.
                final AliasFilter aliasFilter = resp.getAliasFilters().get(shardId.getIndex().getUUID());
                if (aliasFilter != null) {
                    nodeToAliasFilters.computeIfAbsent(targetNode, k -> new HashMap<>()).put(shardId.getIndex(), aliasFilter);
                }
            }
            final List<DataNode> dataNodes = new ArrayList<>(nodeToShards.size());
            for (Map.Entry<String, List<ShardId>> entry : nodeToShards.entrySet()) {
                final Transport.Connection connection = transportService.getConnection(nodes.get(entry.getKey()));
                final Map<Index, AliasFilter> aliasFilters = nodeToAliasFilters.getOrDefault(entry.getKey(), Map.of());
                dataNodes.add(new DataNode(connection, entry.getValue(), aliasFilters));
            }
            return new DataNodeResult(dataNodes, totalShards, skippedShards);
        });
        final SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
            originalIndices.indices(),
            originalIndices.indicesOptions(),
            filter,
            null,
            null,
            false,
            clusterAlias
        );
        transportService.sendChildRequest(
            transportService.getLocalNode(),
            EsqlSearchShardsAction.TYPE.name(),
            searchShardsRequest,
            parentTask,
            TransportRequestOptions.EMPTY,
            new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor)
        );
    }

    /**
     * Runs a data-node request on this node in two stages connected by an internal exchange.
     * <p>
     * The shard-level compute writes into a sink handler created for the request's (internal) session id and is
     * driven by a {@link DataNodeRequestExecutor}. The node-level reduction reads from that sink through a new
     * {@link ExchangeSourceHandler}, runs {@code reducePlan}, and writes into the sink handler registered under
     * {@code externalId}. The reduction's result is reported only after the external sink handler has completed.
     * <p>
     * An exception thrown while setting this up finishes both sink handlers with that exception and fails the
     * listener.
     *
     * @param task       task the compute runs under
     * @param externalId session id of the sink handler that the reduction writes to
     * @param reducePlan plan of the node-level reduction
     * @param request    the request to execute, carrying the internal session id
     * @param listener   notified with the response built from the collected driver profiles, or with the failure
     */
    private void runComputeOnDataNode(
        CancellableTask task,
        String externalId,
        PhysicalPlan reducePlan,
        DataNodeRequest request,
        ActionListener<ComputeResponse> listener
    ) {
        // try-with-resources closes the compute listener exactly once, on both the normal and the failure path.
        try (
            ComputeListener computeListener = new ComputeListener(
                transportService.getThreadPool(),
                computeService.cancelQueryOnFailure(task),
                listener.map(ComputeResponse::new)
            )
        ) {
            final ActionListener<Void> parentListener = computeListener.acquireAvoid();
            try {
                // Stage 1: shard-level compute, writing into the internal sink.
                final ExchangeSinkHandler internalSink = exchangeService.createSinkHandler(
                    request.sessionId(),
                    request.pragmas().exchangeBufferSize()
                );
                new DataNodeRequestExecutor(
                    request,
                    task,
                    internalSink,
                    request.configuration().pragmas().maxConcurrentShardsPerNode(),
                    computeListener
                ).start();

                // Stage 2: node-level reduction, reading the internal sink and writing into the external one.
                final ExchangeSinkHandler externalSink = exchangeService.getSinkHandler(externalId);
                task.addListener(
                    () -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled()))
                );
                final ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(1, esqlExecutor);
                exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
                final ActionListener<List<DriverProfile>> reductionListener = computeListener.acquireCompute();
                final ComputeContext reductionContext = new ComputeContext(
                    request.sessionId(),
                    "node_reduce",
                    request.clusterAlias(),
                    List.of(),
                    request.configuration(),
                    new FoldContext(request.pragmas().foldLimit().getBytes()),
                    exchangeSource::createExchangeSource,
                    () -> externalSink.createExchangeSink(() -> {})
                );
                computeService.runCompute(task, reductionContext, reducePlan, ActionListener.wrap(profiles -> {
                    // Report the result only after the external sink handler has completed.
                    externalSink.addCompletionListener(ActionListener.running(() -> {
                        exchangeService.finishSinkHandler(externalId, null);
                        reductionListener.onResponse(profiles);
                    }));
                }, e -> {
                    exchangeService.finishSinkHandler(externalId, e);
                    reductionListener.onFailure(e);
                }));
                parentListener.onResponse(null);
            } catch (Exception e) {
                exchangeService.finishSinkHandler(externalId, e);
                exchangeService.finishSinkHandler(request.sessionId(), e);
                parentListener.onFailure(e);
            }
        }
    }

    /**
     * Handles a {@link DataNodeRequest} received over the transport layer.
     * <p>
     * The request's plan must be an {@link ExchangeSinkExec}; otherwise the channel is answered with an
     * {@link IllegalStateException}. The compute runs on a copy of the request whose session id is the received one
     * with {@code "[n]"} appended, while the received session id is passed on as the external id.
     *
     * @param request the received request
     * @param channel channel that the response or failure is sent to
     * @param task    task created for the request; cast to {@link CancellableTask}
     */
    @Override
    public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
        final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
        if (!(request.plan() instanceof ExchangeSinkExec sinkPlan)) {
            listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
            return;
        }
        final PhysicalPlan reductionPlan = ComputeService.reductionPlan(sinkPlan, request.runNodeLevelReduction());
        final String sessionId = request.sessionId();
        final DataNodeRequest internalRequest = new DataNodeRequest(
            sessionId + "[n]", // internal session id, distinct from the one received
            request.configuration(),
            request.clusterAlias(),
            request.shardIds(),
            request.aliasFilters(),
            request.plan(),
            request.indices(),
            request.indicesOptions(),
            request.runNodeLevelReduction()
        );
        runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, internalRequest, listener);
    }
}
