package edu.iu.dsc.tws.task.cdfw;

import com.google.protobuf.Any;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.common.driver.IDriverMessenger;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/cdfw/CDFWExecutor.class */
/**
 * Submits {@link DataFlowGraph}s to workers through an {@link IDriverMessenger} and
 * blocks until a worker message is received for each submitted graph.
 *
 * <p>The executor tracks a {@link DriverState}; graphs may only be submitted while the
 * state is {@code INITIALIZE} or {@code JOB_FINISHED}.
 */
public final class CDFWExecutor {
    private static final Logger LOG = Logger.getLogger(CDFWExecutor.class.getName());

    /** Events queued by {@link #workerMessageReceived(Any, int)} and consumed by {@code waitForEvent}. */
    private final BlockingQueue<DriverEvent> driverEvents = new LinkedBlockingDeque<>();

    /**
     * Current driver state. Declared volatile because {@link #executeCDFW(DataFlowGraph...)}
     * updates it from pool threads while the calling thread reads it.
     */
    private volatile DriverState driverState = DriverState.INITIALIZE;

    private final IDriverMessenger driverMessenger;
    private final CDFWEnv executionEnv;

    /** Runnable that submits one graph, with its assigned worker ids, via {@code submitGraph}. */
    private class CDFWExecutorTask implements Runnable {
        private final DataFlowGraph dataFlowGraph;
        private final Set<Integer> workerIDs;

        CDFWExecutorTask(DataFlowGraph dataFlowGraph, Set<Integer> set) {
            this.dataFlowGraph = dataFlowGraph;
            this.workerIDs = set;
        }

        @Override
        public void run() {
            CDFWExecutor.this.submitGraph(this.dataFlowGraph, this.workerIDs);
        }
    }

    /**
     * Creates an executor.
     *
     * @param cDFWEnv environment that supplies the worker info list used for scheduling
     * @param iDriverMessenger messenger used to broadcast messages to all workers
     */
    public CDFWExecutor(CDFWEnv cDFWEnv, IDriverMessenger iDriverMessenger) {
        this.driverMessenger = iDriverMessenger;
        this.executionEnv = cDFWEnv;
    }

