package com.alibaba.tesla.dag.services;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.tesla.dag.common.Tools;
import com.alibaba.tesla.dag.constant.DagConstant;
import com.alibaba.tesla.dag.local.AbstractLocalNodeBase;
import com.alibaba.tesla.dag.local.ClassService;
import com.alibaba.tesla.dag.local.LocalTaskDO;
import com.alibaba.tesla.dag.model.domain.dagnode.DagInstNodeRunRet;
import com.alibaba.tesla.dag.model.domain.dagnode.DagNodeDetailLocal;
import com.alibaba.tesla.dag.repository.dao.DagInstDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeStdDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeStdDO;
import com.alibaba.tesla.dag.repository.domain.DagNodeDO;
import com.alibaba.tesla.dag.schedule.task.TaskStatus;
import com.alibaba.tesla.dag.util.DateUtil;
import com.google.common.base.Throwables;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Runs "local" DAG node tasks on an in-process thread pool.
 *
 * <p>Each submitted task is tracked in two maps keyed by local task id: one holding the
 * {@link Future} of the running task and one holding the {@link AbstractLocalNodeBase}
 * instance that executes it. Two periodic jobs are started from {@link #initScheduled()}:
 * a {@link TimeoutHandler} that cancels tasks whose run time exceeded the node's configured
 * timeout, and a {@link ReBuilder} that re-submits tasks reported by
 * {@code DagInstNodeStdDAO#listTimeOut} which this process is not currently tracking.
 *
 * <p>The tracking maps are touched by pool workers, by the scheduler threads and by callers of
 * {@link #doLocalTask(LocalTaskDO)}, so they are concurrent maps. Local task ids must be
 * non-null.
 */
@Service
public class LocalTaskService {
    private static final Logger log = LoggerFactory.getLogger(LocalTaskService.class);

    @Autowired
    private DagInstNodeStdDAO dagInstNodeStdDAO;

    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;

    @Autowired
    private DagInstDAO dagInstDAO;

    @Autowired
    private ClassService classService;

    @Autowired
    private DagInstNodeNewService dagInstNodeNewService;

    /** Fixed size of the worker pool that executes local tasks. */
    @Value("${dag.local.thread-pool-size:100}")
    private int localThreadPoolSize;

    /** Worker pool; created in {@link #initScheduled()} once the configured size is injected. */
    private ThreadPoolExecutor localTaskThreadPoolExecutor;

    /** Runs the periodic {@link TimeoutHandler} and {@link ReBuilder} jobs. */
    private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(5);

    // NOTE(review): DagConstant.SECOND is used purely as an initial capacity here; the constant
    // name is presumably a decompiler constant-folding artifact - confirm against the original.
    /** Local task id -> future of the task currently queued or running in this process. */
    private Map<Long, Future<?>> localTaskId2FutureMap = new ConcurrentHashMap<>(DagConstant.SECOND);

    /** Local task id -> node instance executing that task (read by the timeout check). */
    private Map<Long, AbstractLocalNodeBase> localTaskId2NodeMap = new ConcurrentHashMap<>(DagConstant.SECOND);

    /**
     * Executes one local task: marks it RUNNING, runs the node, persists the outcome and
     * notifies {@code dagInstNodeNewService.jobCallBack}.
     */
    public class LocalTask implements Runnable {
        private LocalTaskDO localTaskDO;

        public LocalTask(LocalTaskDO localTaskDO) {
            this.localTaskDO = localTaskDO;
        }

        /**
         * Runs the task. Any {@link Exception} is recorded as status EXCEPTION (with the stack
         * trace as detail) and reported through the job callback. The task is always removed
         * from the tracking maps when this method exits, whatever the outcome.
         */
        @Override
        public void run() {
            log.info(">>>localTaskService|localTask run enter|localTaskDO={}", this.localTaskDO);
            Long dagInstId = this.localTaskDO.getDagInstId();
            String nodeId = this.localTaskDO.getNodeId();
            Long taskId = this.localTaskDO.getTaskId();
            try {
                DagInstNodeStdDO stdRecord = DagInstNodeStdDO.builder().id(taskId).build();
                DagInstNodeDO dagInstNode = LocalTaskService.this.dagInstNodeDAO.getDagInstNode(dagInstId, nodeId);
                AbstractLocalNodeBase node = LocalTaskService.this.newInstance(taskId, dagInstNode);
                // The timeout check measures elapsed time from this timestamp.
                node.actRunTimestamp = DateUtil.currentSeconds();

                stdRecord.setStatus(TaskStatus.RUNNING.toString());
                stdRecord.setIp(Tools.localIp);
                stdRecord.setInitGlobalParams(JSONObject.toJSONString(node.globalParams));
                LocalTaskService.this.dagInstNodeStdDAO.update(stdRecord);
                log.info(">>>localTask|run|enter|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, taskId);

                DagInstNodeRunRet result = node.run();

                stdRecord.setStatus(TaskStatus.SUCCESS.name());
                stdRecord.setStdout(JSONObject.toJSONString(result));
                stdRecord.setGlobalParams(node.isDeleteParams.booleanValue()
                    ? DagConstant.FAAS_DELETE_OPERATION
                    : JSONObject.toJSONString(node.globalParams));
                LocalTaskService.this.dagInstNodeStdDAO.update(stdRecord);
                log.info(">>>localTask|run|exit|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, taskId);
                LocalTaskService.this.dagInstNodeNewService.jobCallBack(dagInstId, nodeId, taskId, TaskStatus.SUCCESS, "");
            } catch (Exception e) {
                log.error(">>>localTask|run|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                    dagInstId, nodeId, taskId, e.toString(), e);
                LocalTaskService.this.dagInstNodeStdDAO.updateStatusWithDetail(
                    taskId, TaskStatus.EXCEPTION, Throwables.getStackTraceAsString(e));
                try {
                    LocalTaskService.this.dagInstNodeNewService.jobCallBack(
                        dagInstId, nodeId, taskId, TaskStatus.EXCEPTION, Throwables.getStackTraceAsString(e));
                } catch (Exception callbackError) {
                    // Best effort: the failure is already persisted above, so only log.
                    log.error(">>>localTask|jobCallBack|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                        dagInstId, nodeId, taskId, callbackError.toString(), callbackError);
                }
            } finally {
                LocalTaskService.this.releaseTask(taskId);
            }
        }
    }

    /**
     * Periodic job that re-submits tasks returned by {@code listTimeOut} which are not tracked
     * in this process and are still in an unfinished status.
     */
    private class ReBuilder implements Runnable {
        private ReBuilder() {
        }

        /**
         * Scans one time window and re-submits untracked tasks. Exceptions are logged and
         * swallowed: a task that throws from {@code scheduleAtFixedRate} is never run again,
         * which would silently disable rebuilding for the lifetime of the process.
         */
        @Override
        public void run() {
            try {
                // Window passed to listTimeOut: from (now - 30 - 60 * DateUtil.MINUTE) up to (now - 30).
                long windowEnd = DateUtil.currentSeconds() - 30;
                long windowStart = windowEnd - (60 * DateUtil.MINUTE);
                List<DagInstNodeStdDO> candidates =
                    LocalTaskService.this.dagInstNodeStdDAO.listTimeOut(windowStart, windowEnd);
                if (CollectionUtils.isEmpty(candidates)) {
                    return;
                }
                for (DagInstNodeStdDO candidate : candidates) {
                    Long id = candidate.getId();
                    if (id == null || LocalTaskService.this.localTaskId2FutureMap.containsKey(id)) {
                        // Unusable id, or the task is already queued/running here.
                        continue;
                    }
                    DagInstNodeDO dagInstNode = LocalTaskService.this.queryByLocalTaskId(id);
                    if (dagInstNode == null) {
                        continue;
                    }
                    Long dagInstId = dagInstNode.getDagInstId();
                    String nodeId = dagInstNode.getNodeId();
                    log.info(">>>reBuilder|task is rebuild|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, id);
                    LocalTaskService.this.doLocalTask(
                        LocalTaskDO.builder().dagInstId(dagInstId).nodeId(nodeId).taskId(id).build());
                }
            } catch (Exception e) {
                log.error(">>>reBuilder|scheduleAtFixedRate|Err={}", e.toString(), e);
            }
        }
    }

    /** Periodic job that cancels tracked tasks whose run time exceeded their timeout. */
    private class TimeoutHandler implements Runnable {
        private TimeoutHandler() {
        }

        /**
         * Marks every timed-out task as EXCEPTION, interrupts it and stops tracking it.
         * Exceptions are logged and swallowed so the periodic schedule keeps running.
         */
        @Override
        public void run() {
            try {
                // ConcurrentHashMap key views tolerate removal while being iterated.
                for (Long taskId : LocalTaskService.this.localTaskId2FutureMap.keySet()) {
                    if (!LocalTaskService.this.isTimeout(taskId)) {
                        continue;
                    }
                    log.info(">>>timeoutHandler|task is timeout|localTaskId={}", taskId);
                    LocalTaskService.this.dagInstNodeStdDAO.updateStatusWithDetail(
                        taskId, TaskStatus.EXCEPTION, "timeout, killed by system");
                    Future<?> future = LocalTaskService.this.localTaskId2FutureMap.remove(taskId);
                    if (future != null) {
                        future.cancel(true);
                    }
                    LocalTaskService.this.localTaskId2NodeMap.remove(taskId);
                }
            } catch (Exception e) {
                log.error(">>>timeoutHandler|scheduleAtFixedRate|Err={}", e.toString(), e);
            }
        }
    }

    /**
     * Creates the worker pool with the configured size and starts the periodic jobs: the
     * timeout handler every 5 seconds after a 60 second delay, the re-builder every 10 seconds
     * after a 10 second delay.
     */
    @PostConstruct
    public void initScheduled() {
        log.info(">>>localTaskService|initScheduled|enter|localThreadPoolSize={}", this.localThreadPoolSize);
        this.localTaskThreadPoolExecutor = new ThreadPoolExecutor(
            this.localThreadPoolSize, this.localThreadPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.scheduledService.scheduleAtFixedRate(new TimeoutHandler(), 60L, 5L, TimeUnit.SECONDS);
        this.scheduledService.scheduleAtFixedRate(new ReBuilder(), 10L, 10L, TimeUnit.SECONDS);
    }

    /**
     * @return the worker pool executing local tasks; {@code null} before {@link #initScheduled()} ran
     */
    public ThreadPoolExecutor getLocalTaskThreadPoolExecutor() {
        return this.localTaskThreadPoolExecutor;
    }

    /**
     * Looks up the DAG instance node behind a local task, but only while the task's persisted
     * status is one of {@code DagInstNodeStdDAO.UN_END_STATUS_LIST}.
     *
     * @param l local task id
     * @return the owning node, or {@code null} when the task record does not exist or its
     *         status is not in the unfinished list
     */
    public DagInstNodeDO queryByLocalTaskId(Long l) {
        DagInstNodeStdDO stdRecord = this.dagInstNodeStdDAO.getDagInstNodeStdById(l);
        if (stdRecord == null) {
            log.warn(">>>localTaskService|run|local task does not exist|localTaskId={}", l);
            return null;
        }
        String status = stdRecord.getStatus();
        if (!DagInstNodeStdDAO.UN_END_STATUS_LIST.contains(status)) {
            log.warn(">>>localTaskService|queryByLocalTaskId|double check|dagInstId={}, nodeId={}, localTaskId={}, currentStatus={}",
                stdRecord.getDagInstId(), stdRecord.getDagInstNodeId(), l, status);
            return null;
        }
        return this.dagInstNodeDAO.getDagInstNode(stdRecord.getDagInstNodeId());
    }

    /**
     * Tells whether a tracked task has been running longer than its node's timeout.
     *
     * @param l local task id
     * @return {@code true} only when a node instance is tracked for the id, its
     *         {@code realRunTimeout} is non-zero and the time elapsed since
     *         {@code actRunTimestamp} is strictly greater than that timeout
     */
    public boolean isTimeout(Long l) {
        if (l == null) {
            return false;
        }
        AbstractLocalNodeBase node = this.localTaskId2NodeMap.get(l);
        if (node == null || node.realRunTimeout == 0) {
            // Untracked task, or a timeout of 0 which disables the check.
            return false;
        }
        return DateUtil.currentSeconds() - node.actRunTimestamp > node.realRunTimeout;
    }

    /**
     * Queues a local task on the worker pool unless a task with the same id is already tracked.
     *
     * <p>The future is registered before the task is handed to the pool, so a task that finishes
     * immediately cannot clean up before its own registration and leave a stale entry behind.
     * Failures while queuing are logged, persisted as status EXCEPTION and reported through
     * the job callback; nothing is thrown unless persisting the failure itself fails.
     *
     * @param localTaskDO identifies the DAG instance, node and local task to run
     */
    public void doLocalTask(LocalTaskDO localTaskDO) {
        Long dagInstId = localTaskDO.getDagInstId();
        String nodeId = localTaskDO.getNodeId();
        Long taskId = localTaskDO.getTaskId();
        FutureTask<Void> task = new FutureTask<>(new LocalTask(localTaskDO), null);
        boolean registered = false;
        try {
            if (this.localTaskId2FutureMap.putIfAbsent(taskId, task) != null) {
                log.warn(">>>localTaskService|doLocalTask|task is exist|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, taskId);
                return;
            }
            registered = true;
            this.localTaskThreadPoolExecutor.execute(task);
        } catch (Throwable th) {
            if (registered) {
                // The task never reached the pool; drop the registration so it can be retried.
                this.localTaskId2FutureMap.remove(taskId, task);
            }
            log.error(">>>localTaskService|doLocalTask|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                dagInstId, nodeId, taskId, th.toString(), th);
            this.dagInstNodeStdDAO.updateStatusWithDetail(taskId, TaskStatus.EXCEPTION, Throwables.getStackTraceAsString(th));
            try {
                this.dagInstNodeNewService.jobCallBack(dagInstId, nodeId, taskId, TaskStatus.EXCEPTION, Throwables.getStackTraceAsString(th));
            } catch (Exception e) {
                log.error(">>>localTaskService|jobCallBack|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                    dagInstId, nodeId, taskId, e.toString(), e);
            }
        }
    }

    /**
     * Returns the node instance tracked for a local task, creating and registering it on first use.
     *
     * <p>A new instance is built reflectively from the class registered in
     * {@code classService.nodeMap} under the DAG node's name (public no-arg constructor) and is
     * populated from the DAG instance and the DAG instance node.
     *
     * @param l local task id; must not be {@code null}
     * @param dagInstNodeDO DAG instance node the task belongs to
     * @return the tracked or newly created node instance
     * @throws IllegalStateException when no class is registered for the node's name
     * @throws Exception when the lookup or the reflective instantiation fails
     */
    public AbstractLocalNodeBase newInstance(Long l, DagInstNodeDO dagInstNodeDO) throws Exception {
        AbstractLocalNodeBase tracked = this.localTaskId2NodeMap.get(l);
        if (tracked != null) {
            return tracked;
        }
        DagInstDO dagInst = this.dagInstDAO.getDagInstById(dagInstNodeDO.getDagInstId());
        DagNodeDO dagNode = dagInstNodeDO.fetchDagNode();
        String name = ((DagNodeDetailLocal) dagNode.fetchDetailInterface()).getName();
        JSONObject inputParams = AbstractActionNewService.inputParams(dagInst, dagInstNodeDO);
        Class<?> nodeClass = this.classService.nodeMap.get(name);
        if (nodeClass == null) {
            throw new IllegalStateException("no local node class registered for dag node name: " + name);
        }
        AbstractLocalNodeBase node = (AbstractLocalNodeBase) nodeClass.getConstructor().newInstance();
        log.info(">>>localTaskService|newInstance|dagInstId={}, taskId={}, dagNodeName={}, nodeClass={}",
            dagInst.getId(), l, name, nodeClass);
        node.dagInstId = dagInst.getId();
        node.dagInstNodeId = dagInstNodeDO.getId();
        node.params = inputParams;
        node.globalParams = dagInst.fetchGlobalParamsJson();
        node.lastGlobalVariableTimestamp = Long.valueOf(DateUtil.currentSeconds());
        node.globalVariable = dagInst.fetchGlobalVariableJson();
        node.globalResult = dagInst.fetchGlobalResultJson();
        node.nodeId = dagInstNodeDO.getNodeId();
        node.fatherNodeId = dagInst.fetchRelationNodeId();
        // Missing retry count / timeout are treated as 0; a timeout of 0 disables isTimeout.
        node.retryTimes = Objects.isNull(dagInstNodeDO.getRetryTimes()) ? 0L : dagInstNodeDO.getRetryTimes().longValue();
        node.instanceCreateTimestamp = DateUtil.currentSeconds();
        node.realRunTimeout = Objects.isNull(dagNode.getRunTimeout()) ? 0L : dagNode.getRunTimeout().longValue();
        this.localTaskId2NodeMap.put(l, node);
        return node;
    }

    /** Stops tracking a task in both maps; a {@code null} id is ignored. */
    private void releaseTask(Long taskId) {
        if (taskId == null) {
            return;
        }
        this.localTaskId2FutureMap.remove(taskId);
        this.localTaskId2NodeMap.remove(taskId);
    }
}
