package com.alibaba.tesla.dag.services;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.tesla.dag.common.Tools;
import com.alibaba.tesla.dag.constant.DagConstant;
import com.alibaba.tesla.dag.model.domain.dagnode.DagNodeType;
import com.alibaba.tesla.dag.notify.DagInstDispatch;
import com.alibaba.tesla.dag.notify.DagInstNodeTask;
import com.alibaba.tesla.dag.notify.IDagInstNotify;
import com.alibaba.tesla.dag.repository.dao.DagInstDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstEdgeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstEdgeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.schedule.status.DagInstEdgeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstNodeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstStatus;
import com.alibaba.tesla.dag.schedule.task.TaskStatus;
import com.alibaba.tesla.dag.util.DateUtil;
import com.alibaba.tesla.dag.util.MonitorUtil;
import com.google.common.base.Throwables;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/tesla/dag/services/DagInstNodeNewService.class */
public class DagInstNodeNewService {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(DagInstNodeNewService.class);

    // Loads DAG instance nodes and persists their status / result updates.
    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;

    // Loads the edges of a DAG instance and persists edge evaluation results.
    @Autowired
    private DagInstEdgeDAO dagInstEdgeDAO;

    // Loads DAG instances (both the owning instance and sub-DAG instances).
    @Autowired
    private DagInstDAO dagInstDAO;

    // Used here to submit sub-DAG instances and to refresh a DAG instance's global data.
    @Autowired
    private DagInstNewService dagInstNewService;

    // All registered action services; the one whose registerNodeType() matches a node's type is selected.
    @Autowired
    private List<AbstractActionNewService> actionNewServiceList;

    // Used to send DAG instance dispatch notifications after a node changes state.
    @Autowired
    private IDagInstNotify dagInstNotify;

