package edu.iu.dsc.tws.task.impl;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecutor;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.IONames;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.dataset.EmptyDataObject;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.faulttolerance.Fault;
import edu.iu.dsc.tws.api.faulttolerance.FaultAcceptable;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.dataset.DataObjectImpl;
import edu.iu.dsc.tws.executor.core.ExecutionPlanBuilder;
import edu.iu.dsc.tws.executor.threading.ExecutorFactory;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.tsched.taskscheduler.TaskScheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/impl/TaskExecutor.class */
/**
 * Builds execution plans for compute graphs on one worker and runs them.
 *
 * <p>An instance holds the worker's configuration, id, worker list, communicator and
 * checkpointing client, plus a name-to-{@link DataObject} map. That map and the list of
 * executors created through {@code createExecution} are handed to every
 * {@code ExecutionHookImpl} this class creates.
 */
public class TaskExecutor implements FaultAcceptable {
    private static final Logger LOG = Logger.getLogger(TaskExecutor.class.getName());

    private final Config config;
    private final int workerID;
    private final List<JobMasterAPI.WorkerInfo> workerInfoList;
    private final Communicator communicator;
    private final CheckpointingClient checkpointingClient;
    private final ExecutorFactory executor;

    /** Data objects keyed by name; read and written through the input/output methods below. */
    private final Map<String, DataObject> dataObjectMap = new HashMap<>();

    /** Executors handed out by {@code createExecution}; faults are forwarded to this list. */
    private final ExecutorList currentExecutors = new ExecutorList();

    /**
     * Creates a task executor from its individual collaborators.
     *
     * @param config base configuration used for planning and execution
     * @param i id of this worker
     * @param list information about the workers used to build the worker plan
     * @param communicator communicator whose channel is passed to the executor factory
     * @param checkpointingClient client passed to the execution plan builder
     */
    public TaskExecutor(Config config, int i, List<JobMasterAPI.WorkerInfo> list, Communicator communicator, CheckpointingClient checkpointingClient) {
        this.config = config;
        this.workerID = i;
        this.workerInfoList = list;
        this.communicator = communicator;
        this.checkpointingClient = checkpointingClient;
        this.executor = new ExecutorFactory(this.config, this.workerID, this.communicator.getChannel());
    }

    /**
     * Creates a task executor using the configuration, worker id, worker list, communicator
     * and checkpointing client exposed by the given worker environment.
     *
     * @param workerEnvironment environment the collaborators are read from
     */
    public TaskExecutor(WorkerEnvironment workerEnvironment) {
        this(workerEnvironment.getConfig(),
                workerEnvironment.getWorkerId(),
                workerEnvironment.getWorkerList(),
                workerEnvironment.getCommunicator(),
                workerEnvironment.getWorkerController().getCheckpointingClient());
    }

    /**
     * Schedules a single graph with a fresh {@link TaskScheduler} and builds its execution plan.
     *
     * @param computeGraph graph to plan
     * @return the execution plan built for the graph
     */
    public ExecutionPlan plan(ComputeGraph computeGraph) {
        TaskScheduler scheduler = new TaskScheduler();
        scheduler.initialize(this.config);
        return newPlanBuilder().build(this.config, computeGraph, scheduler.schedule(computeGraph, createWorkerPlan()));
    }

    /**
     * Schedules several graphs in one scheduler call and builds an execution plan for each.
     *
     * @param computeGraphArr graphs to plan
     * @return execution plans keyed by graph name, in the order the graphs were given
     */
    public Map<String, ExecutionPlan> plan(ComputeGraph... computeGraphArr) {
        WorkerPlan workerPlan = createWorkerPlan();
        TaskScheduler scheduler = new TaskScheduler();
        scheduler.initialize(this.config);
        Map<String, TaskSchedulePlan> schedules = scheduler.schedule(workerPlan, computeGraphArr);

        // LinkedHashMap keeps the plans in the order the graphs were supplied.
        Map<String, ExecutionPlan> plans = new LinkedHashMap<>();
        for (ComputeGraph graph : computeGraphArr) {
            String graphName = graph.getGraphName();
            plans.put(graphName, newPlanBuilder().build(this.config, graph, schedules.get(graphName)));
        }
        return plans;
    }

