package cn.feiliu.taskflow.client.automator;

import cn.feiliu.taskflow.client.ApiClient;
import cn.feiliu.taskflow.client.automator.scheduling.MultiTaskResult;
import cn.feiliu.taskflow.client.automator.scheduling.PollExecuteStatus;
import cn.feiliu.taskflow.client.telemetry.MetricsContainer;
import cn.feiliu.taskflow.common.metadata.tasks.ExecutingTask;
import cn.feiliu.taskflow.common.metadata.tasks.TaskExecResult;
import cn.feiliu.taskflow.common.metadata.tasks.TaskLog;
import cn.feiliu.taskflow.common.utils.TaskflowUtils;
import cn.feiliu.taskflow.sdk.config.PropertyFactory;
import cn.feiliu.taskflow.sdk.worker.Worker;
import com.google.common.base.Stopwatch;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.api.patterns.ThreadPoolMonitor;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/feiliu/taskflow/client/automator/TaskPollExecutor.class */
public class TaskPollExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskPollExecutor.class);
    private static final Registry REGISTRY = Spectator.globalRegistry();
    protected final EurekaClient eurekaClient;
    protected final ApiClient apiClient;
    private final int updateRetryCount;
    protected final ThreadPoolExecutor executorService;
    protected final Map<String, String> taskToDomain;
    protected static final String DOMAIN = "domain";
    protected static final String OVERRIDE_DISCOVERY = "pollOutOfDiscovery";
    protected static final String ALL_WORKERS = "all";
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, th) -> {
        MetricsContainer.incrementUncaughtExceptionCount();
        LOGGER.error("Uncaught exception. Thread {} will exit now", thread, th);
    };
    private final Map<String, PollingSemaphore> pollingSemaphoreMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskPollExecutor(EurekaClient eurekaClient, ApiClient apiClient, int i, int i2, Map<String, String> map, String str, Map<String, Integer> map2) {
        this.eurekaClient = eurekaClient;
        this.apiClient = apiClient;
        this.updateRetryCount = i2;
        this.taskToDomain = map;
        int i3 = 0;
        if (map2.isEmpty()) {
            i3 = i;
            this.pollingSemaphoreMap.put(ALL_WORKERS, new PollingSemaphore(i));
        } else {
            for (Map.Entry<String, Integer> entry : map2.entrySet()) {
                String key = entry.getKey();
                int intValue = entry.getValue().intValue();
                i3 += intValue;
                this.pollingSemaphoreMap.put(key, new PollingSemaphore(intValue));
            }
        }
        LOGGER.info("Initialized the TaskPollExecutor with {} threads", Integer.valueOf(i3));
        this.executorService = new ThreadPoolExecutor(0, i3, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new BasicThreadFactory.Builder().namingPattern(str).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build(), new ThreadPoolExecutor.CallerRunsPolicy());
        ThreadPoolMonitor.attach(REGISTRY, this.executorService, str);
    }

    public void shutdown(int i) {
        try {
            this.executorService.shutdown();
            if (this.executorService.awaitTermination(i, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn(String.format("forcing shutdown after waiting for %s second", Integer.valueOf(i)));
                this.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void doExecuteTask(Worker worker, ExecutingTask executingTask) {
        Stopwatch createStarted = Stopwatch.createStarted();
        TaskExecResult taskExecResult = null;
        try {
            try {
                LOGGER.debug("Executing taskId: {} of type: {}", executingTask.getTaskId(), executingTask.getTaskDefName());
                taskExecResult = worker.execute(executingTask);
                taskExecResult.setWorkflowInstanceId(executingTask.getWorkflowInstanceId());
                taskExecResult.setTaskId(executingTask.getTaskId());
                taskExecResult.setWorkerId(worker.getIdentity());
                createStarted.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            } catch (Throwable th) {
                LOGGER.error("Unable to execute taskId: {} of type: {} ,error:{}", new Object[]{executingTask.getTaskId(), executingTask.getTaskDefName(), th});
                if (taskExecResult == null) {
                    executingTask.setStatus(ExecutingTask.Status.FAILED);
                    taskExecResult = new TaskExecResult(executingTask);
                }
                handleException(th, taskExecResult, worker, executingTask);
                createStarted.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            updateTaskResult(this.updateRetryCount, executingTask, taskExecResult, worker);
        } catch (Throwable th2) {
            createStarted.stop();
            MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            throw th2;
        }
    }

    private void finalizeTask(ExecutingTask executingTask, Throwable th) {
        if (th == null) {
            LOGGER.debug("Task:{} of type:{} finished processing with status:{}", new Object[]{executingTask.getTaskId(), executingTask.getTaskDefName(), executingTask.getStatus()});
        } else {
            LOGGER.error("Error processing task: {} of type: {}", new Object[]{executingTask.getTaskId(), executingTask.getTaskType(), th});
            MetricsContainer.incrementTaskExecutionErrorCount(executingTask.getTaskType(), th);
        }
    }

    private void updateTaskResult(int i, ExecutingTask executingTask, TaskExecResult taskExecResult, Worker worker) {
        try {
            TaskflowUtils.retryOperation(() -> {
                if (!this.apiClient.isUseGRPC()) {
                    this.apiClient.getTaskClient().updateTask(taskExecResult);
                    return;
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(this.apiClient.getGrpcApi().asyncUpdateTask(taskExecResult));
                for (TaskLog taskLog : taskExecResult.getLogs()) {
                    if (StringUtils.isNotBlank(taskLog.getLog())) {
                        arrayList.add(this.apiClient.getGrpcApi().addLog(taskLog));
                    }
                }
                TaskflowUtils.blockedWait(arrayList, 30000);
            }, i, "updateTask");
        } catch (Exception e) {
            worker.onErrorUpdate(executingTask);
            MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            LOGGER.error("Failed to update result: {} for task: {} in worker: {}", new Object[]{taskExecResult.toString(), executingTask.getTaskDefName(), worker.getIdentity(), e});
        }
    }

    private void handleException(Throwable th, TaskExecResult taskExecResult, Worker worker, ExecutingTask executingTask) {
        LOGGER.error(String.format("Error while executing task %s", executingTask.toString()), th);
        MetricsContainer.incrementTaskExecutionErrorCount(worker.getTaskDefName(), th);
        taskExecResult.setStatus(TaskExecResult.Status.FAILED);
        taskExecResult.setReasonForIncompletion("Error while executing the task: " + th);
        taskExecResult.log(TaskflowUtils.dumpFullStackTrace(th));
        updateTaskResult(this.updateRetryCount, executingTask, taskExecResult, worker);
    }

    private PollingSemaphore getPollingSemaphore(Worker worker) {
        return (PollingSemaphore) Objects.requireNonNull(this.pollingSemaphoreMap.containsKey(worker.getTaskDefName()) ? this.pollingSemaphoreMap.get(worker.getTaskDefName()) : this.pollingSemaphoreMap.get(ALL_WORKERS), "No polling semaphore found for task type: " + worker.getTaskDefName());
    }

    private int getAvailableThreads(Worker worker) {
        return getPollingSemaphore(worker).availableThreads();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isBusy(Worker worker) {
        return getAvailableThreads(worker) <= 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isActive(Worker worker) {
        Boolean valueOf = Boolean.valueOf(PropertyFactory.getBooleanWithFallback(worker.getTaskDefName(), OVERRIDE_DISCOVERY, ALL_WORKERS, false));
        if (this.eurekaClient != null && !this.eurekaClient.getInstanceRemoteStatus().equals(InstanceInfo.InstanceStatus.UP) && !valueOf.booleanValue()) {
            LOGGER.debug("Instance is NOT UP in discovery - will not poll");
            return false;
        }
        if (!worker.paused()) {
            return true;
        }
        MetricsContainer.incrementTaskPausedCount(worker.getTaskDefName());
        LOGGER.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<MultiTaskResult> fastPollAndExecute(Worker worker) {
        return CompletableFuture.supplyAsync(() -> {
            if (!isActive(worker)) {
                return MultiTaskResult.of(PollExecuteStatus.FAIL, Collections.emptyList());
            }
            String taskDefName = worker.getTaskDefName();
            PollingSemaphore pollingSemaphore = getPollingSemaphore(worker);
            String stringWithFallback = PropertyFactory.getStringWithFallback(taskDefName, DOMAIN, ALL_WORKERS, this.taskToDomain.get(taskDefName));
            Optional<Integer> tryAcquireAvailablePermits = pollingSemaphore.tryAcquireAvailablePermits();
            if (!tryAcquireAvailablePermits.isPresent()) {
                return MultiTaskResult.of(PollExecuteStatus.NO_TASK, Collections.emptyList());
            }
            int intValue = tryAcquireAvailablePermits.get().intValue();
            try {
                List<ExecutingTask> batchTasks = getBatchTasks(worker, stringWithFallback, intValue);
                if (batchTasks.isEmpty()) {
                    pollingSemaphore.complete(intValue);
                    return MultiTaskResult.of(PollExecuteStatus.NO_TASK, Collections.emptyList());
                }
                if (intValue > batchTasks.size()) {
                    pollingSemaphore.complete(intValue - batchTasks.size());
                }
                return MultiTaskResult.of(intValue > batchTasks.size() ? PollExecuteStatus.NO_TASK : PollExecuteStatus.HAS_TASK, submitTasks(worker, batchTasks, stringWithFallback, pollingSemaphore));
            } catch (Exception e) {
                LOGGER.error("Error when polling for tasks", e);
                pollingSemaphore.complete(intValue);
                return MultiTaskResult.of(PollExecuteStatus.FAIL, Collections.emptyList());
            }
        });
    }

    private List<ExecutingTask> getBatchTasks(Worker worker, String str, int i) throws Exception {
        LOGGER.info("Polling tasks of type: {}", worker.getTaskDefName());
        String identity = worker.getIdentity();
        int reasonableTimeout = TaskflowUtils.getReasonableTimeout(worker);
        String taskDefName = worker.getTaskDefName();
        Timer batchPollTimer = MetricsContainer.getBatchPollTimer(worker.getTaskDefName());
        return this.apiClient.isUseGRPC() ? (List) batchPollTimer.record(() -> {
            return this.apiClient.getGrpcApi().batchPollTask(taskDefName, identity, str, i, reasonableTimeout);
        }) : (List) batchPollTimer.record(() -> {
            return this.apiClient.getTaskClient().batchPollTasksInDomain(taskDefName, str, identity, i, reasonableTimeout);
        });
    }

    private List<CompletableFuture<ExecutingTask>> submitTasks(Worker worker, List<ExecutingTask> list, String str, PollingSemaphore pollingSemaphore) {
        ArrayList arrayList = new ArrayList();
        String taskDefName = worker.getTaskDefName();
        for (ExecutingTask executingTask : list) {
            try {
                if (Objects.nonNull(executingTask) && StringUtils.isNotBlank(executingTask.getTaskId())) {
                    MetricsContainer.incrementTaskPollCount(taskDefName, 1);
                    LOGGER.debug("Polled task: {} of type: {}, from worker: {}", new Object[]{executingTask.getTaskId(), taskDefName, worker.getIdentity()});
                    arrayList.add(executingTask(worker, executingTask, pollingSemaphore));
                } else {
                    pollingSemaphore.complete();
                    arrayList.add(CompletableFuture.completedFuture(null));
                }
            } catch (Throwable th) {
                pollingSemaphore.complete();
                MetricsContainer.incrementTaskPollErrorCount(worker.getTaskDefName(), th);
                LOGGER.error("Error when polling for tasks", th);
                arrayList.add(CompletableFuture.failedFuture(th));
            }
        }
        return arrayList;
    }

    private CompletableFuture<ExecutingTask> executingTask(Worker worker, ExecutingTask executingTask, PollingSemaphore pollingSemaphore) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                try {
                    doExecuteTask(worker, executingTask);
                    pollingSemaphore.complete();
                } catch (Throwable th) {
                    executingTask.setStatus(ExecutingTask.Status.FAILED);
                    handleException(th, new TaskExecResult(executingTask), worker, executingTask);
                    pollingSemaphore.complete();
                }
                return executingTask;
            } catch (Throwable th2) {
                pollingSemaphore.complete();
                throw th2;
            }
        }, this.executorService).whenComplete(this::finalizeTask);
    }
}
