package edu.iu.dsc.tws.task.impl.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IAllJoinedListener;
import edu.iu.dsc.tws.api.resource.IReceiverFromDriver;
import edu.iu.dsc.tws.api.resource.IScalerListener;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.Network;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.master.worker.JMSenderToDriver;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/impl/cdfw/CDFWRuntime.class */
/**
 * Worker-side runtime that queues messages received from the driver and processes them in
 * {@link #execute()}.
 *
 * <p>An {@code ExecuteMessage} carries a serialized {@link ComputeGraph}; the graph is
 * deserialized, planned and executed through the {@link TaskExecutor}, and an
 * {@code ExecuteCompletedMessage} is sent back to the driver. A
 * {@code CDFWJobCompletedMessage} ends the execution loop. When a scale-up notification has
 * been recorded and the message queue is empty, the communicator, channel and task executor
 * are rebuilt against the current worker list.
 *
 * <p>NOTE(review): the listener callbacks are presumably invoked from a thread other than the
 * one running {@link #execute()} (hence the atomic flags and the blocking queue) - confirm
 * against the caller.
 */
public class CDFWRuntime implements IReceiverFromDriver, IScalerListener, IAllJoinedListener {
    private static final Logger LOG = Logger.getLogger(CDFWRuntime.class.getName());

    /**
     * Maximum time, in milliseconds, that {@link #execute()} blocks waiting for a driver
     * message before it re-checks for a pending scale-up request.
     */
    private static final long QUEUE_POLL_INTERVAL_MS = 100L;

    private final int workerId;
    private final Config config;
    private final IWorkerController controller;
    private final AtomicBoolean scaleUpRequest = new AtomicBoolean(false);
    // Set by workersScaledDown(); nothing in this class reads it yet.
    private final AtomicBoolean scaleDownRequest = new AtomicBoolean(false);
    private final BlockingQueue<Any> executeMessageQueue = new LinkedBlockingQueue<>();
    private final KryoSerializer serializer = new KryoSerializer();

    // The three fields below stay null when the initial synchronization in the constructor fails.
    private TaskExecutor taskExecutor;
    private Communicator communicator;
    private TWSChannel channel;

    /**
     * Creates the runtime and, if the initial worker synchronization succeeds, builds the
     * channel, communicator and task executor.
     *
     * <p>If synchronization fails (see {@link #initSynch(IWorkerController)}) the error is
     * logged and the instance is left without a task executor.
     *
     * @param config job configuration
     * @param i id of this worker
     * @param iWorkerController controller used to list workers and wait on the barrier
     */
    public CDFWRuntime(Config config, int i, IWorkerController iWorkerController) {
        this.workerId = i;
        this.controller = iWorkerController;
        this.config = config;
        List<JobMasterAPI.WorkerInfo> workers = initSynch(iWorkerController);
        if (workers == null) {
            return;
        }
        rebuildExecutor(workers);
    }

    /**
     * Fetches the list of all workers and then waits on the worker barrier.
     *
     * @param iWorkerController controller to query
     * @return the worker list, or {@code null} if the list could not be obtained or either
     *     call timed out (the failure is logged)
     */
    private List<JobMasterAPI.WorkerInfo> initSynch(IWorkerController iWorkerController) {
        try {
            List<JobMasterAPI.WorkerInfo> allWorkers = iWorkerController.getAllWorkers();
            if (allWorkers == null) {
                LOG.severe("Can not get all workers to join. Something wrong. Exiting ....................");
                return null;
            }
            LOG.info(allWorkers.size() + " workers joined. ");
            LOG.fine("Waiting on a barrier ........................ ");
            iWorkerController.waitOnBarrier();
            LOG.fine("Proceeded through the barrier ........................ ");
            return allWorkers;
        } catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Replaces the channel, communicator and task executor with fresh instances built for the
     * given worker list.
     *
     * @param workers current list of workers
     */
    private void rebuildExecutor(List<JobMasterAPI.WorkerInfo> workers) {
        this.channel = Network.initializeChannel(this.config, this.controller);
        this.communicator = new Communicator(this.config, this.channel, (String) null);
        this.taskExecutor = new TaskExecutor(this.config, this.workerId, workers, this.communicator, null);
    }

    /** Closes the current communicator, if one was ever created. */
    private void closeCommunicator() {
        if (this.communicator != null) {
            this.communicator.close();
        }
    }

    /**
     * Runs the message loop until the job completes or an unrecoverable condition occurs.
     *
     * <p>The loop blocks on the message queue for short intervals instead of spinning, and
     * handles a pending scale-up request only when the queue is empty.
     *
     * @return {@code true} when a {@code CDFWJobCompletedMessage} was received; {@code false}
     *     when an execute message could not be processed, when re-synchronization after a
     *     scale-up failed, or when the calling thread was interrupted while waiting
     */
    public boolean execute() {
        while (true) {
            Any message;
            try {
                message = this.executeMessageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop processing.
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, this.workerId + " Interrupted while waiting for driver messages", e);
                return false;
            }

            if (message == null) {
                // compareAndSet clears the flag only if it was set, so a request that arrives
                // while this one is being handled is not lost.
                if (this.scaleUpRequest.compareAndSet(true, false) && !handleScaleUp()) {
                    return false;
                }
                continue;
            }

            if (message.is(CDFWJobAPI.ExecuteMessage.class)) {
                if (handleExecuteMessage(message)) {
                    return false;
                }
            } else if (message.is(CDFWJobAPI.CDFWJobCompletedMessage.class)) {
                LOG.log(Level.INFO, this.workerId + "Received CDFW job completed message. Leaving execution loop");
                LOG.log(Level.INFO, this.workerId + " Execution Completed");
                return true;
            } else {
                LOG.warning(this.workerId + " Ignoring unexpected driver message of type " + message.getTypeUrl());
            }
        }
    }

