package edu.iu.dsc.tws.master.server;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.common.driver.IDriver;
import edu.iu.dsc.tws.master.dashclient.DashboardClient;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/master/server/WorkerMonitor.class */
public class WorkerMonitor implements MessageHandler {
    private static final Logger LOG = Logger.getLogger(WorkerMonitor.class.getName());
    private JobMaster jobMaster;
    private JMRRServer rrServer;
    private DashboardClient dashClient;
    private IDriver driver;
    private int numberOfWorkers;
    private boolean jobMasterAssignsWorkerIDs;
    private boolean sentAllWorkersRegistered = false;
    private int nextWorkerID = 0;
    private TreeMap<Integer, WorkerWithState> workers = new TreeMap<>();
    private HashMap<Integer, RequestID> waitList = new HashMap<>();

    public WorkerMonitor(JobMaster jobMaster, JMRRServer jMRRServer, DashboardClient dashboardClient, JobAPI.Job job, IDriver iDriver, boolean z) {
        this.jobMaster = jobMaster;
        this.rrServer = jMRRServer;
        this.dashClient = dashboardClient;
        this.driver = iDriver;
        this.numberOfWorkers = job.getNumberOfWorkers();
        this.jobMasterAssignsWorkerIDs = z;
    }

    public int getNumberOfWorkers() {
        return this.numberOfWorkers;
    }

    private int assignWorkerID() {
        int i = this.nextWorkerID;
        this.nextWorkerID++;
        return i;
    }

    private int getRegisteredWorkerID(String str, int i) {
        for (WorkerWithState workerWithState : this.workers.values()) {
            if (str.equals(workerWithState.getIp()) && i == workerWithState.getPort()) {
                return workerWithState.getWorkerID();
            }
        }
        return -1;
    }

    public void onMessage(RequestID requestID, int i, Message message) {
        if (message instanceof JobMasterAPI.Ping) {
            pingMessageReceived(requestID, (JobMasterAPI.Ping) message);
            return;
        }
        if (message instanceof JobMasterAPI.RegisterWorker) {
            JobMasterAPI.RegisterWorker registerWorker = (JobMasterAPI.RegisterWorker) message;
            if (registerWorker.getFromFailure()) {
                reregisterWorkerMessageReceived(requestID, registerWorker);
                return;
            } else {
                registerWorkerMessageReceived(requestID, registerWorker);
                return;
            }
        }
        if (message instanceof JobMasterAPI.WorkerStateChange) {
            stateChangeMessageReceived(requestID, (JobMasterAPI.WorkerStateChange) message);
            return;
        }
        if (message instanceof JobMasterAPI.ListWorkersRequest) {
            LOG.log(Level.FINE, "ListWorkersRequest received: " + message.toString());
            listWorkersMessageReceived(requestID, (JobMasterAPI.ListWorkersRequest) message);
        } else if (!(message instanceof JobMasterAPI.WorkerMessage)) {
            LOG.log(Level.SEVERE, "Un-known message received: " + message);
        } else {
            LOG.log(Level.FINE, "WorkerMessage received: " + message.toString());
            workerMessageReceived(requestID, (JobMasterAPI.WorkerMessage) message);
        }
    }

    private void pingMessageReceived(RequestID requestID, JobMasterAPI.Ping ping) {
        if (this.workers.containsKey(Integer.valueOf(ping.getWorkerID()))) {
            LOG.fine("Ping message received from a worker: \n" + ping);
            this.workers.get(Integer.valueOf(ping.getWorkerID())).setPingTimestamp(System.currentTimeMillis());
        } else {
            LOG.warning("Ping message received from a worker that has not joined the job yet: " + ping);
        }
        Message build = JobMasterAPI.Ping.newBuilder().setWorkerID(ping.getWorkerID()).setPingMessage("Ping Response From the Master to Worker").setMessageType(JobMasterAPI.Ping.MessageType.MASTER_TO_WORKER).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("Ping response sent to the worker: \n" + build);
        if (this.dashClient != null) {
            this.dashClient.workerHeartbeat(ping.getWorkerID());
        }
    }

