package org.elasticsearch.xpack.esql.plugin;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncSearchSecurity;
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;

/* loaded from: input_file:org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.class */
/**
 * Transport action behind {@link EsqlAsyncStopAction}.
 *
 * <p>The request carries an encoded {@link AsyncExecutionId}. The action first routes the request to the
 * node named in that id (or handles it locally when that node is the local node or is not present in the
 * cluster state). On the handling node it asks the {@link ExchangeService} to finish the query's session
 * early and then delegates to {@link TransportEsqlAsyncGetResultsAction} to produce the response.
 */
public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncStopRequest, EsqlQueryResponse> {
    private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);

    // Literal values passed to AsyncSearchSecurity, kept exactly as they were inlined in the constructor.
    // NOTE(review): presumably the async results index name and the client origin; these look like they
    // duplicate shared x-pack constants - confirm and reference those instead if so.
    private static final String ASYNC_RESULTS_INDEX = ".async-search";
    private static final String ASYNC_ORIGIN = "async_search";

    // Injected and stored, but not read anywhere inside this class.
    private final TransportEsqlQueryAction queryAction;
    private final TransportEsqlAsyncGetResultsAction getResultsAction;
    private final ExchangeService exchangeService;
    private final BlockFactory blockFactory;
    private final ClusterService clusterService;
    private final TransportService transportService;
    private final AsyncSearchSecurity security;

    /**
     * Creates the action and registers it under {@link EsqlAsyncStopAction#NAME} on the direct executor.
     *
     * @param transportService                    used for registration and for forwarding requests to other nodes
     * @param clusterService                      source of cluster state, local node identity and settings
     * @param actionFilters                       filters handed to the base transport action
     * @param transportEsqlQueryAction            the ESQL query action (stored only)
     * @param transportEsqlAsyncGetResultsAction  action that produces the response returned by a stop request
     * @param client                              client used to build the {@link AsyncSearchSecurity} helper
     * @param exchangeService                     service asked to finish the query's session early
     * @param blockFactory                        factory supplied to the {@link EsqlQueryResponse} reader for forwarded responses
     */
    @Inject
    public TransportEsqlAsyncStopAction(
        TransportService transportService,
        ClusterService clusterService,
        ActionFilters actionFilters,
        TransportEsqlQueryAction transportEsqlQueryAction,
        TransportEsqlAsyncGetResultsAction transportEsqlAsyncGetResultsAction,
        Client client,
        ExchangeService exchangeService,
        BlockFactory blockFactory
    ) {
        super(EsqlAsyncStopAction.NAME, transportService, actionFilters, AsyncStopRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
        this.queryAction = transportEsqlQueryAction;
        this.getResultsAction = transportEsqlAsyncGetResultsAction;
        this.exchangeService = exchangeService;
        this.blockFactory = blockFactory;
        this.transportService = transportService;
        this.clusterService = clusterService;
        SecurityContext securityContext = new SecurityContext(clusterService.getSettings(), client.threadPool().getThreadContext());
        this.security = new AsyncSearchSecurity(ASYNC_RESULTS_INDEX, securityContext, client, ASYNC_ORIGIN);
    }

    /**
     * Routes the stop request to the node encoded in the async execution id.
     *
     * <p>The request is handled on this node when the id names the local node, or when the named node is
     * not found in the current cluster state. Otherwise the same request is re-sent to the named node under
     * the same action name and that node's response is relayed to {@code actionListener}.
     *
     * @param task             the task created for this stop request
     * @param asyncStopRequest the request holding the encoded async execution id
     * @param actionListener   receives the query response or the failure
     */
    @Override
    protected void doExecute(Task task, AsyncStopRequest asyncStopRequest, ActionListener<EsqlQueryResponse> actionListener) {
        AsyncExecutionId asyncId = AsyncExecutionId.decode(asyncStopRequest.getId());
        String owningNodeId = asyncId.getTaskId().getNodeId();
        DiscoveryNode owningNode = clusterService.state().nodes().get(owningNodeId);
        boolean ownedLocally = clusterService.localNode().getId().equals(owningNodeId);
        if (ownedLocally || owningNode == null) {
            // Either this node is the one named in the id, or that node is unknown to the cluster state;
            // in both cases there is nowhere to forward to, so resolve the request here.
            stopQueryAndReturnResult(task, asyncId, actionListener);
            return;
        }
        transportService.sendRequest(
            owningNode,
            EsqlAsyncStopAction.NAME,
            asyncStopRequest,
            new ActionListenerResponseHandler<>(actionListener, EsqlQueryResponse.reader(blockFactory), EsExecutors.DIRECT_EXECUTOR_SERVICE)
        );
    }

    /**
     * Builds the session id string for the given async execution: a {@link TaskId} made of the local node id
     * and the numeric task id taken from {@code asyncExecutionId}, rendered with {@code toString()}.
     * NOTE(review): presumably this must match the id under which the query registered its exchange
     * session - confirm against the query action.
     */
    private String sessionID(AsyncExecutionId asyncExecutionId) {
        String localNodeId = clusterService.localNode().getId();
        return new TaskId(localNodeId, asyncExecutionId.getTaskId().getId()).toString();
    }

    /**
     * Stops the query identified by {@code asyncExecutionId} on this node and answers with the output of
     * the get-results action.
     *
     * <p>When no matching task is found, the request is passed straight to the get-results action.
     * Otherwise the task's execution info (if any) is marked as partial, the exchange service is asked to
     * finish the session early, and once that call completes the results are fetched - either via a
     * completion listener on the task or immediately if the listener could not be added.
     */
    private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncExecutionId, ActionListener<EsqlQueryResponse> actionListener) {
        String encodedId = asyncExecutionId.getEncoded();
        EsqlQueryTask queryTask = getEsqlQueryTask(asyncExecutionId);
        GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(encodedId);
        if (queryTask == null) {
            // Nothing to stop on this node; let the regular get-results path decide what to return.
            logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", encodedId);
            getResultsAction.execute(task, getAsyncResultRequest, actionListener);
            return;
        }
        logger.debug("Async stop for task {} - stopping", encodedId);
        EsqlExecutionInfo executionInfo = queryTask.executionInfo();
        if (executionInfo != null) {
            executionInfo.markAsPartial();
        }
        Runnable fetchResults = () -> getResultsAction.execute(task, getAsyncResultRequest, actionListener);
        exchangeService.finishSessionEarly(sessionID(asyncExecutionId), ActionListener.running(() -> {
            // A true return means the listener was accepted and is left to trigger the fetch; otherwise
            // fetch right away. NOTE(review): presumably false means the task has already completed - confirm.
            boolean listenerAdded = queryTask.addCompletionListener(() -> ActionListener.running(fetchResults));
            if (listenerAdded == false) {
                fetchResults.run();
            }
        }));
    }

    /**
     * Looks up the {@link EsqlQueryTask} for the given id through
     * {@link AsyncTaskIndexService#getTaskAndCheckAuthentication}, using this action's task manager and
     * security helper.
     *
     * @return the value returned by the lookup, or {@code null} when the lookup throws an {@link IOException}
     */
    private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncExecutionId) {
        try {
            return AsyncTaskIndexService.getTaskAndCheckAuthentication(taskManager, security, asyncExecutionId, EsqlQueryTask.class);
        } catch (IOException e) {
            // Best effort: a failed lookup is treated like an absent task so the caller falls back to the
            // get-results action, but the cause is kept visible at debug level instead of being dropped.
            logger.debug(() -> "Async stop for task " + asyncExecutionId.getEncoded() + " - task lookup failed, treating task as absent", e);
            return null;
        }
    }
}
