package org.elasticsearch.xpack.esql.plugin;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.class */
public final class ClusterComputeHandler implements TransportRequestHandler<ClusterComputeRequest> {
    private final ComputeService computeService;
    private final ExchangeService exchangeService;
    private final TransportService transportService;
    private final Executor esqlExecutor;
    private final DataNodeComputeHandler dataNodeComputeHandler;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster.class */
    public static final class RemoteCluster extends Record {
        private final String clusterAlias;
        private final Transport.Connection connection;
        private final String[] concreteIndices;
        private final OriginalIndices originalIndices;

        RemoteCluster(String str, Transport.Connection connection, String[] strArr, OriginalIndices originalIndices) {
            this.clusterAlias = str;
            this.connection = connection;
            this.concreteIndices = strArr;
            this.originalIndices = originalIndices;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RemoteCluster.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->originalIndices:Lorg/elasticsearch/action/OriginalIndices;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RemoteCluster.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->originalIndices:Lorg/elasticsearch/action/OriginalIndices;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RemoteCluster.class, Object.class), RemoteCluster.class, "clusterAlias;connection;concreteIndices;originalIndices", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->clusterAlias:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->connection:Lorg/elasticsearch/transport/Transport$Connection;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->concreteIndices:[Ljava/lang/String;", "FIELD:Lorg/elasticsearch/xpack/esql/plugin/ClusterComputeHandler$RemoteCluster;->originalIndices:Lorg/elasticsearch/action/OriginalIndices;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String clusterAlias() {
            return this.clusterAlias;
        }

        public Transport.Connection connection() {
            return this.connection;
        }

        public String[] concreteIndices() {
            return this.concreteIndices;
        }

        public OriginalIndices originalIndices() {
            return this.originalIndices;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClusterComputeHandler(ComputeService computeService, ExchangeService exchangeService, TransportService transportService, Executor executor, DataNodeComputeHandler dataNodeComputeHandler) {
        this.computeService = computeService;
        this.exchangeService = exchangeService;
        this.esqlExecutor = executor;
        this.transportService = transportService;
        this.dataNodeComputeHandler = dataNodeComputeHandler;
        transportService.registerRequestHandler(ComputeService.CLUSTER_ACTION_NAME, executor, ClusterComputeRequest::new, this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startComputeOnRemoteCluster(String str, CancellableTask cancellableTask, Configuration configuration, PhysicalPlan physicalPlan, ExchangeSourceHandler exchangeSourceHandler, RemoteCluster remoteCluster, Runnable runnable, ActionListener<ComputeResponse> actionListener) {
        QueryPragmas pragmas = configuration.pragmas();
        Releasable addEmptySink = exchangeSourceHandler.addEmptySink();
        Objects.requireNonNull(addEmptySink);
        ActionListener runBefore = ActionListener.runBefore(actionListener, addEmptySink::close);
        String newChildSession = this.computeService.newChildSession(str);
        AtomicReference atomicReference = new AtomicReference();
        ComputeListener computeListener = new ComputeListener(this.transportService.getThreadPool(), runnable, runBefore.map(list -> {
            return (ComputeResponse) Objects.requireNonNullElseGet((ComputeResponse) atomicReference.get(), () -> {
                return new ComputeResponse((List<DriverProfile>) list);
            });
        }));
        try {
            ExchangeService.openExchange(this.transportService, remoteCluster.connection, newChildSession, pragmas.exchangeBufferSize(), this.esqlExecutor, computeListener.acquireCompute().delegateFailureAndWrap((actionListener2, r23) -> {
                exchangeSourceHandler.addRemoteSink(this.exchangeService.newRemoteSink(cancellableTask, newChildSession, this.transportService, remoteCluster.connection), true, () -> {
                }, pragmas.concurrentExchangeClients(), computeListener.acquireAvoid());
                this.transportService.sendChildRequest(remoteCluster.connection, ComputeService.CLUSTER_ACTION_NAME, new ClusterComputeRequest(remoteCluster.clusterAlias, newChildSession, configuration, new RemoteClusterPlan(physicalPlan, remoteCluster.concreteIndices, remoteCluster.originalIndices)), cancellableTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler(actionListener2.map(computeResponse -> {
                    atomicReference.set(computeResponse);
                    return computeResponse.getProfiles();
                }), ComputeResponse::new, this.esqlExecutor));
            }));
            computeListener.close();
        } catch (Throwable th) {
            try {
                computeListener.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<RemoteCluster> getRemoteClusters(Map<String, OriginalIndices> map, Map<String, OriginalIndices> map2) {
        ArrayList arrayList = new ArrayList(map.size());
        RemoteClusterService remoteClusterService = this.transportService.getRemoteClusterService();
        Iterator<Map.Entry<String, OriginalIndices>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            OriginalIndices originalIndices = map.get(key);
            OriginalIndices originalIndices2 = map2.get(key);
            if (originalIndices2 == null) {
                if ($assertionsDisabled) {
                    throw new IllegalStateException("can't find original indices for cluster " + key);
                }
                throw new AssertionError("can't find original indices for cluster " + key);
            }
            if (originalIndices.indices().length > 0) {
                arrayList.add(new RemoteCluster(key, remoteClusterService.getConnection(key), originalIndices.indices(), originalIndices2));
            }
        }
        return arrayList;
    }

    public void messageReceived(ClusterComputeRequest clusterComputeRequest, TransportChannel transportChannel, Task task) {
        ChannelActionListener channelActionListener = new ChannelActionListener(transportChannel);
        RemoteClusterPlan remoteClusterPlan = clusterComputeRequest.remoteClusterPlan();
        PhysicalPlan plan = remoteClusterPlan.plan();
        if (plan instanceof ExchangeSinkExec) {
            runComputeOnRemoteCluster(clusterComputeRequest.clusterAlias(), clusterComputeRequest.sessionId(), (CancellableTask) task, clusterComputeRequest.configuration(), (ExchangeSinkExec) plan, Set.of((Object[]) remoteClusterPlan.targetIndices()), remoteClusterPlan.originalIndices(), channelActionListener);
        } else {
            channelActionListener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + String.valueOf(plan)));
        }
    }

    void runComputeOnRemoteCluster(String str, String str2, CancellableTask cancellableTask, Configuration configuration, ExchangeSinkExec exchangeSinkExec, Set<String> set, OriginalIndices originalIndices, ActionListener<ComputeResponse> actionListener) {
        ExchangeSinkHandler sinkHandler = this.exchangeService.getSinkHandler(str2);
        cancellableTask.addListener(() -> {
            this.exchangeService.finishSinkHandler(str2, new TaskCancelledException(cancellableTask.getReasonCancelled()));
        });
        String str3 = str + ":" + str2;
        PhysicalPlan reductionPlan = ComputeService.reductionPlan(exchangeSinkExec, true);
        AtomicReference atomicReference = new AtomicReference();
        long nanoTime = System.nanoTime();
        Runnable cancelQueryOnFailure = this.computeService.cancelQueryOnFailure(cancellableTask);
        ComputeListener computeListener = new ComputeListener(this.transportService.getThreadPool(), cancelQueryOnFailure, actionListener.map(list -> {
            TimeValue timeValueNanos = TimeValue.timeValueNanos(System.nanoTime() - nanoTime);
            ComputeResponse computeResponse = (ComputeResponse) atomicReference.get();
            return new ComputeResponse(list, timeValueNanos, Integer.valueOf(computeResponse.totalShards), Integer.valueOf(computeResponse.successfulShards), Integer.valueOf(computeResponse.skippedShards), Integer.valueOf(computeResponse.failedShards));
        }));
        try {
            ExchangeSourceHandler exchangeSourceHandler = new ExchangeSourceHandler(configuration.pragmas().exchangeBufferSize(), this.transportService.getThreadPool().executor("search"));
            Releasable addEmptySink = exchangeSourceHandler.addEmptySink();
            try {
                sinkHandler.addCompletionListener(computeListener.acquireAvoid());
                ComputeService computeService = this.computeService;
                List of = List.of();
                FoldContext newFoldContext = configuration.newFoldContext();
                Objects.requireNonNull(exchangeSourceHandler);
                computeService.runCompute(cancellableTask, new ComputeContext(str3, "remote_reduce", str, of, configuration, newFoldContext, exchangeSourceHandler::createExchangeSource, () -> {
                    return sinkHandler.createExchangeSink(() -> {
                    });
                }), reductionPlan, computeListener.acquireCompute());
                this.dataNodeComputeHandler.startComputeOnDataNodes(str3, str, cancellableTask, configuration, exchangeSinkExec, set, originalIndices, exchangeSourceHandler, cancelQueryOnFailure, computeListener.acquireCompute().map(computeResponse -> {
                    atomicReference.set(computeResponse);
                    return computeResponse.getProfiles();
                }));
                if (addEmptySink != null) {
                    addEmptySink.close();
                }
                computeListener.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                computeListener.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    static {
        $assertionsDisabled = !ClusterComputeHandler.class.desiredAssertionStatus();
    }
}
