package com.alibaba.tesla.dag.dispatch;

import com.alibaba.tesla.dag.algorithm.DAG;
import com.alibaba.tesla.dag.constant.DagConstant;
import com.alibaba.tesla.dag.notify.DagInstDispatch;
import com.alibaba.tesla.dag.notify.DagInstNodeTask;
import com.alibaba.tesla.dag.notify.IDagInstNodeTaskNotify;
import com.alibaba.tesla.dag.notify.IDagInstNotify;
import com.alibaba.tesla.dag.notify.NodeTaskType;
import com.alibaba.tesla.dag.repository.dao.DagInstEdgeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstEdgeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.schedule.status.DagInstEdgeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstNodeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstStatus;
import com.alibaba.tesla.dag.services.DagInstNewService;
import com.alibaba.tesla.dag.services.DagInstNodeNewService;
import com.alibaba.tesla.dag.util.DagUtil;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Dispatcher registered for DAG instances in the {@link DagInstStatus#RUNNING} state.
 *
 * <p>Each call to {@link #dispatch(DagInstDO)} loads the instance's nodes and edges, marks the
 * descendants of exception/stopped nodes as skipped, starts or skips every {@code INIT} node
 * according to the statuses of its parents, and finally refreshes the instance status once no
 * node is {@code INIT} or {@code RUNNING} any more.
 */
@Service
public class RunningDagInstDispatcher implements IDagInstDispatcher {
    private static final Logger log = LoggerFactory.getLogger(RunningDagInstDispatcher.class);

    @Autowired
    private IDagInstNotify dagInstNotify;

    @Autowired
    private IDagInstNodeTaskNotify dagInstNodeTaskNotify;

    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;

    @Autowired
    private DagInstEdgeDAO dagInstEdgeDAO;

    @Autowired
    private DagInstNewService dagInstNewService;

    @Autowired
    private DagInstNodeNewService dagInstNodeNewService;

    /**
     * Evaluates the edges leading from the {@code SUCCESS} parents to the given child node.
     *
     * <p>Edges are looked up in {@code edgesByKey} under the key
     * {@code parentNodeId + "-" + childNodeId}. Parents whose status is not {@code SUCCESS}
     * are ignored.
     *
     * @param parents     parent nodes of the child
     * @param childNodeId node id of the child
     * @param edgesByKey  edges indexed by their key
     * @return {@code null} when an edge is still {@code INIT} (its calculation is requested via
     *         {@code calcEdge} and the decision is left to a later dispatch round);
     *         {@code false} when an edge is missing, is not {@code SUCCESS}, or has
     *         {@code isPass} equal to {@code 0}; {@code true} otherwise
     */
    private Boolean conditionIsAllSuccess(List<DagInstNodeDO> parents, String childNodeId,
                                          Map<String, DagInstEdgeDO> edgesByKey) {
        for (DagInstNodeDO parent : parents) {
            if (parent.fetchNodeStatus() != DagInstNodeStatus.SUCCESS) {
                continue;
            }
            DagInstEdgeDO edge = edgesByKey.get(parent.getNodeId() + "-" + childNodeId);
            if (Objects.nonNull(edge) && edge.fetchEdgeStatus() == DagInstEdgeStatus.INIT) {
                // Edge not evaluated yet: request its calculation and report "undecided".
                this.dagInstNodeNewService.calcEdge(parent);
                return null;
            }
            if (Objects.isNull(edge)
                    || edge.fetchEdgeStatus() != DagInstEdgeStatus.SUCCESS
                    || Objects.equals(edge.getIsPass(), 0)) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@link DagInstStatus#RUNNING}, the instance status this dispatcher is registered for
     */
    @Override // com.alibaba.tesla.dag.dispatch.IDagInstDispatcher
    public DagInstStatus registerType() {
        return DagInstStatus.RUNNING;
    }

    /**
     * Runs one dispatch round for the given DAG instance.
     *
     * @param dagInstDO the DAG instance to dispatch
     */
    @Override // com.alibaba.tesla.dag.dispatch.IDagInstDispatcher
    public void dispatch(DagInstDO dagInstDO) {
        Long dagInstId = dagInstDO.getId();
        log.info(">>>[Running]|dispatch|dagInstId={}", dagInstId);
        List<DagInstNodeDO> nodes = this.dagInstNodeDAO.getSimpleList(dagInstId);
        List<DagInstEdgeDO> edges = this.dagInstEdgeDAO.getSimpleList(dagInstId);
        Map<String, DagInstNodeDO> nodesById = nodes.stream()
                .collect(Collectors.toMap(DagInstNodeDO::getNodeId, node -> node));
        Map<String, DagInstEdgeDO> edgesByKey = edges.stream()
                .collect(Collectors.toMap(DagInstEdgeDO::fetchKey, edge -> edge));
        DAG dag = DagUtil.calcDAG(nodes, edges);

        // First propagate exception/stopped states downwards so that the scheduling pass
        // below sees the updated in-memory statuses.
        for (DagInstNodeDO node : nodes) {
            doChildrenOnExceptionOrStopped(node, dag, nodesById);
        }

        for (DagInstNodeDO node : nodes) {
            if (Objects.equals(node.fetchNodeStatus(), DagInstNodeStatus.INIT)) {
                scheduleInitNode(node, nodes, dag, edgesByKey);
            }
        }

        refreshInstStatus(dagInstDO, nodes);
    }

    /**
     * Decides what to do with a single {@code INIT} node: start it, skip it, or leave it alone
     * until its parents have progressed.
     */
    private void scheduleInitNode(DagInstNodeDO node, List<DagInstNodeDO> allNodes, DAG dag,
                                  Map<String, DagInstEdgeDO> edgesByKey) {
        Set<?> parentIds = dag.getParent(node.getNodeId());
        if (CollectionUtils.isEmpty(parentIds)) {
            log.info(">>>runningDagInstDispatcher|dispatch|start|Header Node={}", node.toSimpleString());
            dispatch(node);
            return;
        }

        List<DagInstNodeDO> parents = allNodes.stream()
                .filter(candidate -> parentIds.contains(candidate.getNodeId()))
                .collect(Collectors.toList());
        List<DagInstNodeStatus> parentStatuses = parents.stream()
                .map(DagInstNodeDO::fetchNodeStatus)
                .collect(Collectors.toList());

        if (isOnly(parentStatuses, Arrays.asList(DagInstNodeStatus.SKIP))) {
            log.info(">>>runningDagInstDispatcher|dispatch|skip|node={}", node.toSimpleString());
            markSkipped(node);
        } else if (isOnly(parentStatuses, Arrays.asList(DagInstNodeStatus.SKIP, DagInstNodeStatus.SUCCESS))) {
            log.info(">>>runningDagInstDispatcher|dispatch|success|node={}", node.toSimpleString());
            Boolean allEdgesPass = conditionIsAllSuccess(parents, node.getNodeId(), edgesByKey);
            // Tri-state result: null means "undecided", in which case nothing is done this round.
            if (Boolean.TRUE.equals(allEdgesPass)) {
                dispatch(node);
            } else if (Boolean.FALSE.equals(allEdgesPass)) {
                log.info(">>>runningDagInstDispatcher|dispatch|edge condition is false|node={}", node.toSimpleString());
                markSkipped(node);
            }
        } else {
            log.info(">>>runningDagInstDispatcher|dispatch|ignore|node={}, parentStatusList={}", node.toSimpleString(), parentStatuses);
        }
    }

    /** Marks the node as {@code SKIP} both on the in-memory object and through the DAO. */
    private void markSkipped(DagInstNodeDO node) {
        node.setStatus(DagInstNodeStatus.SKIP.toString());
        this.dagInstNodeDAO.updateStatus(node.getId(), DagInstNodeStatus.SKIP);
    }

    /**
     * Refreshes the instance status from the (in-memory) node statuses. Does nothing while any
     * node is still {@code INIT} or {@code RUNNING}.
     */
    private void refreshInstStatus(DagInstDO dagInstDO, List<DagInstNodeDO> nodes) {
        List<DagInstNodeStatus> statuses = nodes.stream()
                .map(DagInstNodeDO::fetchNodeStatus)
                .collect(Collectors.toList());
        if (statuses.contains(DagInstNodeStatus.INIT) || statuses.contains(DagInstNodeStatus.RUNNING)) {
            return;
        }
        if (statuses.contains(DagInstNodeStatus.EXCEPTION)
                || statuses.contains(DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION)) {
            this.dagInstNewService.freshInstStatus(dagInstDO, registerType(), DagInstStatus.EXCEPTION);
            return;
        }
        Long dagInstId = dagInstDO.getId();
        DagInstNodeDO postNode = this.dagInstNodeDAO.getDagInstNode(dagInstId, DagConstant.POST_NODE_ID);
        if (Objects.isNull(postNode)) {
            this.dagInstNewService.freshInstStatus(dagInstDO, registerType(), DagInstStatus.SUCCESS);
        } else {
            // A post node exists: move the instance to POST_RUNNING and notify its dispatch.
            this.dagInstNewService.freshInstStatus(dagInstDO, registerType(), DagInstStatus.POST_RUNNING);
            this.dagInstNotify.sendDagInstDispatch(DagInstDispatch.builder()
                    .dagInstId(dagInstId)
                    .nodeId(postNode.getNodeId())
                    .dagInstStatus(DagInstStatus.POST_RUNNING)
                    .build());
        }
    }

    /**
     * Recursively marks the children of an exception/stopped node as
     * {@code SKIP_CAUSE_BY_EXCEPTION} or {@code SKIP_CAUSE_BY_STOPPED}.
     *
     * <p>The in-memory status of each child is always overwritten; the DAO is only called when
     * the child is not already in an exception (respectively stopped) state.
     *
     * @param dagInstNodeDO node whose children are examined
     * @param dag           graph used to resolve children
     * @param map           all nodes of the instance, indexed by node id
     */
    private void doChildrenOnExceptionOrStopped(DagInstNodeDO dagInstNodeDO, DAG dag, Map<String, DagInstNodeDO> map) {
        DagInstNodeStatus parentStatus = dagInstNodeDO.fetchNodeStatus();
        if (!parentStatus.isException() && !parentStatus.isStopped()) {
            return;
        }
        Set<?> childIds = dag.getChildren(dagInstNodeDO.getNodeId());
        if (CollectionUtils.isEmpty(childIds)) {
            return;
        }
        for (Object childId : childIds) {
            DagInstNodeDO child = map.get(childId);
            if (Objects.isNull(child)) {
                // A single unresolved child must not prevent its siblings from being marked.
                log.warn(">>>runningDagInstDispatcher|doChildrenOnExceptionOrStopped|child not found|nodeId={}", childId);
                continue;
            }
            DagInstNodeStatus childStatus = child.fetchNodeStatus();
            if (parentStatus.isException()) {
                if (!childStatus.isException()) {
                    log.info(">>>runningDagInstDispatcher|doChildrenOnException|{}->{}", child.getId(), DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION);
                    this.dagInstNodeDAO.updateStatus(child.getId(), DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION);
                }
                child.setStatus(DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION.toString());
            } else {
                // parentStatus.isStopped() is guaranteed here by the guard at the top.
                if (!childStatus.isStopped()) {
                    log.info(">>>runningDagInstDispatcher|doChildrenOnStopped|{}->{}", child.getId(), DagInstNodeStatus.SKIP_CAUSE_BY_STOPPED);
                    this.dagInstNodeDAO.updateStatus(child.getId(), DagInstNodeStatus.SKIP_CAUSE_BY_STOPPED);
                }
                child.setStatus(DagInstNodeStatus.SKIP_CAUSE_BY_STOPPED.toString());
            }
            doChildrenOnExceptionOrStopped(child, dag, map);
        }
    }

    /**
     * @return {@code true} when every element of {@code statuses} is contained in
     *         {@code allowed}; an empty {@code statuses} list yields {@code true}
     */
    private boolean isOnly(List<DagInstNodeStatus> statuses, List<DagInstNodeStatus> allowed) {
        return statuses.stream().allMatch(allowed::contains);
    }

    /** Sends a {@link NodeTaskType#START} task notification for the given node. */
    private void dispatch(DagInstNodeDO dagInstNodeDO) {
        this.dagInstNodeTaskNotify.sendDagInstNodeTask(DagInstNodeTask.builder()
                .dagInstId(dagInstNodeDO.getDagInstId())
                .nodeId(dagInstNodeDO.getNodeId())
                .nodeTaskType(NodeTaskType.START)
                .build());
    }
}