    /**
     * Schedules a single graph and submits it, blocking until the finished-job event arrives.
     *
     * @param dataFlowGraph the graph to execute
     * @throws RuntimeException if the driver state is neither {@code INITIALIZE} nor {@code JOB_FINISHED}
     */
    public void execute(DataFlowGraph dataFlowGraph) {
        LOG.fine("Starting task graph Requirements:" + dataFlowGraph.getGraphName());
        if (this.driverState != DriverState.JOB_FINISHED && this.driverState != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + this.driverState);
        }
        CDFWScheduler scheduler = new CDFWScheduler(this.executionEnv.getWorkerInfoList());
        submitGraph(dataFlowGraph, scheduler.schedule(dataFlowGraph));
    }

    /**
     * Schedules several graphs and submits each one on its own pool thread.
     *
     * <p>The pool is shut down as soon as all tasks are handed over, and this method then
     * waits at most one second for them to complete. Tasks still running after that keep
     * running in the background.
     *
     * @param dataFlowGraphArr the graphs to execute
     * @throws RuntimeException if the driver state is neither {@code INITIALIZE} nor {@code JOB_FINISHED}
     * @throws Twister2RuntimeException if the calling thread is interrupted while waiting
     */
    public void executeCDFW(DataFlowGraph... dataFlowGraphArr) {
        if (this.driverState != DriverState.JOB_FINISHED && this.driverState != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + this.driverState);
        }
        Map<DataFlowGraph, Set<Integer>> schedule =
                new CDFWScheduler(this.executionEnv.getWorkerInfoList()).schedule(dataFlowGraphArr);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(schedule.size());
        try {
            for (Map.Entry<DataFlowGraph, Set<Integer>> entry : schedule.entrySet()) {
                pool.submit(new CDFWExecutorTask(entry.getKey(), entry.getValue()));
            }
        } finally {
            // Shut down before waiting: awaitTermination can only return true once shutdown
            // has been requested, and this also releases the pool if a submission fails.
            pool.shutdown();
        }
        try {
            if (!pool.awaitTermination(1L, TimeUnit.SECONDS)) {
                LOG.fine("Submitted graphs are still running after the wait timeout");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new Twister2RuntimeException(e);
        }
    }

    /** Broadcasts the job-completed message to all workers. */
    public void close() {
        sendCloseMessage();
    }

    /**
     * Builds the job for the given graph, broadcasts it to the workers and blocks until a
     * {@code FINISHED_JOB} event is taken from the event queue.
     *
     * @param dataFlowGraph the graph to submit
     * @param set ids of the workers assigned to this graph
     * @throws Twister2RuntimeException if the driver state does not allow submission, or if
     *     building, sending or waiting for the job fails
     */
    public void submitGraph(DataFlowGraph dataFlowGraph, Set<Integer> set) {
        if (this.driverState != DriverState.INITIALIZE && this.driverState != DriverState.JOB_FINISHED) {
            throw new Twister2RuntimeException("Failed to submit job in this state: " + this.driverState);
        }
        try {
            submitJob(buildCDFWJob(buildCDFWSchedulePlan(dataFlowGraph, set)));
            this.driverState = DriverState.JOB_SUBMITTED;
            waitForEvent(DriveEventType.FINISHED_JOB);
            this.driverState = DriverState.JOB_FINISHED;
        } catch (Exception e) {
            throw new Twister2RuntimeException("Driver is not initialized", e);
        }
    }

    /** Attaches a schedule plan containing the given worker ids to the graph and returns the same graph. */
    private DataFlowGraph buildCDFWSchedulePlan(DataFlowGraph dataFlowGraph, Set<Integer> set) {
        CDFWJobAPI.CDFWSchedulePlan plan =
                CDFWJobAPI.CDFWSchedulePlan.newBuilder().addAllWorkers(set).build();
        dataFlowGraph.setCdfwSchedulePlans(plan);
        return dataFlowGraph;
    }

    /** Broadcasts a {@code CDFWJobCompletedMessage} with an empty job name to all workers. */
    private void sendCloseMessage() {
        this.driverMessenger.broadcastToAllWorkers(
                CDFWJobAPI.CDFWJobCompletedMessage.newBuilder().setHtgJobname("").build());
    }

    /** Wraps the sub-graph in an {@code ExecuteMessage} and broadcasts it to all workers. */
    private void submitJob(CDFWJobAPI.SubGraph subGraph) {
        LOG.log(Level.INFO, "Sending graph to workers for execution: " + subGraph.getName());
        CDFWJobAPI.ExecuteMessage.Builder builder = CDFWJobAPI.ExecuteMessage.newBuilder();
        builder.setSubgraphName(subGraph.getName());
        builder.setGraph(subGraph);
        this.driverMessenger.broadcastToAllWorkers(builder.build());
    }

    /** Builds the protobuf sub-graph for the given data flow graph. */
    private CDFWJobAPI.SubGraph buildCDFWJob(DataFlowGraph dataFlowGraph) {
        return dataFlowGraph.build();
    }

    /**
     * Records a worker message as a {@code FINISHED_JOB} event on the event queue.
     *
     * @param any the message received from the worker
     * @param i the id passed along with the message (logged and stored in the event)
     */
    public void workerMessageReceived(Any any, int i) {
        LOG.log(Level.FINE, String.format("Received worker message %d: %s", Integer.valueOf(i), any.getClass().getName()));
        this.driverEvents.offer(new DriverEvent(DriveEventType.FINISHED_JOB, any, i));
    }

    /**
     * Blocks until an event is available and checks that it has the expected type.
     *
     * @param driveEventType the event type the caller expects
     * @return the event taken from the queue
     * @throws Exception if the event taken has a different type
     * @throws Twister2RuntimeException if the thread is interrupted while waiting
     */
    private DriverEvent waitForEvent(DriveEventType driveEventType) throws Exception {
        DriverEvent event;
        try {
            event = this.driverEvents.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new Twister2RuntimeException("Failed to take event", e);
        }
        if (event.getType() != driveEventType) {
            // Report the type actually received; the expected one alone says nothing new.
            throw new Exception("Un-expected event: " + event.getType() + ", expected: " + driveEventType);
        }
        return event;
    }
}