    /**
     * Builds an execution plan from an already computed schedule.
     *
     * @param computeGraph graph the schedule belongs to
     * @param taskSchedulePlan schedule to turn into an execution plan
     * @return the execution plan built for the graph
     */
    public ExecutionPlan executionPlan(ComputeGraph computeGraph, TaskSchedulePlan taskSchedulePlan) {
        return newPlanBuilder().build(this.config, computeGraph, taskSchedulePlan);
    }

    /**
     * Runs a plan to completion and then closes the execution. The executor is obtained with
     * this instance's configuration overlaid by {@code config}; it is not added to the list of
     * current executors.
     *
     * @param config configuration entries applied on top of this executor's configuration
     * @param computeGraph graph whose operation mode selects the executor
     * @param executionPlan plan to run
     */
    public void execute(Config config, ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        Config merged = Config.newBuilder().putAll(this.config).putAll(config).build();
        IExecutor runner = newExecutor(merged, computeGraph, executionPlan);
        runner.execute();
        runner.closeExecution();
    }

    /**
     * Runs a plan to completion using this executor's own configuration.
     *
     * @param computeGraph graph whose operation mode selects the executor
     * @param executionPlan plan to run
     */
    public void execute(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        execute(this.config, computeGraph, executionPlan);
    }

    /**
     * Plans the graph and runs the resulting plan to completion.
     *
     * @param computeGraph graph to plan and run
     */
    public void execute(ComputeGraph computeGraph) {
        execute(this.config, computeGraph, plan(computeGraph));
    }

    /**
     * Creates an executor for the plan without running it, and records it in the list of
     * current executors so that faults are forwarded to it.
     *
     * @param computeGraph graph whose operation mode selects the executor
     * @param executionPlan plan the executor will run
     * @return the newly created executor
     */
    public IExecutor createExecution(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        IExecutor created = newExecutor(this.config, computeGraph, executionPlan);
        this.currentExecutors.add(created);
        return created;
    }

    /**
     * Plans the graph, then creates and records an executor for the resulting plan.
     *
     * @param computeGraph graph to plan
     * @return the newly created executor
     */
    public IExecutor createExecution(ComputeGraph computeGraph) {
        return createExecution(computeGraph, plan(computeGraph));
    }