    private void registerWorkerMessageReceived(RequestID requestID, JobMasterAPI.RegisterWorker registerWorker) {
        LOG.fine("RegisterWorker message received: \n" + registerWorker);
        JobMasterAPI.WorkerInfo workerInfo = registerWorker.getWorkerInfo();
        if (this.jobMasterAssignsWorkerIDs) {
            int assignWorkerID = assignWorkerID();
            workerInfo = WorkerInfoUtils.updateWorkerID(workerInfo, assignWorkerID);
            this.rrServer.setWorkerChannel(assignWorkerID);
        }
        if (this.workers.containsKey(Integer.valueOf(workerInfo.getWorkerID()))) {
            LOG.severe("Second RegisterWorker message received for workerID: " + workerInfo.getWorkerID() + "\nIgnoring this RegisterWorker message. \nReceived Message: " + registerWorker + "\nPrevious Worker with that workerID: " + this.workers.get(Integer.valueOf(workerInfo.getWorkerID())));
            sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), false, "Previously a worker registered with workerID: " + workerInfo.getWorkerID());
            return;
        }
        WorkerWithState workerWithState = new WorkerWithState(workerInfo);
        workerWithState.addWorkerState(JobMasterAPI.WorkerState.STARTING);
        this.workers.put(Integer.valueOf(workerWithState.getWorkerID()), workerWithState);
        sendRegisterWorkerResponse(requestID, workerWithState.getWorkerID(), true, null);
        if (this.dashClient != null) {
            this.dashClient.registerWorker(workerInfo);
        }
        if (allWorkersRegistered()) {
            LOG.info("All " + this.workers.size() + " workers joined the job.");
            sendListWorkersResponseToWaitList();
            sendWorkersJoinedMessage();
        }
    }

    private void reregisterWorkerMessageReceived(RequestID requestID, JobMasterAPI.RegisterWorker registerWorker) {
        LOG.fine("RegisterWorker message received: \n" + registerWorker);
        JobMasterAPI.WorkerInfo workerInfo = registerWorker.getWorkerInfo();
        if (registerWorker.getWorkerID() < 0) {
            sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), false, "Registration failed. WorkerID = " + registerWorker.getWorkerID() + ". WorkerID has to be the previous valid workerID when coming from failure.");
            LOG.warning("A worker tried to register after failure but did not have a valid workerID. Provided workerID = " + registerWorker.getWorkerID());
            return;
        }
        boolean z = false;
        WorkerWithState workerWithState = this.workers.get(Integer.valueOf(registerWorker.getWorkerID()));
        if (workerWithState == null) {
            workerWithState = new WorkerWithState(workerInfo);
            z = true;
            LOG.warning("A worker registered after failure, but it did not register previously. workerID = " + registerWorker.getWorkerID());
        }
        workerWithState.addWorkerState(JobMasterAPI.WorkerState.RESTARTING);
        this.workers.put(Integer.valueOf(registerWorker.getWorkerID()), workerWithState);
        sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), true, null);
        LOG.info("Worker[" + registerWorker.getWorkerID() + "] registered after failure.");
        if (this.dashClient != null) {
            this.dashClient.registerWorker(workerInfo);
        }
        if (z && allWorkersRegistered()) {
            LOG.info("All " + this.workers.size() + " workers joined the job.");
            sendListWorkersResponseToWaitList();
            sendWorkersJoinedMessage();
        } else {
            if (z || !allWorkersRegistered()) {
                return;
            }
            this.rrServer.sendMessage(constructWorkersJoinedMessage(), registerWorker.getWorkerID());
        }
    }

    private void recoverFromFailure(int i, JobMasterAPI.WorkerInfo workerInfo) {
        this.rrServer.removeWorkerChannel(i);
        this.rrServer.setWorkerChannel(i);
        broadcastRecoveryMessage(JobMasterAPI.Recover.newBuilder().setWorkerID(i).build());
    }

    private void stateChangeMessageReceived(RequestID requestID, JobMasterAPI.WorkerStateChange workerStateChange) {
        if (!this.workers.containsKey(Integer.valueOf(workerStateChange.getWorkerID()))) {
            LOG.warning("WorkerStateChange message received from a worker that has not joined the job yet.\nNot processing the message, just sending a response" + workerStateChange);
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            return;
        }
        if (workerStateChange.getState() == JobMasterAPI.WorkerState.RUNNING) {
            this.workers.get(Integer.valueOf(workerStateChange.getWorkerID())).addWorkerState(workerStateChange.getState());
            LOG.fine("WorkerStateChange RUNNING message received: \n" + workerStateChange);
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            if (this.dashClient != null) {
                this.dashClient.workerStateChange(workerStateChange.getWorkerID(), workerStateChange.getState());
            }
            if (haveAllWorkersBecomeRunning()) {
                this.jobMaster.allWorkersBecameRunning();
                return;
            }
            return;
        }
        if (workerStateChange.getState() != JobMasterAPI.WorkerState.COMPLETED) {
            LOG.warning("Unrecognized WorkerStateChange message received. Ignoring and sending reply: \n" + workerStateChange);
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            return;
        }
        this.workers.get(Integer.valueOf(workerStateChange.getWorkerID())).addWorkerState(workerStateChange.getState());
        LOG.fine("WorkerStateChange COMPLETED message received: \n" + workerStateChange);
        sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
        if (this.dashClient != null) {
            this.dashClient.workerStateChange(workerStateChange.getWorkerID(), workerStateChange.getState());
        }
        if (haveAllWorkersCompleted()) {
            this.jobMaster.completeJob();
        }
    }

    public void workersScaledDown(int i) {
        this.numberOfWorkers -= i;
        Message build = JobMasterAPI.WorkersScaled.newBuilder().setChange(0 - i).setNumberOfWorkers(this.numberOfWorkers).build();
        this.nextWorkerID -= i;
        LinkedList linkedList = new LinkedList();
        String str = "Deleted worker IDs by scaling down: ";
        for (int i2 = 0; i2 < i; i2++) {
            int i3 = this.numberOfWorkers + i2;
            linkedList.add(Integer.valueOf(i3));
            this.workers.remove(Integer.valueOf(i3));
            this.rrServer.removeWorkerChannel(i3);
            str = str + i3 + ", ";
        }
        LOG.info(str);
        Iterator<Integer> it = this.workers.keySet().iterator();
        while (it.hasNext()) {
            this.rrServer.sendMessage(build, it.next().intValue());
        }
        if (this.dashClient != null) {
            this.dashClient.scaledWorkers(build.getChange(), build.getNumberOfWorkers(), linkedList);
        }
    }

    public void workersScaledUp(int i) {
        int i2 = this.numberOfWorkers;
        this.numberOfWorkers += i;
        Message build = JobMasterAPI.WorkersScaled.newBuilder().setChange(i).setNumberOfWorkers(this.numberOfWorkers).build();
        for (int i3 = 0; i3 < i2; i3++) {
            this.rrServer.sendMessage(build, i3);
        }
        if (allWorkersRegistered()) {
            sendWorkersJoinedMessage();
        }
        if (this.dashClient != null) {
            this.dashClient.scaledWorkers(build.getChange(), build.getNumberOfWorkers(), new LinkedList());
        }
    }

    public boolean broadcastRecoveryMessage(Message message) {
        if (!allWorkersRegistered()) {
            LOG.warning("Could not send the broadcast message to all workers, since they are not all registered.");
            return false;
        }
        Iterator<Integer> it = this.workers.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!this.rrServer.sendMessage(message, intValue)) {
                LOG.warning("Broadcast message can not be sent to workerID: " + intValue);
                return false;
            }
        }
        return true;
    }

    public boolean broadcastMessage(Message message) {
        Message build = JobMasterAPI.DriverMessage.newBuilder().setData(Any.pack(message).toByteString()).build();
        if (!allWorkersRegistered()) {
            LOG.warning("Could not send the broadcast message to all workers, since they are not all registered.");
            return false;
        }
        Iterator<Integer> it = this.workers.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!this.rrServer.sendMessage(build, intValue)) {
                LOG.warning("Broadcast message can not be sent to workerID: " + intValue);
                return false;
            }
        }
        return true;
    }

    public boolean sendMessageToWorkerList(Message message, List<Integer> list) {
        Message build = JobMasterAPI.DriverMessage.newBuilder().setData(Any.pack(message).toByteString()).build();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            WorkerWithState workerWithState = this.workers.get(Integer.valueOf(intValue));
            if (workerWithState == null) {
                LOG.warning("There is no worker in JobMaster with workerID: " + intValue);
                return false;
            }
            if (workerWithState.getLastState() != JobMasterAPI.WorkerState.RUNNING && workerWithState.getLastState() != JobMasterAPI.WorkerState.STARTING) {
                LOG.warning("workerID[" + intValue + "] is neither in RUNNING nor in STARTING scate. Worker state: " + workerWithState.getLastState());
                return false;
            }
        }
        Iterator<Integer> it2 = list.iterator();
        while (it2.hasNext()) {
            int intValue2 = it2.next().intValue();
            if (!this.rrServer.sendMessage(build, intValue2)) {
                LOG.warning("Message can not be sent to workerID: " + intValue2 + " It is not sending the message to remaining workers in the list.");
                return false;
            }
        }
        return true;
    }

    private void workerMessageReceived(RequestID requestID, JobMasterAPI.WorkerMessage workerMessage) {
        if (this.driver != null) {
            try {
                this.driver.workerMessageReceived(Any.parseFrom(workerMessage.getData()), workerMessage.getWorkerID());
            } catch (InvalidProtocolBufferException e) {
                LOG.log(Level.SEVERE, "Can not parse received protocol buffer message to Any", e);
                Message build = JobMasterAPI.WorkerMessageResponse.newBuilder().setSucceeded(false).setReason("Can not parse received protocol buffer message to Any").build();
                this.rrServer.sendResponse(requestID, build);
                LOG.warning("WorkerMessageResponse sent to the driver: \n" + build);
                return;
            }
        }
        Message build2 = JobMasterAPI.WorkerMessageResponse.newBuilder().setSucceeded(true).build();
        this.rrServer.sendResponse(requestID, build2);
        LOG.fine("WorkerMessageResponse sent to the driver: \n" + build2);
    }

    private boolean allWorkersRegistered() {
        return this.workers.size() == this.numberOfWorkers && this.workers.lastKey().intValue() == this.numberOfWorkers - 1;
    }

    private boolean allWorkersRunning() {
        if (!allWorkersRegistered()) {
            return false;
        }
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            if (it.next().getLastState() != JobMasterAPI.WorkerState.RUNNING) {
                return false;
            }
        }
        return true;
    }

    private boolean haveAllWorkersBecomeRunning() {
        if (this.numberOfWorkers != this.workers.size()) {
            return false;
        }
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            if (!it.next().hasWorkerBecomeRunning()) {
                return false;
            }
        }
        return true;
    }

    private boolean haveAllWorkersCompleted() {
        if (this.numberOfWorkers != this.workers.size()) {
            return false;
        }
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            if (!it.next().hasWorkerCompleted()) {
                return false;
            }
        }
        return true;
    }

    private void sendRegisterWorkerResponse(RequestID requestID, int i, boolean z, String str) {
        Message build = JobMasterAPI.RegisterWorkerResponse.newBuilder().setWorkerID(i).setResult(z).setReason(str == null ? "" : str).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("RegisterWorkerResponse sent:\n" + build);
    }

    private void sendWorkerStateChangeResponse(RequestID requestID, int i, JobMasterAPI.WorkerState workerState) {
        Message build = JobMasterAPI.WorkerStateChangeResponse.newBuilder().setWorkerID(i).setState(workerState).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("WorkerStateChangeResponse sent:\n" + build);
    }

    private void listWorkersMessageReceived(RequestID requestID, JobMasterAPI.ListWorkersRequest listWorkersRequest) {
        if (listWorkersRequest.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.IMMEDIATE_RESPONSE) {
            sendListWorkersResponse(listWorkersRequest.getWorkerID(), requestID);
            LOG.log(Level.FINE, String.format("Expecting %d workers, %d joined", Integer.valueOf(this.numberOfWorkers), Integer.valueOf(this.workers.size())));
        } else if (listWorkersRequest.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.RESPONSE_AFTER_ALL_JOINED) {
            if (this.workers.size() == this.numberOfWorkers) {
                sendListWorkersResponse(listWorkersRequest.getWorkerID(), requestID);
            } else {
                this.waitList.put(Integer.valueOf(listWorkersRequest.getWorkerID()), requestID);
            }
            LOG.log(Level.FINE, String.format("Expecting %d workers, %d joined", Integer.valueOf(this.numberOfWorkers), Integer.valueOf(this.workers.size())));
        }
    }

    private void sendListWorkersResponse(int i, RequestID requestID) {
        JobMasterAPI.ListWorkersResponse.Builder workerID = JobMasterAPI.ListWorkersResponse.newBuilder().setWorkerID(i);
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            workerID.addWorker(it.next().getWorkerInfo());
        }
        Message build = workerID.build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("ListWorkersResponse sent:\n" + build);
    }

    private void sendListWorkersResponseToWaitList() {
        for (Map.Entry<Integer, RequestID> entry : this.waitList.entrySet()) {
            sendListWorkersResponse(entry.getKey().intValue(), entry.getValue());
        }
        this.waitList.clear();
    }

    private void sendWorkersJoinedMessage() {
        LOG.info("Sending WorkersJoined messages ...");
        Message constructWorkersJoinedMessage = constructWorkersJoinedMessage();
        if (this.driver != null) {
            this.driver.allWorkersJoined(constructWorkerList());
        }
        Iterator<Integer> it = this.workers.keySet().iterator();
        while (it.hasNext()) {
            this.rrServer.sendMessage(constructWorkersJoinedMessage, it.next().intValue());
        }
    }

    private JobMasterAPI.WorkersJoined constructWorkersJoinedMessage() {
        JobMasterAPI.WorkersJoined.Builder numberOfWorkers = JobMasterAPI.WorkersJoined.newBuilder().setNumberOfWorkers(this.numberOfWorkers);
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            numberOfWorkers.addWorker(it.next().getWorkerInfo());
        }
        return numberOfWorkers.build();
    }

    private List<JobMasterAPI.WorkerInfo> constructWorkerList() {
        LinkedList linkedList = new LinkedList();
        Iterator<WorkerWithState> it = this.workers.values().iterator();
        while (it.hasNext()) {
            linkedList.add(it.next().getWorkerInfo());
        }
        return linkedList;
    }
}