    /**
     * Tears down the current communicator, re-synchronizes with all workers and rebuilds the
     * execution environment.
     *
     * @return {@code true} on success; {@code false} if the workers could not be
     *     re-synchronized, in which case no new task executor is created
     */
    private boolean handleScaleUp() {
        closeCommunicator();
        List<JobMasterAPI.WorkerInfo> workers = initSynch(this.controller);
        if (workers == null) {
            LOG.severe(this.workerId + " Unable to re-synchronize workers after scale up");
            return false;
        }
        LOG.info("Existing workers calling barrier");
        rebuildExecutor(workers);
        return true;
    }

    /**
     * Unpacks an {@code ExecuteMessage}, executes the sub-graph it carries and notifies the
     * driver of completion.
     *
     * @param any packed {@code ExecuteMessage}
     * @return {@code true} if the execution loop must stop (no task executor is available or
     *     the sub-graph deserialized to {@code null}); {@code false} otherwise, including when
     *     the message could not be unpacked (that error is only logged)
     */
    private boolean handleExecuteMessage(Any any) {
        if (this.taskExecutor == null) {
            LOG.severe(this.workerId + " Task executor is not initialized; unable to execute the subgraph");
            return true;
        }
        JMSenderToDriver senderToDriver = JMWorkerAgent.getJMWorkerAgent().getSenderToDriver();
        try {
            CDFWJobAPI.SubGraph graph = any.unpack(CDFWJobAPI.ExecuteMessage.class).getGraph();
            ComputeGraph computeGraph = (ComputeGraph) this.serializer.deserialize(graph.getGraphSerialized().toByteArray());
            if (computeGraph == null) {
                LOG.severe(this.workerId + " Unable to find the subgraph " + graph.getName());
                return true;
            }
            this.taskExecutor.execute(computeGraph, this.taskExecutor.plan(computeGraph));
            CDFWJobAPI.ExecuteCompletedMessage build = CDFWJobAPI.ExecuteCompletedMessage.newBuilder().setSubgraphName(graph.getName()).build();
            if (!senderToDriver.sendToDriver(build)) {
                LOG.severe("Unable to send the subgraph completed message :" + build);
            }
            return false;
        } catch (InvalidProtocolBufferException e) {
            LOG.log(Level.SEVERE, "Unable to unpack received message ", e);
            return false;
        }
    }

    /**
     * Queues a message from the driver for processing by {@link #execute()}.
     *
     * @param any message received from the driver
     */
    public void driverMessageReceived(Any any) {
        try {
            this.executeMessageQueue.put(any);
        } catch (InterruptedException e) {
            // Restore the flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, "Unable to insert message to the queue", e);
        }
    }

    /**
     * Records that workers were added; {@link #execute()} acts on it once the queue is empty.
     *
     * @param i number of instances added
     */
    public void workersScaledUp(int i) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled up msg received. Instances added: " + i);
        this.scaleUpRequest.set(true);
    }

    /**
     * Records that workers were removed. The flag is only stored; this class does not act on it.
     *
     * @param i number of instances removed
     */
    public void workersScaledDown(int i) {
        LOG.log(Level.FINE, this.workerId + "Workers scaled down msg received. Instances removed: " + i);
        this.scaleDownRequest.set(true);
    }

    /**
     * Logs that all workers have joined; the list itself is not used.
     *
     * @param list the workers that joined
     */
    public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> list) {
        LOG.log(Level.FINE, this.workerId + "All workers joined msg received");
    }

    /**
     * Closes the communicator and rebuilds the execution environment from the current worker
     * list, without waiting on the barrier. Currently not called from within this class.
     *
     * @return {@code true} on success; {@code false} if the worker list could not be obtained,
     *     in which case no new task executor is created
     */
    private boolean reinitialize() {
        closeCommunicator();
        List<JobMasterAPI.WorkerInfo> workers;
        try {
            workers = this.controller.getAllWorkers();
        } catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return false;
        }
        if (workers == null) {
            LOG.severe(this.workerId + " Unable to obtain the worker list while reinitializing");
            return false;
        }
        rebuildExecutor(workers);
        return true;
    }
}