    /**
     * Adds a data object to every node instance the plan returns for the given task name.
     * Does nothing when the plan returns {@code null} for that name.
     *
     * @param computeGraph unused
     * @param executionPlan plan whose node instances receive the input
     * @param str task name passed to {@code ExecutionPlan.getNodes}
     * @param str2 name under which the data object is added to each node
     * @param dataObject data to add
     * @throws RuntimeException if one of the node instances is not a {@link Receptor}
     */
    @Deprecated
    public void addInput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, String str2, DataObject<?> dataObject) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes(str);
        if (nodes == null) {
            return;
        }
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if (!(node instanceof Receptor)) {
                throw new RuntimeException("Cannot add input to non input instance: " + instance);
            }
            ((Receptor) node).add(str2, dataObject);
        }
    }

    /**
     * Same as {@link #addInput(ComputeGraph, ExecutionPlan, String, String, DataObject)}, using
     * the execution plan of the given executor.
     *
     * @param iExecutor executor whose plan receives the input
     * @param str task name passed to {@code ExecutionPlan.getNodes}
     * @param str2 name under which the data object is added to each node
     * @param dataObject data to add
     */
    @Deprecated
    public void addInput(IExecutor iExecutor, String str, String str2, DataObject<?> dataObject) {
        addInput(null, iExecutor.getExecutionPlan(), str, str2, dataObject);
    }

    /**
     * Adds a data object to every node of the plan that is both a {@link Receptor} and an
     * {@link ISource}; other nodes are skipped silently.
     *
     * @param computeGraph unused
     * @param executionPlan plan whose source nodes receive the input
     * @param str name under which the data object is added to each source
     * @param dataObject data to add
     * @throws RuntimeException if the plan returns {@code null} for its nodes
     */
    @Deprecated
    public void addSourceInput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, DataObject<Object> dataObject) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes();
        if (nodes == null) {
            throw new RuntimeException(String.format("%d Failed to set input for non-existing sources: %s", this.workerID, executionPlan.getNodeNames()));
        }
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if (node instanceof Receptor && node instanceof ISource) {
                ((Receptor) node).add(str, dataObject);
            }
        }
    }

    /**
     * Same as {@link #addSourceInput(ComputeGraph, ExecutionPlan, String, DataObject)}, using
     * the execution plan of the given executor.
     *
     * @param iExecutor executor whose plan receives the input
     * @param str name under which the data object is added to each source
     * @param dataObject data to add
     */
    @Deprecated
    public void addSourceInput(IExecutor iExecutor, String str, DataObject<Object> dataObject) {
        addSourceInput(null, iExecutor.getExecutionPlan(), str, dataObject);
    }

    /**
     * Always returns the empty data object; none of the arguments are consulted.
     *
     * @param computeGraph unused
     * @param executionPlan unused
     * @param str unused
     * @return {@code EmptyDataObject.getInstance()}
     */
    @Deprecated
    public <T> DataObject<T> getOutput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str) {
        return EmptyDataObject.getInstance();
    }

    /**
     * Looks up a data object by name, falling back to the empty data object.
     *
     * @param computeGraph unused
     * @param executionPlan unused
     * @param str unused
     * @param str2 name of the data object to look up
     * @return the stored data object, or the empty data object when none is stored
     */
    @Deprecated
    @SuppressWarnings("unchecked") // the map stores raw DataObject values; the caller chooses T
    public <T> DataObject<T> getOutput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, String str2) {
        return this.dataObjectMap.getOrDefault(str2, EmptyDataObject.getInstance());
    }

    /**
     * Looks up a data object by name.
     *
     * @param str name of the data object
     * @return the stored data object, or {@code null} when none is stored under that name
     */
    @SuppressWarnings("unchecked") // the map stores raw DataObject values; the caller chooses T
    public <T> DataObject<T> getOutput(String str) {
        return this.dataObjectMap.get(str);
    }

    /**
     * Tells whether a data object is stored under the given name.
     *
     * @param str name of the data object
     * @return {@code true} when the name is present in the data object map
     */
    public boolean isOutputAvailable(String str) {
        return this.dataObjectMap.containsKey(str);
    }

    /**
     * Stores a data object under the given name, replacing any previous entry.
     *
     * @param str name to store the data object under
     * @param dataObject data object to store
     */
    public void addInput(String str, DataObject dataObject) {
        this.dataObjectMap.put(str, dataObject);
    }

    /**
     * Gathers the partitions exposed by every {@link Collector} node of the plan and puts the
     * resulting data objects into {@code map}, keyed by collectible name. Existing entries with
     * the same name are replaced.
     *
     * <p>A partition that comes back {@code null} is logged and skipped. When a collector
     * declares exactly one collectible name and returns {@code null} for it, the collector's
     * no-argument {@code get()} is tried before giving up.
     *
     * @param config configuration passed to each newly created {@code DataObjectImpl}
     * @param executionPlan plan whose nodes are inspected
     * @param map destination for the collected data objects
     */
    public static void collectData(Config config, ExecutionPlan executionPlan, Map<String, DataObject> map) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes();
        Map<String, DataObject> collected = new HashMap<>();
        if (nodes != null) {
            for (INodeInstance instance : nodes.values()) {
                Object node = instance.getNode();
                if (!(node instanceof Collector)) {
                    continue;
                }
                Collector collector = (Collector) node;
                IONames names = collector.getCollectibleNames();
                for (String name : names) {
                    DataPartition partition = collector.get(name);
                    if (names.size() == 1 && partition == null) {
                        partition = collector.get();
                    }
                    if (partition == null) {
                        LOG.warning(String.format("Task index %d  of task %d returned null for data %s", instance.getIndex(), instance.getId(), name));
                        continue;
                    }
                    // The partition id is set to the index of the task instance that produced it.
                    partition.setId(instance.getIndex());
                    collected.computeIfAbsent(name, key -> new DataObjectImpl<>(config)).addPartition(partition);
                }
            }
        }
        map.putAll(collected);
    }

    /**
     * Feeds every {@link Receptor} node of the plan with the data it declares as receivable.
     * For each receivable name the node is given both the whole data object and the partition
     * whose index equals the node instance's index.
     *
     * @param executionPlan plan whose nodes are fed
     * @param map available data objects keyed by name
     * @throws Twister2RuntimeException if a receivable name has no data object in {@code map},
     *     or the data object has no partition for the node instance's index
     */
    public static void distributeData(ExecutionPlan executionPlan, Map<String, DataObject> map) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes();
        if (nodes == null) {
            return;
        }
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if (!(node instanceof Receptor)) {
                continue;
            }
            Receptor receptor = (Receptor) node;
            for (String name : receptor.getReceivableNames()) {
                DataObject dataObject = map.get(name);
                if (dataObject == null) {
                    throw new Twister2RuntimeException("Couldn't find input data " + name + " for task " + instance.getId());
                }
                DataPartition partition = dataObject.getPartition(instance.getIndex());
                if (partition == null) {
                    throw new Twister2RuntimeException("Couldn't find input data " + name + " for task index " + instance.getIndex() + " of task " + instance.getId());
                }
                receptor.add(name, dataObject);
                receptor.add(name, partition);
            }
        }
    }

    /**
     * Removes the data object stored under the given name and clears each of its partitions.
     * Does nothing when no data object is stored under that name.
     *
     * @param str name of the data object to remove
     */
    public void clearData(String str) {
        DataObject<?> removed = this.dataObjectMap.remove(str);
        if (removed == null) {
            return;
        }
        for (DataPartition<?> partition : removed.getPartitions()) {
            partition.clear();
        }
    }

    /** Builds a worker plan containing one {@link Worker} per entry of the worker info list. */
    private WorkerPlan createWorkerPlan() {
        List<Worker> workers = new ArrayList<>(this.workerInfoList.size());
        for (JobMasterAPI.WorkerInfo info : this.workerInfoList) {
            workers.add(new Worker(info.getWorkerID()));
        }
        return new WorkerPlan(workers);
    }

    /** Creates a plan builder bound to this worker's id, worker list, communicator and checkpointing client. */
    private ExecutionPlanBuilder newPlanBuilder() {
        return new ExecutionPlanBuilder(this.workerID, this.workerInfoList, this.communicator, this.checkpointingClient);
    }

    /**
     * Obtains an executor from the factory for the given plan. The execution hook is always
     * created with this instance's own configuration, regardless of {@code executorConfig}.
     */
    private IExecutor newExecutor(Config executorConfig, ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        return this.executor.getExecutor(executorConfig, executionPlan, computeGraph.getOperationMode(),
                new ExecutionHookImpl(this.config, this.dataObjectMap, executionPlan, this.currentExecutors));
    }

    /** Does nothing; this class currently releases no resources on close. */
    public void close() {
    }

    /**
     * Forwards the fault to the list of current executors.
     *
     * @param fault fault to forward
     */
    @Override
    public void onFault(Fault fault) {
        this.currentExecutors.onFault(fault);
    }
}