    /**
     * Starts the node referenced by the given task, provided the node exists and is still in
     * {@code INIT} status.
     *
     * <p>A missing node is logged and ignored. A node in any other status is logged as a warning
     * and left untouched. If starting the node throws, the node is marked {@code EXCEPTION} with
     * the stack trace as detail and a dispatch notification is sent for the owning DAG instance.
     * The elapsed time is reported to {@code MonitorUtil.nodeStartMonitor} whenever the node was found.
     *
     * @param dagInstNodeTask identifies the node by DAG instance id and node id
     */
    public void startDagInstNode(DagInstNodeTask dagInstNodeTask) {
        final long beginMillis = System.currentTimeMillis();
        DagInstNodeDO node = this.dagInstNodeDAO.getDagInstNode(dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
        if (node == null) {
            log.info(">>>dagInstNodeNewService|startDagInstNode|Not Found|dagInstId={}, nodeId={}", dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
            return;
        }

        DagInstNodeStatus currentStatus = DagInstNodeStatus.valueOf(node.getStatus());
        log.info(">>>dagInstNodeNewService|startDagInstNode|dagInstId={}, nodeId={}, status={}",
            dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId(), currentStatus);

        try {
            // Only a node that has not been started yet may be started.
            if (currentStatus == DagInstNodeStatus.INIT) {
                start(node);
            } else {
                log.warn(">>>dagInstNodeNewService|startDagInstNode|Status is Wrong|dagInstId={}, nodeId={}, status={}",
                    dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId(), currentStatus);
            }
        } catch (Exception startFailure) {
            // The trailing throwable argument makes SLF4J log the stack trace as well.
            log.error(">>>dagInstNodeNewService|startDagInstNode|dagInstId={}, err={}",
                dagInstNodeTask.getDagInstId(), startFailure.getMessage(), startFailure);
            this.dagInstNodeDAO.updateStatusWithDetail(
                node.getId(), DagInstNodeStatus.EXCEPTION, Throwables.getStackTraceAsString(startFailure));
            DagInstDispatch dispatch = DagInstDispatch.builder()
                .dagInstId(node.getDagInstId())
                .nodeId(node.getNodeId())
                .dagInstStatus(DagInstStatus.RUNNING)
                .build();
            this.dagInstNotify.sendDagInstDispatch(dispatch);
        }

        MonitorUtil.nodeStartMonitor.addCost(System.currentTimeMillis() - beginMillis);
    }

    /**
     * Launches the work behind a node and marks the node {@code RUNNING}.
     *
     * <p>For a {@code DAG} node a sub-DAG instance is submitted and its id recorded on the node.
     * For a {@code NODE} node the action service registered for the node's type is asked to start
     * a task and the resulting task id is recorded. If the node already carries a sub-DAG instance
     * id (greater than zero) or a non-empty task id, the call is treated as a duplicate start: a
     * warning is logged and nothing is persisted.
     *
     * @param dagInstNodeDO the node to start
     * @throws Exception if no action service is registered for the node's type, or if the
     *                   underlying submit/start call fails
     */
    public void start(DagInstNodeDO dagInstNodeDO) throws Exception {
        DagInstDO dagInst = this.dagInstDAO.getDagInstById(dagInstNodeDO.getDagInstId());

        // Partial record keyed by id: only the fields set below are handed to the DAO update.
        DagInstNodeDO patch = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
        patch.setGmtStart(Long.valueOf(DateUtil.currentSeconds()));

        switch (dagInstNodeDO.fetchNodeType()) {
            case DAG: {
                Long existingSubDagInstId = dagInstNodeDO.getSubDagInstId();
                if (existingSubDagInstId != null && existingSubDagInstId.longValue() > 0) {
                    log.warn(">>>dagInstNodeNewService|start|重复启动子DAG|subDagInstId={}", existingSubDagInstId);
                    return;
                }
                Long newSubDagInstId = this.dagInstNewService.submitSub(dagInstNodeDO, dagInst);
                log.info(">>>dagInstNodeNewService|SUCCESS|startDag|subDagInstId={}", newSubDagInstId);
                patch.setSubDagInstId(newSubDagInstId);
                break;
            }
            case NODE: {
                long taskBeginMillis = System.currentTimeMillis();
                if (StringUtils.isNotEmpty(dagInstNodeDO.getTaskId())) {
                    log.warn(">>>dagInstNodeNewService|start|重复启动作业|dagInstId={}, nodeId={}, taskId={}",
                        dagInstNodeDO.getDagInstId(), dagInstNodeDO.getNodeId(), dagInstNodeDO.getTaskId());
                    return;
                }

                // Pick the first action service registered for this node's type.
                DagNodeType nodeType = dagInstNodeDO.fetchDagNode().fetchNodeType();
                AbstractActionNewService actionService = null;
                for (AbstractActionNewService candidate : this.actionNewServiceList) {
                    if (candidate.registerNodeType() == nodeType) {
                        actionService = candidate;
                        break;
                    }
                }
                if (actionService == null) {
                    throw new Exception("no such type: " + nodeType);
                }

                Long startedTaskId = actionService.start(dagInst, dagInstNodeDO);
                log.info(">>>dagInstNodeNewService|SUCCESS|startJob|dagInstId={}, nodeId={}, taskId={}",
                    dagInstNodeDO.getDagInstId(), dagInstNodeDO.getNodeId(), startedTaskId);
                MonitorUtil.taskMonitor.addCost(System.currentTimeMillis() - taskBeginMillis);
                patch.setTaskId(Long.toString(startedTaskId.longValue()));
                break;
            }
            default:
                // Any other node type launches nothing but is still marked RUNNING below.
                break;
        }

        patch.setStatus(DagInstNodeStatus.RUNNING.toString());
        this.dagInstNodeDAO.update(patch);
    }

    /**
     * Polls the state of the work behind a node and advances the node accordingly.
     *
     * <p>A missing node is logged and ignored.
     *
     * <p>{@code DAG} node: the sub-DAG instance is loaded. If it ended in {@code EXCEPTION} the
     * node is marked {@code EXCEPTION}. If it ended in {@code SUCCESS} its global result, params
     * and object are copied onto the node, the node goes through {@code MERGE} (outgoing edges are
     * calculated and the owning DAG instance's global data refreshed) and is then marked
     * {@code SUCCESS}. Any other sub-DAG status is only logged. A dispatch notification is sent in
     * all three cases.
     *
     * <p>{@code NODE} node: only acts when the node is {@code RUNNING} and has a non-empty task id.
     * The matching action service is asked for the task status, which is then forwarded to
     * {@link #jobCallBack}.
     *
     * @param dagInstNodeTask identifies the node by DAG instance id and node id
     * @throws Exception if no action service is registered for the node's type, or if any
     *                   downstream call fails
     */
    public void inspectDagInstNode(DagInstNodeTask dagInstNodeTask) throws Exception {
        DagInstNodeDO dagInstNode = this.dagInstNodeDAO.getDagInstNode(dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
        if (Objects.isNull(dagInstNode)) {
            log.info(">>>dagInstNodeNewService|inspectDagInstNode|Not Found|dagInstNodeTask={}", dagInstNodeTask);
            return;
        }
        switch (dagInstNode.fetchNodeType()) {
            case DAG: {
                Long subDagInstId = dagInstNode.getSubDagInstId();
                // NOTE(review): assumes the sub-DAG instance exists; a null result fails on the next line — confirm.
                DagInstDO subDagInst = this.dagInstDAO.getDagInstById(subDagInstId);
                DagInstStatus subDagStatus = subDagInst.fetchStatus();
                if (subDagStatus == DagInstStatus.EXCEPTION) {
                    this.dagInstNodeDAO.updateStatus(dagInstNode.getId(), DagInstNodeStatus.EXCEPTION);
                } else if (subDagStatus == DagInstStatus.SUCCESS) {
                    // Copy the sub-DAG's outputs onto the node and enter MERGE.
                    DagInstNodeDO mergeUpdate = DagInstNodeDO.builder().id(dagInstNode.getId()).build();
                    mergeUpdate.setGlobalResult(subDagInst.getGlobalResult());
                    mergeUpdate.setGlobalParams(subDagInst.getGlobalParams());
                    mergeUpdate.setGlobalObject(subDagInst.getGlobalObject());
                    mergeUpdate.setStatus(DagInstNodeStatus.MERGE.toString());
                    mergeUpdate.setLockId(DateUtil.currentTimeMillis());
                    this.dagInstNodeDAO.update(mergeUpdate);
                    log.info(">>>dagInstNodeNewService|inspectDagInstNode|MERGE|dagInstId={}", dagInstNode.getDagInstId());
                    // NOTE(review): edges are calculated BEFORE the global data refresh here, while
                    // jobCallBack refreshes first and calculates edges afterwards — confirm this
                    // ordering difference is intentional.
                    calcEdge(dagInstNode);
                    this.dagInstNewService.freshGlobalData(dagInstNode.getDagInstId());
                    DagInstNodeDO successUpdate = DagInstNodeDO.builder().id(dagInstNode.getId()).build();
                    successUpdate.setStatus(DagInstNodeStatus.SUCCESS.toString());
                    this.dagInstNodeDAO.update(successUpdate);
                    log.info(">>>dagInstNodeNewService|inspectDagInstNode|SUCCESS|dagInstId={}", dagInstNode.getDagInstId());
                } else {
                    log.info(">>>dagInstNodeNewService|inspectDagInstNode|兜底条件|subDagInstId={}, subDagInstStatus={}", subDagInstId, subDagStatus);
                }
                this.dagInstNotify.sendDagInstDispatch(DagInstDispatch.builder().dagInstId(dagInstNode.getDagInstId()).nodeId(dagInstNode.getNodeId()).dagInstStatus(DagInstStatus.RUNNING).build());
                return;
            }
            case NODE: {
                long inspectBeginMillis = System.currentTimeMillis();
                if (dagInstNode.fetchNodeStatus() != DagInstNodeStatus.RUNNING) {
                    return;
                }
                String taskId = dagInstNode.getTaskId();
                if (StringUtils.isEmpty(taskId)) {
                    return;
                }
                DagNodeType nodeType = dagInstNode.fetchDagNode().fetchNodeType();
                AbstractActionNewService actionService = this.actionNewServiceList.stream()
                    .filter(candidate -> candidate.registerNodeType() == nodeType)
                    .findFirst()
                    .orElse(null);
                // Fail with the same descriptive error as start()/jobCallBack() instead of a bare
                // NullPointerException when no service is registered for this node type.
                if (Objects.isNull(actionService)) {
                    throw new Exception("no such type: " + nodeType);
                }
                Long taskIdValue = Long.valueOf(Long.parseLong(taskId));
                TaskStatus taskStatus = actionService.status(taskIdValue);
                MonitorUtil.statusMonitor.addCost(System.currentTimeMillis() - inspectBeginMillis);
                jobCallBack(dagInstNode.getDagInstId(), dagInstNode.getNodeId(), taskIdValue, taskStatus, "");
                return;
            }
            default:
                return;
        }
    }

    /**
     * Applies a task status report to the corresponding node and notifies the dispatcher.
     *
     * <p>On {@code TaskStatus.SUCCESS} the task's stdout is fetched from the matching action
     * service. If that succeeds with a non-null result, the outputs are stored on the node, the
     * node goes through {@code MERGE} (global data refreshed, outgoing edges calculated) and is
     * then marked {@code SUCCESS}. If fetching stdout throws, the failure is logged and the node
     * is marked {@code EXCEPTION} with the exception message as detail. On
     * {@code TaskStatus.EXCEPTION} the node is marked {@code EXCEPTION} with {@code str2} as
     * detail (or a default text when {@code str2} is empty). Other task statuses change nothing.
     * A dispatch notification carrying the DAG instance's current status is sent in every case
     * where the node exists.
     *
     * <p>If the node cannot be found, the call is logged and ignored, matching the other entry
     * points of this class.
     *
     * @param l          DAG instance id
     * @param str        node id
     * @param l2         task id
     * @param taskStatus reported status of the task
     * @param str2       optional detail text used when the task ended in exception
     * @throws Exception if no action service is registered for the node's type, or if any
     *                   downstream call (other than fetching stdout) fails
     */
    public void jobCallBack(Long l, String str, Long l2, TaskStatus taskStatus, String str2) throws Exception {
        log.info(">>>dagInstNodeNewService|jobCallBack|enter|dagInstId={}, nodeId={}, taskId={}, taskStatus={}, detail={}", new Object[]{l, str, l2, taskStatus, str2});
        DagInstNodeDO dagInstNode = this.dagInstNodeDAO.getDagInstNode(l, str);
        if (Objects.isNull(dagInstNode)) {
            // Without the node there is nothing to update or dispatch; avoid a NullPointerException below.
            log.warn(">>>dagInstNodeNewService|jobCallBack|Not Found|dagInstId={}, nodeId={}", l, str);
            return;
        }
        DagInstDO dagInstById = this.dagInstDAO.getDagInstById(l);
        long currentTimeMillis = System.currentTimeMillis();
        if (taskStatus == TaskStatus.SUCCESS) {
            DagNodeType fetchNodeType = dagInstNode.fetchDagNode().fetchNodeType();
            AbstractActionNewService actionService = this.actionNewServiceList.stream()
                .filter(candidate -> candidate.registerNodeType() == fetchNodeType)
                .findFirst()
                .orElse(null);
            if (Objects.isNull(actionService)) {
                throw new Exception("no such type: " + fetchNodeType);
            }
            JSONObject stdout = null;
            try {
                stdout = actionService.stdout(l2);
            } catch (Exception e) {
                // Keep the full stack trace in the log; only the message is persisted on the node.
                log.error(">>>dagInstNodeNewService|jobCallBack|stdout failed|dagInstId={}, nodeId={}, taskId={}, err={}", new Object[]{l, str, l2, e.getMessage(), e});
                this.dagInstNodeDAO.updateStatusWithDetail(dagInstNode.getId(), DagInstNodeStatus.EXCEPTION, e.getMessage());
            }
            if (Objects.nonNull(stdout)) {
                MonitorUtil.stdOutMonitor.addCost(System.currentTimeMillis() - currentTimeMillis);
                // Store the task outputs on the node and enter MERGE.
                DagInstNodeDO mergeUpdate = DagInstNodeDO.builder().id(dagInstNode.getId()).build();
                mergeUpdate.setGlobalObject(stdout.getString(DagConstant.OUTPUT_GLOBAL_OBJECT_KEY));
                mergeUpdate.setGlobalParams(stdout.getString(DagConstant.OUTPUT_GLOBAL_PARAMS_KEY));
                mergeUpdate.setGlobalResult(AbstractActionNewService.getDataResultAndOutput(stdout).toJSONString());
                mergeUpdate.setStatus(DagInstNodeStatus.MERGE.toString());
                mergeUpdate.setLockId(DateUtil.currentTimeMillis());
                this.dagInstNodeDAO.update(mergeUpdate);
                log.info(">>>dagInstNodeNewService|jobCallBack|MERGE|dagInstId={}", dagInstNode.getDagInstId());
                // Refresh the global data first; calcEdge reloads the DAG instance to read its expression parameters.
                this.dagInstNewService.freshGlobalData(l);
                calcEdge(dagInstNode);
                DagInstNodeDO successUpdate = DagInstNodeDO.builder().id(dagInstNode.getId()).build();
                successUpdate.setStatus(DagInstNodeStatus.SUCCESS.toString());
                this.dagInstNodeDAO.update(successUpdate);
                log.info(">>>dagInstNodeNewService|jobCallBack|SUCCESS|dagInstId={}", dagInstNode.getDagInstId());
            }
        } else if (taskStatus == TaskStatus.EXCEPTION) {
            this.dagInstNodeDAO.updateStatusWithDetail(dagInstNode.getId(), DagInstNodeStatus.EXCEPTION, StringUtils.isEmpty(str2) ? "exception in running task" : str2);
        }
        this.dagInstNotify.sendDagInstDispatch(DagInstDispatch.builder().dagInstId(dagInstNode.getDagInstId()).nodeId(dagInstNode.getNodeId()).dagInstStatus(dagInstById.fetchStatus()).build());
    }

    /**
     * Evaluates every edge of the DAG instance whose source is the given node and persists the
     * outcome on each edge.
     *
     * <p>An edge without an expression passes. An edge with an expression passes only when the
     * expression evaluates to {@code true}; a {@code null} evaluation result counts as not
     * passing. A successfully evaluated edge gets status {@code SUCCESS} and its pass flag (1/0).
     * If evaluation throws, the edge gets status {@code EXCEPTION} and the stack trace instead.
     * Every processed edge has its modification time set and is updated through the DAO.
     *
     * @param dagInstNodeDO the node whose outgoing edges are evaluated
     */
    public void calcEdge(DagInstNodeDO dagInstNodeDO) {
        List<DagInstEdgeDO> outgoingEdges = this.dagInstEdgeDAO.getList(dagInstNodeDO.getDagInstId()).stream()
            .filter(edge -> StringUtils.equals(edge.getSource(), dagInstNodeDO.getNodeId()))
            .collect(Collectors.toList());

        // Loaded at most once, and only when some edge actually carries an expression.
        DagInstDO cachedDagInst = null;
        for (DagInstEdgeDO edge : outgoingEdges) {
            try {
                String expression = edge.fetchExpressionString();
                boolean pass = true;
                if (StringUtils.isNotEmpty(expression)) {
                    if (cachedDagInst == null) {
                        cachedDagInst = this.dagInstDAO.getDagInstById(dagInstNodeDO.getDagInstId());
                    }
                    Boolean evaluated = (Boolean) Tools.execExpression(
                        edge.fetchKey(), cachedDagInst.fetchExpressionParamsJson(), expression, Boolean.class);
                    // A null evaluation result is treated as "does not pass".
                    pass = Boolean.TRUE.equals(evaluated);
                }
                edge.setIsPass(Integer.valueOf(pass ? 1 : 0));
                edge.setStatus(DagInstEdgeStatus.SUCCESS.toString());
            } catch (Exception evaluationFailure) {
                edge.setStatus(DagInstEdgeStatus.EXCEPTION.toString());
                edge.setException(Throwables.getStackTraceAsString(evaluationFailure));
            }
            edge.setGmtModified(Long.valueOf(DateUtil.currentSeconds()));
            this.dagInstEdgeDAO.update(edge);
        }
    }
}
