package edu.iu.dsc.tws.master.server;

import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.common.net.tcp.request.RRServer;
import edu.iu.dsc.tws.common.zk.WorkerWithState;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/master/server/WorkerHandler.class */
public class WorkerHandler implements MessageHandler {
    private static final Logger LOG = Logger.getLogger(WorkerHandler.class.getName());
    private WorkerMonitor workerMonitor;
    private RRServer rrServer;
    private boolean zkUsed;
    private boolean allConnected = false;
    private List<RequestID> waitList = new LinkedList();

    public WorkerHandler(WorkerMonitor workerMonitor, RRServer rRServer, boolean z) {
        this.workerMonitor = workerMonitor;
        this.rrServer = rRServer;
        this.zkUsed = z;
    }

    public boolean isAllConnected() {
        return this.allConnected;
    }

    public void onMessage(RequestID requestID, int i, Message message) {
        if (message instanceof JobMasterAPI.RegisterWorker) {
            registerWorkerMessageReceived(requestID, (JobMasterAPI.RegisterWorker) message);
            return;
        }
        if (message instanceof JobMasterAPI.WorkerStateChange) {
            stateChangeMessageReceived(requestID, (JobMasterAPI.WorkerStateChange) message);
        } else if (!(message instanceof JobMasterAPI.ListWorkersRequest)) {
            LOG.log(Level.SEVERE, "Un-known message type received: " + message);
        } else {
            LOG.log(Level.FINE, "ListWorkersRequest received: " + message.toString());
            listWorkersMessageReceived(requestID, (JobMasterAPI.ListWorkersRequest) message);
        }
    }

