package org.elasticsearch.xpack.esql.plugin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.ClusterComputeHandler;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.Result;

/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ComputeService.class */
public class ComputeService {
    // Action-name constants. NOTE(review): neither is referenced inside this class; judging by the
    // strings they name the data-node and cluster-level compute transport actions — confirm against
    // the handlers that register them.
    public static final String DATA_ACTION_NAME = "indices:data/read/esql/data";
    public static final String CLUSTER_ACTION_NAME = "indices:data/read/esql/cluster";
    // Alias used for the local cluster: it is the key removed from the grouped-indices maps in
    // execute() and the cluster alias handed to the compute contexts created there.
    private static final String LOCAL_CLUSTER = "";
    // Assigned in the static initializer of this class.
    private static final Logger LOGGER;
    private final SearchService searchService;
    // Holds the result of BigArrays#withCircuitBreaking() on the instance passed to the constructor.
    private final BigArrays bigArrays;
    private final BlockFactory blockFactory;
    private final TransportService transportService;
    // Runs the drivers created by runCompute(); built in the constructor on the "search" executor.
    private final DriverTaskRunner driverRunner;
    private final EnrichLookupService enrichLookupService;
    private final LookupFromIndexService lookupFromIndexService;
    private final ClusterService clusterService;
    // Source of the numeric suffix appended by newChildSession().
    private final AtomicLong childSessionIdGenerator = new AtomicLong();
    // Starts compute on data nodes of the local cluster (see execute()).
    private final DataNodeComputeHandler dataNodeComputeHandler;
    // Resolves remote clusters and starts compute on them (see execute()).
    private final ClusterComputeHandler clusterComputeHandler;
    // Registry in which execute() registers/unregisters the per-session exchange source handler.
    private final ExchangeService exchangeService;
    // Compiler-generated assertion flag made explicit by the decompiler; true when assertions are
    // disabled for this class. Assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Wires the service together from its collaborators.
     *
     * <p>The big-arrays instance is replaced by its circuit-breaking variant, and a single executor
     * looked up under the name {@code "search"} is shared by the driver runner and both compute
     * handlers. Both handlers receive a reference to this (still partially constructed) service.
     */
    public ComputeService(SearchService searchService, TransportService transportService, ExchangeService exchangeService, EnrichLookupService enrichLookupService, LookupFromIndexService lookupFromIndexService, ClusterService clusterService, ThreadPool threadPool, BigArrays bigArrays, BlockFactory blockFactory) {
        // Collaborators that are stored as-is.
        this.searchService = searchService;
        this.transportService = transportService;
        this.enrichLookupService = enrichLookupService;
        this.lookupFromIndexService = lookupFromIndexService;
        this.clusterService = clusterService;
        this.blockFactory = blockFactory;

        // Derived collaborators; the relative order of these calls is kept.
        this.bigArrays = bigArrays.withCircuitBreaking();
        final ExecutorService searchExecutor = threadPool.executor("search");
        this.driverRunner = new DriverTaskRunner(transportService, searchExecutor);

        // Handlers get a back-reference to this service; the cluster handler also needs the data-node handler.
        final DataNodeComputeHandler dataNodeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, searchExecutor);
        this.dataNodeComputeHandler = dataNodeHandler;
        this.clusterComputeHandler = new ClusterComputeHandler(this, exchangeService, transportService, searchExecutor, dataNodeHandler);
        this.exchangeService = exchangeService;
    }

    /**
     * Executes a physical plan and reports the outcome to {@code actionListener}.
     *
     * <p>The plan is split into a coordinator part and an optional data-node part. Pages produced by
     * the coordinator part are collected in a synchronized list and handed to the {@link Result};
     * if the query fails, the blocks of every page collected so far are released before the failure
     * is forwarded.
     *
     * <ul>
     *   <li>Without a data-node part, the coordinator plan runs on its own in a child session.</li>
     *   <li>With a data-node part, an exchange source handler is registered under the session id
     *       (and unregistered before the caller is notified), the coordinator plan runs as the
     *       "final" stage reading from that exchange, and compute is started on the local data
     *       nodes and on every remote cluster.</li>
     * </ul>
     *
     * <p>Every compute listener and the empty exchange sink are managed with try-with-resources so
     * that they are closed exactly once, including when starting one of the computations throws.
     *
     * @param str               the session id; also the key of the exchange source handler
     * @param cancellableTask   the task the computations run under; cancelled on failure
     * @param physicalPlan      the plan to execute
     * @param configuration     the query configuration (pragmas are read from it)
     * @param foldContext       passed through to the compute contexts
     * @param esqlExecutionInfo per-cluster execution metadata, updated as clusters finish
     * @param actionListener    receives the {@link Result} or the failure
     */
    public void execute(String str, CancellableTask cancellableTask, PhysicalPlan physicalPlan, Configuration configuration, FoldContext foldContext, EsqlExecutionInfo esqlExecutionInfo, ActionListener<Result> actionListener) {
        final Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration);
        final List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
        // On failure, release whatever was collected before telling the caller.
        final ActionListener<Result> listener = actionListener.delegateResponse((delegate, failure) -> {
            collectedPages.forEach(page -> Releasables.closeExpectNoException(page::releaseBlocks));
            delegate.onFailure(failure);
        });
        final OutputExec coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
        final PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
        if (dataNodePlan != null && !(dataNodePlan instanceof ExchangeSinkExec)) {
            final String error = "expected data node plan starts with an ExchangeSink; got " + dataNodePlan;
            // Expanded form of `assert false : error`; the class declares its assertion flag explicitly.
            if (!$assertionsDisabled) {
                throw new AssertionError(error);
            }
            listener.onFailure(new IllegalStateException(error));
            return;
        }
        final Map<String, OriginalIndices> clusterToConcreteIndices = this.transportService.getRemoteClusterService()
            .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
        final QueryPragmas pragmas = configuration.pragmas();
        final Runnable cancelQueryOnFailure = cancelQueryOnFailure(cancellableTask);

        if (dataNodePlan == null) {
            // Coordinator-only query: no cluster may contribute concrete indices.
            if (!clusterToConcreteIndices.values().stream().allMatch(indices -> indices.indices().length == 0)) {
                final String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
                if (!$assertionsDisabled) {
                    throw new AssertionError(error);
                }
                listener.onFailure(new IllegalStateException(error));
                return;
            }
            final ComputeContext computeContext = new ComputeContext(newChildSession(str), "single", LOCAL_CLUSTER, List.of(), configuration, foldContext, null, null);
            updateShardCountForCoordinatorOnlyQuery(esqlExecutionInfo);
            try (ComputeListener computeListener = new ComputeListener(this.transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
                updateExecutionInfoAfterCoordinatorOnlyQuery(esqlExecutionInfo);
                return new Result(physicalPlan.output(), collectedPages, profiles, esqlExecutionInfo);
            }))) {
                runCompute(cancellableTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
            }
            return;
        }

        if (clusterToConcreteIndices.values().stream().allMatch(indices -> indices.indices().length == 0)) {
            final String error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
            if (!$assertionsDisabled) {
                throw new AssertionError(error);
            }
            listener.onFailure(new IllegalStateException(error));
            return;
        }
        final Map<String, OriginalIndices> clusterToOriginalIndices = this.transportService.getRemoteClusterService()
            .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan));
        // Pull the local cluster's entries out; what remains in both maps describes remote clusters.
        final OriginalIndices localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER);
        final OriginalIndices localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER);
        // Capture the output attributes now so the response lambda does not need the whole plan.
        final List<Attribute> outputAttributes = physicalPlan.output();
        final ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(pragmas.exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
        // Unregister the exchange source before the caller hears about success or failure.
        final ActionListener<Result> cleanupListener = ActionListener.runBefore(listener, () -> this.exchangeService.removeExchangeSourceHandler(str));
        this.exchangeService.addExchangeSourceHandler(str, exchangeSource);

        try (ComputeListener computeListener = new ComputeListener(this.transportService.getThreadPool(), cancelQueryOnFailure, cleanupListener.map(profiles -> {
            esqlExecutionInfo.markEndQuery();
            return new Result(outputAttributes, collectedPages, profiles, esqlExecutionInfo);
        }))) {
            // The empty sink is held open while the remote sinks are being attached below.
            try (Releasable ignored = exchangeSource.addEmptySink()) {
                final AtomicBoolean localClusterWasInterrupted = new AtomicBoolean();
                // Listener for everything that runs on the local cluster; records its final status and took time.
                try (ComputeListener localListener = new ComputeListener(this.transportService.getThreadPool(), cancelQueryOnFailure, computeListener.acquireCompute().delegateFailure((delegate, profiles) -> {
                    if (esqlExecutionInfo.isCrossClusterSearch() && esqlExecutionInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
                        final TimeValue tookTime = TimeValue.timeValueNanos(System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos());
                        final EsqlExecutionInfo.Cluster.Status status = localClusterWasInterrupted.get()
                            ? EsqlExecutionInfo.Cluster.Status.PARTIAL
                            : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
                        esqlExecutionInfo.swapCluster(LOCAL_CLUSTER, (alias, cluster) -> new EsqlExecutionInfo.Cluster.Builder(cluster).setStatus(status).setTook(tookTime).build());
                    }
                    delegate.onResponse(profiles);
                }))) {
                    // Final stage on the coordinator, fed by the exchange source.
                    runCompute(cancellableTask, new ComputeContext(str, "final", LOCAL_CLUSTER, List.of(), configuration, foldContext, exchangeSource::createExchangeSource, null), coordinatorPlan, localListener.acquireCompute());
                    // Data nodes of the local cluster, if it holds any of the concrete indices.
                    if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
                        this.dataNodeComputeHandler.startComputeOnDataNodes(str, LOCAL_CLUSTER, cancellableTask, configuration, dataNodePlan, Set.of(localConcreteIndices.indices()), localOriginalIndices, exchangeSource, cancelQueryOnFailure, localListener.acquireCompute().map(localResponse -> {
                            localClusterWasInterrupted.set(esqlExecutionInfo.isPartial());
                            if (esqlExecutionInfo.isCrossClusterSearch() && esqlExecutionInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
                                esqlExecutionInfo.swapCluster(LOCAL_CLUSTER, (alias, cluster) -> new EsqlExecutionInfo.Cluster.Builder(cluster)
                                    .setTotalShards(localResponse.getTotalShards())
                                    .setSuccessfulShards(localResponse.getSuccessfulShards())
                                    .setSkippedShards(localResponse.getSkippedShards())
                                    .setFailedShards(localResponse.getFailedShards())
                                    .build());
                            }
                            return localResponse.getProfiles();
                        }));
                    }
                }
                // Remote clusters.
                for (ClusterComputeHandler.RemoteCluster remoteCluster : this.clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices)) {
                    this.clusterComputeHandler.startComputeOnRemoteCluster(str, cancellableTask, configuration, dataNodePlan, exchangeSource, remoteCluster, cancelQueryOnFailure, computeListener.acquireCompute().map(remoteResponse -> {
                        updateExecutionInfo(esqlExecutionInfo, remoteCluster.clusterAlias(), remoteResponse);
                        return remoteResponse.getProfiles();
                    }));
                }
            }
        }
    }

    /**
     * Records the outcome of one cluster's compute response in the execution info.
     *
     * <p>A cluster still marked {@code RUNNING} becomes {@code PARTIAL} or {@code SUCCESSFUL}
     * depending on {@code esqlExecutionInfo.isPartial()}; any other status is left alone. When the
     * response carries a took time, it is added to the planning time and the shard counts are
     * copied over. Otherwise the took time is measured from the query's relative start and the
     * shard counts are not touched.
     *
     * @param esqlExecutionInfo the execution metadata to update
     * @param str               alias of the cluster the response came from
     * @param computeResponse   the response received from that cluster
     */
    private void updateExecutionInfo(EsqlExecutionInfo esqlExecutionInfo, String str, ComputeResponse computeResponse) {
        // Evaluated lazily, inside the swap callback, so isPartial() is read at swap time.
        final Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> settleRunning = current -> {
            if (current != EsqlExecutionInfo.Cluster.Status.RUNNING) {
                return current;
            }
            if (esqlExecutionInfo.isPartial()) {
                return EsqlExecutionInfo.Cluster.Status.PARTIAL;
            }
            return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
        };

        if (computeResponse.getTook() == null) {
            // No took time in the response: measure on this side and leave shard counts as they are.
            final long elapsedNanos = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos().longValue();
            final TimeValue measuredTook = TimeValue.timeValueNanos(elapsedNanos);
            esqlExecutionInfo.swapCluster(str, (alias, cluster) ->
                new EsqlExecutionInfo.Cluster.Builder(cluster)
                    .setStatus(settleRunning.apply(cluster.getStatus()))
                    .setTook(measuredTook)
                    .build());
            return;
        }

        final long totalNanos = esqlExecutionInfo.planningTookTime().nanos() + computeResponse.getTook().nanos();
        final TimeValue reportedTook = TimeValue.timeValueNanos(totalNanos);
        esqlExecutionInfo.swapCluster(str, (alias, cluster) ->
            new EsqlExecutionInfo.Cluster.Builder(cluster)
                .setStatus(settleRunning.apply(cluster.getStatus()))
                .setTook(reportedTook)
                .setTotalShards(computeResponse.getTotalShards())
                .setSuccessfulShards(computeResponse.getSuccessfulShards())
                .setSkippedShards(computeResponse.getSkippedShards())
                .setFailedShards(computeResponse.getFailedShards())
                .build());
    }

    /**
     * For a cross-cluster search, sets the total, successful, skipped and failed shard counts of
     * every known cluster to zero. Does nothing for a non-cross-cluster query.
     *
     * @param esqlExecutionInfo the execution metadata to update
     */
    private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo esqlExecutionInfo) {
        if (!esqlExecutionInfo.isCrossClusterSearch()) {
            return;
        }
        for (String clusterAlias : esqlExecutionInfo.clusterAliases()) {
            esqlExecutionInfo.swapCluster(clusterAlias, (alias, cluster) ->
                new EsqlExecutionInfo.Cluster.Builder(cluster)
                    .setTotalShards(0)
                    .setSuccessfulShards(0)
                    .setSkippedShards(0)
                    .setFailedShards(0)
                    .build());
        }
    }

    /**
     * Marks the end of the query and, for a cross-cluster search, stamps every cluster with the
     * overall took time, promoting clusters that are still {@code RUNNING} to {@code SUCCESSFUL}.
     *
     * <p>With assertions enabled, a missing planning took time on a cross-cluster search raises an
     * {@link AssertionError}.
     *
     * @param esqlExecutionInfo the execution metadata to update
     */
    private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo esqlExecutionInfo) {
        esqlExecutionInfo.markEndQuery();
        if (!esqlExecutionInfo.isCrossClusterSearch()) {
            return;
        }
        // Short-circuit keeps planningTookTime() from being read when assertions are off.
        if (!$assertionsDisabled && esqlExecutionInfo.planningTookTime() == null) {
            throw new AssertionError("Planning took time should be set on EsqlExecutionInfo but is null");
        }
        for (String clusterAlias : esqlExecutionInfo.clusterAliases()) {
            esqlExecutionInfo.swapCluster(clusterAlias, (alias, cluster) -> {
                final EsqlExecutionInfo.Cluster.Builder updated = new EsqlExecutionInfo.Cluster.Builder(cluster).setTook(esqlExecutionInfo.overallTook());
                final boolean stillRunning = cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING;
                if (stillRunning) {
                    updated.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
                }
                return updated.build();
            });
        }
    }

    /**
     * Plans {@code physicalPlan} for local execution and runs the resulting drivers.
     *
     * <p>Each search context of {@code computeContext} is wrapped in a shard context whose
     * {@link SearchExecutionContext} hands out a {@code ReinitializingSourceProvider} around the
     * default source provider. The search contexts are released before {@code actionListener} is
     * notified, and the drivers are released after it has been notified. Any exception thrown while
     * planning, creating or submitting the drivers is reported through the listener rather than
     * propagated.
     *
     * <p>On success the listener receives the driver profiles when profiling is enabled in the
     * configuration, and an empty list otherwise.
     *
     * <p>Note: the decompiler widened this method's visibility; it was originally package-private.
     *
     * @param cancellableTask the task the drivers run under
     * @param computeContext  session, cluster alias, search contexts and exchange suppliers for this run
     * @param physicalPlan    the plan to localize and execute
     * @param actionListener  receives the driver profiles or the failure
     */
    public void runCompute(CancellableTask cancellableTask, ComputeContext computeContext, PhysicalPlan physicalPlan, ActionListener<List<DriverProfile>> actionListener) {
        final ActionListener<List<DriverProfile>> listener = ActionListener.runBefore(actionListener, () -> Releasables.close(computeContext.searchContexts()));
        final List<EsPhysicalOperationProviders.ShardContext> shardContexts = new ArrayList<>(computeContext.searchContexts().size());
        for (int i = 0; i < computeContext.searchContexts().size(); i++) {
            final SearchContext searchContext = computeContext.searchContexts().get(i);
            // Copy of the shard's execution context whose source provider is wrapped so it can be re-created.
            final SearchExecutionContext searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) {
                @Override
                public SourceProvider createSourceProvider() {
                    return new ReinitializingSourceProvider(() -> super.createSourceProvider());
                }
            };
            shardContexts.add(new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter()));
        }
        try {
            final LocalExecutionPlanner planner = new LocalExecutionPlanner(
                computeContext.sessionId(),
                computeContext.clusterAlias(),
                cancellableTask,
                this.bigArrays,
                this.blockFactory,
                this.clusterService.getSettings(),
                computeContext.configuration(),
                computeContext.exchangeSourceSupplier(),
                computeContext.exchangeSinkSupplier(),
                this.enrichLookupService,
                this.lookupFromIndexService,
                new EsPhysicalOperationProviders(computeContext.foldCtx(), shardContexts, this.searchService.getIndicesService().getAnalysis()),
                shardContexts
            );
            LOGGER.debug("Received physical plan:\n{}", physicalPlan);
            final PhysicalPlan localPlan = PlannerUtils.localPlan(computeContext.searchExecutionContexts(), computeContext.configuration(), computeContext.foldCtx(), physicalPlan);
            final LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(computeContext.description(), computeContext.foldCtx(), localPlan);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
            }
            final List<Driver> drivers = localExecutionPlan.createDrivers(computeContext.sessionId());
            if (drivers.isEmpty()) {
                throw new IllegalStateException("no drivers created");
            }
            LOGGER.debug("using {} drivers", drivers.size());
            // Translate driver completion into the profile list the caller expects.
            final ActionListener<Void> profilesListener = listener.map(ignored -> {
                if (computeContext.configuration().profile()) {
                    return drivers.stream().map(Driver::profile).toList();
                }
                return List.of();
            });
            this.driverRunner.executeDrivers(
                cancellableTask,
                drivers,
                this.transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
                ActionListener.releaseAfter(profilesListener, () -> Releasables.close(drivers))
            );
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /**
     * Builds the plan that sits between an exchange source and an exchange sink when results are
     * forwarded.
     *
     * <p>The plan starts as an {@link ExchangeSourceExec} mirroring the sink's source, output and
     * intermediate-aggregation flag. When {@code z} is {@code true} and
     * {@link PlannerUtils#reductionPlan} yields a plan for the sink, that plan is placed on top of
     * the exchange source by replacing its children. The result is always wrapped in a new
     * {@link ExchangeSinkExec} carrying the same source, output and flag as the input sink.
     *
     * <p>Note: the decompiler widened this method's visibility; it was originally package-private.
     *
     * @param exchangeSinkExec the sink whose shape the returned plan mirrors
     * @param z                whether the reduction plan should be applied
     * @return a new exchange sink over either the bare exchange source or the reduction on top of it
     */
    public static PhysicalPlan reductionPlan(ExchangeSinkExec exchangeSinkExec, boolean z) {
        // Declared as PhysicalPlan: it may be reassigned to the reduction plan below.
        PhysicalPlan reducePlan = new ExchangeSourceExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg());
        if (z) {
            final PhysicalPlan reduction = PlannerUtils.reductionPlan(exchangeSinkExec);
            if (reduction != null) {
                reducePlan = reduction.replaceChildren(List.of(reducePlan));
            }
        }
        return new ExchangeSinkExec(exchangeSinkExec.source(), exchangeSinkExec.output(), exchangeSinkExec.isIntermediateAgg(), reducePlan);
    }

    /**
     * Derives a child session id of the form {@code <parent>/<n>}, where {@code n} is the next value
     * of this service's child-session counter.
     *
     * <p>Note: the decompiler widened this method's visibility; it was originally package-private.
     *
     * @param str the parent session id
     * @return the parent id, a slash, and the incremented counter value
     */
    public String newChildSession(String str) {
        final long childId = this.childSessionIdGenerator.incrementAndGet();
        return str + "/" + childId;
    }

    /**
     * Creates the callback used to cancel a query when part of it fails.
     *
     * <p>The callback logs at debug level and asks the task manager to cancel the task and its
     * descendants with the reason {@code "cancelled on failure"}. It is wrapped in a
     * {@link RunOnce} before being returned.
     *
     * <p>Note: the decompiler widened this method's visibility; it was originally package-private.
     *
     * @param cancellableTask the task to cancel
     * @return the wrapped cancellation callback
     */
    public Runnable cancelQueryOnFailure(CancellableTask cancellableTask) {
        final Runnable cancelTask = () -> {
            LOGGER.debug("cancelling ESQL task {} on failure", cancellableTask);
            this.transportService.getTaskManager().cancelTaskAndDescendants(cancellableTask, "cancelled on failure", false, ActionListener.noop());
        };
        return new RunOnce(cancelTask);
    }

    // Initializes the two blank static finals declared at the top of the class.
    static {
        LOGGER = LogManager.getLogger(ComputeService.class);
        // Explicit form of the compiler-generated assertion flag: true when assertions are off for this class.
        $assertionsDisabled = !ComputeService.class.desiredAssertionStatus();
    }
}