    private void registerWorkerMessageReceived(RequestID requestID, JobMasterAPI.RegisterWorker registerWorker) {
        handleAllConnected();
        if (this.zkUsed) {
            int workerID = registerWorker.getWorkerInfo().getWorkerID();
            LOG.fine("Since ZooKeeper is used, ignoring RegisterWorker message for worker: " + workerID);
            sendRegisterWorkerResponse(requestID, workerID, true, null);
            this.workerMonitor.informDriverForAllJoined();
            return;
        }
        LOG.fine("RegisterWorker message received: \n" + registerWorker);
        JobMasterAPI.WorkerInfo workerInfo = registerWorker.getWorkerInfo();
        boolean isAllJoined = this.workerMonitor.isAllJoined();
        WorkerWithState workerWithState = new WorkerWithState(workerInfo, registerWorker.getInitialState());
        if (registerWorker.getInitialState() == JobMasterAPI.WorkerState.RESTARTED) {
            String restarted = this.workerMonitor.restarted(workerWithState);
            if (restarted != null) {
                sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), false, restarted);
                return;
            }
        } else {
            String started = this.workerMonitor.started(workerWithState);
            if (started != null) {
                sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), false, started);
                return;
            }
        }
        sendRegisterWorkerResponse(requestID, workerInfo.getWorkerID(), true, null);
        if (isAllJoined || !this.workerMonitor.isAllJoined()) {
            return;
        }
        LOG.info("All workers joined the job. Worker IDs: " + this.workerMonitor.getWorkerIDs());
        sendListWorkersResponseToWaitList();
        sendWorkersJoinedMessage();
    }

    private void stateChangeMessageReceived(RequestID requestID, JobMasterAPI.WorkerStateChange workerStateChange) {
        if (!this.workerMonitor.existWorker(workerStateChange.getWorkerID())) {
            LOG.warning("WorkerStateChange message received from a worker that has not joined the job yet.\nNot processing the message, just sending a response" + workerStateChange);
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            return;
        }
        if (workerStateChange.getState() == JobMasterAPI.WorkerState.COMPLETED) {
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            this.workerMonitor.completed(workerStateChange.getWorkerID());
        } else if (workerStateChange.getState() != JobMasterAPI.WorkerState.FAILED) {
            LOG.warning("Unrecognized WorkerStateChange message received. Ignoring and sending reply: \n" + workerStateChange);
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
        } else {
            LOG.warning("Worker [" + workerStateChange.getWorkerID() + "] Failed. ");
            sendWorkerStateChangeResponse(requestID, workerStateChange.getWorkerID(), workerStateChange.getState());
            this.workerMonitor.failed(workerStateChange.getWorkerID());
        }
    }

    private void listWorkersMessageReceived(RequestID requestID, JobMasterAPI.ListWorkersRequest listWorkersRequest) {
        if (listWorkersRequest.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.IMMEDIATE_RESPONSE) {
            sendListWorkersResponse(requestID);
            LOG.fine(String.format("Expecting %d workers, %d joined", Integer.valueOf(this.workerMonitor.getNumberOfWorkers()), Integer.valueOf(this.workerMonitor.getWorkersListSize())));
        } else if (listWorkersRequest.getRequestType() == JobMasterAPI.ListWorkersRequest.RequestType.RESPONSE_AFTER_ALL_JOINED) {
            if (this.workerMonitor.getWorkersListSize() == this.workerMonitor.getNumberOfWorkers()) {
                sendListWorkersResponse(requestID);
            } else {
                this.waitList.add(requestID);
            }
            LOG.log(Level.FINE, String.format("Expecting %d workers, %d joined", Integer.valueOf(this.workerMonitor.getNumberOfWorkers()), Integer.valueOf(this.workerMonitor.getWorkersListSize())));
        }
    }

    public void workersScaledDown(int i) {
        JobMasterAPI.WorkersScaled build = JobMasterAPI.WorkersScaled.newBuilder().setChange(0 - i).setNumberOfWorkers(this.workerMonitor.getNumberOfWorkers()).build();
        Iterator<Integer> it = this.workerMonitor.getWorkerIDs().iterator();
        while (it.hasNext()) {
            this.rrServer.sendMessage(build, it.next().intValue());
        }
    }

    public void workersScaledUp(int i) {
        unsetAllConnected();
        if (this.zkUsed) {
            return;
        }
        JobMasterAPI.WorkersScaled build = JobMasterAPI.WorkersScaled.newBuilder().setChange(i).setNumberOfWorkers(this.workerMonitor.getNumberOfWorkers()).build();
        int numberOfWorkers = this.workerMonitor.getNumberOfWorkers() - i;
        for (int i2 = 0; i2 < numberOfWorkers; i2++) {
            this.rrServer.sendMessage(build, i2);
        }
    }

    private void sendListWorkersResponse(RequestID requestID) {
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.ListWorkersResponse build = JobMasterAPI.ListWorkersResponse.newBuilder().setNumberOfWorkers(workerInfoList.size()).addAllWorker(workerInfoList).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("ListWorkersResponse sent:\n" + build);
    }

    private void sendListWorkersResponseToWaitList() {
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.ListWorkersResponse build = JobMasterAPI.ListWorkersResponse.newBuilder().setNumberOfWorkers(workerInfoList.size()).addAllWorker(workerInfoList).build();
        Iterator<RequestID> it = this.waitList.iterator();
        while (it.hasNext()) {
            this.rrServer.sendResponse(it.next(), build);
        }
        this.waitList.clear();
    }

    private void sendRegisterWorkerResponse(RequestID requestID, int i, boolean z, String str) {
        JobMasterAPI.RegisterWorkerResponse build = JobMasterAPI.RegisterWorkerResponse.newBuilder().setWorkerID(i).setResult(z).setReason(str == null ? "" : str).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("RegisterWorkerResponse sent:\n" + build);
    }

    private void sendWorkerStateChangeResponse(RequestID requestID, int i, JobMasterAPI.WorkerState workerState) {
        JobMasterAPI.WorkerStateChangeResponse build = JobMasterAPI.WorkerStateChangeResponse.newBuilder().setWorkerID(i).setState(workerState).build();
        this.rrServer.sendResponse(requestID, build);
        LOG.fine("WorkerStateChangeResponse sent:\n" + build);
    }

    public void sendWorkersJoinedMessage() {
        LOG.info("Sending WorkersJoined messages ...");
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.WorkersJoined build = JobMasterAPI.WorkersJoined.newBuilder().setNumberOfWorkers(workerInfoList.size()).addAllWorker(workerInfoList).build();
        Iterator<JobMasterAPI.WorkerInfo> it = workerInfoList.iterator();
        while (it.hasNext()) {
            this.rrServer.sendMessage(build, it.next().getWorkerID());
        }
    }

    private boolean allWorkersConnected() {
        int numberOfWorkers = this.workerMonitor.getNumberOfWorkers();
        Set connectedWorkers = this.rrServer.getConnectedWorkers();
        return connectedWorkers.size() == numberOfWorkers && ((Integer) Collections.max(connectedWorkers)).intValue() == numberOfWorkers - 1;
    }

    private void handleAllConnected() {
        if (this.allConnected || !allWorkersConnected()) {
            return;
        }
        this.allConnected = true;
    }

    public void unsetAllConnected() {
        if (allWorkersConnected()) {
            return;
        }
        this.allConnected = false;
    }
}
