package edu.iu.dsc.tws.master.worker;

import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.net.BlockingSendException;
import edu.iu.dsc.tws.api.net.StatusCode;
import edu.iu.dsc.tws.api.net.request.ConnectHandler;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.api.resource.IAllJoinedListener;
import edu.iu.dsc.tws.api.resource.IReceiverFromDriver;
import edu.iu.dsc.tws.api.resource.IScalerListener;
import edu.iu.dsc.tws.api.resource.IWorkerFailureListener;
import edu.iu.dsc.tws.checkpointing.client.CheckpointingClientImpl;
import edu.iu.dsc.tws.common.net.tcp.Progress;
import edu.iu.dsc.tws.common.net.tcp.request.RRClient;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/master/worker/JMWorkerAgent.class */
public final class JMWorkerAgent {
    private static final Logger LOG = Logger.getLogger(JMWorkerAgent.class.getName());

    /** Progress instance created in init(), handed to the RRClient and pumped by startLooping(). */
    private static Progress looper;

    /** The single agent instance handed out by createJMWorkerAgent()/getJMWorkerAgent(). */
    private static JMWorkerAgent workerAgent;

    /** Upper bound, in milliseconds, for one connect or reconnect wait. */
    private static final long CONNECTION_TRY_TIME_LIMIT = 100000;

    // Assigned exactly once in the constructor.
    private final Config config;
    private final JobMasterAPI.WorkerInfo thisWorker;
    private final int jmPort;
    private final int numberOfWorkers;
    private final int restartCount;
    // RESTARTED when restartCount > 0, STARTED otherwise; not read anywhere in this class.
    private final JobMasterAPI.WorkerState initialState;

    // Job Master address; replaced by reconnect(String).
    private String jmAddress;

    // Collaborators created in init().
    private RRClient rrClient;
    private JMWorkerController workerController;
    private JMDriverAgent driverAgent;
    private JMWorkerStatusUpdater statusUpdater;
    private CheckpointingClientImpl checkpointClient;

    // Listeners installed through the static add*Listener methods.
    private IScalerListener scalerListener;
    private IAllJoinedListener allJoinedListener;

    // Written by ResponseMessageHandler.onMessage() and read by registerWorker(); the handler
    // may be invoked on a different thread than the one waiting for the response, so the
    // field is volatile to guarantee visibility.
    private volatile boolean registrationSucceeded;

    // Hand-off flags between the looper thread (startLooping()) and the threads calling
    // close(), reconnect() and startThreaded(). They are polled in loops by one thread and
    // written by another, so they must be volatile for the writes to become visible.
    private volatile boolean stopLooper = false;
    private volatile boolean disconnected = false;
    private volatile boolean reconnect = false;
    private volatile boolean reconnected = false;

    // Events received before the corresponding listener was registered.
    private final LinkedList<JobMasterAPI.JobScaled> scaledEventBuffer = new LinkedList<>();
    private final LinkedList<JobMasterAPI.AllJoined> allJoinedEventBuffer = new LinkedList<>();

    /* loaded from: input_file:edu/iu/dsc/tws/master/worker/JMWorkerAgent$ClientConnectHandler.class */
    /**
     * Connection callbacks for the client created in init(). Keeps the enclosing agent's
     * {@code disconnected} flag in step with the reported connection state.
     */
    public class ClientConnectHandler implements ConnectHandler {
        public ClientConnectHandler() {
        }

        /** Marks the agent as disconnected when an error is reported for the channel. */
        public void onError(SocketChannel socketChannel, StatusCode statusCode) {
            disconnected = true;
        }

        /** Clears the disconnected flag and logs the established connection. */
        public void onConnect(SocketChannel socketChannel) {
            disconnected = false;
            int workerID = thisWorker.getWorkerID();
            LOG.info("Worker " + workerID + " connected to JobMaster: " + socketChannel);
        }

        /** Intentionally does nothing when the channel is closed. */
        public void onClose(SocketChannel socketChannel) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:edu/iu/dsc/tws/master/worker/JMWorkerAgent$ResponseMessageHandler.class */
    /**
     * Handles the messages registered in init(): registration responses, job-scaled
     * notifications and all-joined notifications. Anything else is logged as unrecognized.
     */
    public class ResponseMessageHandler implements MessageHandler {
        ResponseMessageHandler() {
        }

        /** Dispatches an incoming message to the handler matching its concrete type. */
        public void onMessage(RequestID requestID, int i, Message message) {
            if (message instanceof JobMasterAPI.RegisterWorkerResponse) {
                handleRegisterResponse((JobMasterAPI.RegisterWorkerResponse) message);
            } else if (message instanceof JobMasterAPI.JobScaled) {
                handleJobScaled((JobMasterAPI.JobScaled) message);
            } else if (message instanceof JobMasterAPI.AllJoined) {
                handleAllJoined((JobMasterAPI.AllJoined) message);
            } else {
                LOG.warning("Received message unrecognized. \n" + message);
            }
        }

        /** Records the registration result carried by the response. */
        private void handleRegisterResponse(JobMasterAPI.RegisterWorkerResponse response) {
            LOG.fine("Received a RegisterWorkerResponse message from the master. \n" + response);
            registrationSucceeded = response.getResult();
        }

        /** Delivers (or buffers) the scaling event, then informs the worker controller. */
        private void handleJobScaled(JobMasterAPI.JobScaled scaledEvent) {
            LOG.fine("Received " + scaledEvent.getClass().getSimpleName()
                + " message from the master. \n" + scaledEvent);

            // No listener yet: keep the event until addScalerListener() is called.
            if (scalerListener != null) {
                deliverToScalerListener(scaledEvent);
            } else {
                scaledEventBuffer.add(scaledEvent);
            }
            workerController.scaled(scaledEvent.getChange(), scaledEvent.getNumberOfWorkers());
        }

        /** Delivers the all-joined event, or buffers it until a listener is registered. */
        private void handleAllJoined(JobMasterAPI.AllJoined joinedEvent) {
            if (allJoinedListener != null) {
                deliverToAllJoinedListener(joinedEvent);
            } else {
                allJoinedEventBuffer.add(joinedEvent);
            }
        }
    }

    /**
     * Stores the construction parameters; no connection is attempted here.
     *
     * @param config     configuration later passed to the collaborators created in init()
     * @param workerInfo description of this worker
     * @param str        Job Master address
     * @param i          Job Master port
     * @param i2         number of workers
     * @param i3         restart count; a positive value selects the RESTARTED initial state
     */
    private JMWorkerAgent(Config config, JobMasterAPI.WorkerInfo workerInfo, String str, int i, int i2, int i3) {
        this.config = config;
        this.thisWorker = workerInfo;
        this.jmAddress = str;
        this.jmPort = i;
        this.numberOfWorkers = i2;
        this.restartCount = i3;
        if (i3 > 0) {
            this.initialState = JobMasterAPI.WorkerState.RESTARTED;
        } else {
            this.initialState = JobMasterAPI.WorkerState.STARTED;
        }
    }

    /**
     * Returns the agent instance, creating it on the first call. When an instance already
     * exists it is returned as is and the arguments of this call are ignored.
     *
     * @param config     configuration for the agent
     * @param workerInfo description of this worker
     * @param str        Job Master address
     * @param i          Job Master port
     * @param i2         number of workers
     * @param i3         restart count
     * @return the existing or newly created agent
     */
    public static JMWorkerAgent createJMWorkerAgent(Config config, JobMasterAPI.WorkerInfo workerInfo, String str, int i, int i2, int i3) {
        if (workerAgent == null) {
            workerAgent = new JMWorkerAgent(config, workerInfo, str, i, i2, i3);
        }
        return workerAgent;
    }

    /**
     * Returns the agent created by createJMWorkerAgent().
     *
     * @return the agent instance, or {@code null} if none has been created yet
     */
    public static JMWorkerAgent getJMWorkerAgent() {
        return workerAgent;
    }

    /**
     * Creates the looper, the client and the helpers built on top of it, registers the
     * message handler, and then connects to the Job Master.
     *
     * @throws RuntimeException if no connection could be established within
     *         CONNECTION_TRY_TIME_LIMIT
     */
    private void init() {
        looper = new Progress();

        int workerID = this.thisWorker.getWorkerID();
        this.rrClient = new RRClient(this.jmAddress, this.jmPort, (Config) null, looper, workerID, new ClientConnectHandler());
        this.driverAgent = new JMDriverAgent(this.rrClient, workerID);
        this.statusUpdater = new JMWorkerStatusUpdater(this.rrClient, workerID, this.config);

        // One handler instance serves every message type this agent registers.
        ResponseMessageHandler handler = new ResponseMessageHandler();
        this.rrClient.registerResponseHandler(JobMasterAPI.RegisterWorker.newBuilder(), handler);
        this.rrClient.registerResponseHandler(JobMasterAPI.RegisterWorkerResponse.newBuilder(), handler);
        this.rrClient.registerResponseHandler(JobMasterAPI.JobScaled.newBuilder(), handler);
        this.rrClient.registerResponseHandler(JobMasterAPI.AllJoined.newBuilder(), handler);

        long checkpointTimeout = this.config.getLongValue("twister2.checkpointing.request.timeout", 10000L).longValue();
        this.checkpointClient = new CheckpointingClientImpl(this.rrClient, checkpointTimeout);
        this.workerController = new JMWorkerController(this.config, this.thisWorker, this.numberOfWorkers, this.restartCount, this.rrClient, this.checkpointClient);

        tryUntilConnected(CONNECTION_TRY_TIME_LIMIT);
        boolean connected = this.rrClient.isConnected();
        if (!connected) {
            throw new RuntimeException("JMWorkerAgent can not connect to Job Master. Exiting .....");
        }
        this.checkpointClient.init();
    }

    /**
     * Pumps the looper until the stop flag is set, then disconnects the client. After each
     * blocking loop call it checks for a pending reconnect request; if one is pending it
     * drops the current connection, connects to the currently configured address and
     * publishes the outcome in {@code reconnected}.
     */
    private void startLooping() {
        while (!this.stopLooper) {
            looper.loopBlocking();

            if (!this.reconnect) {
                continue;
            }

            LOG.fine("Worker is disconnecting from JobMaster from previous session.");
            this.rrClient.disconnect();
            this.rrClient.setHostAndPort(this.jmAddress, this.jmPort);
            this.reconnected = tryUntilConnected(CONNECTION_TRY_TIME_LIMIT);

            String outcome = this.reconnected
                ? " Re-connected to JobMaster."
                : " could not re-connect to JobMaster.";
            LOG.info("Worker " + this.thisWorker.getWorkerID() + outcome);

            // The request is cleared only after the outcome has been published.
            this.reconnect = false;
        }
        this.rrClient.disconnect();
    }

    /**
     * Initializes the agent, starts the looper on a daemon thread and registers this worker
     * with the Job Master.
     *
     * <p>If registration fails while the connection is reported as lost, up to two
     * reconnect-and-register attempts are made, each preceded by a short pause. On any
     * failure after the looper thread has been started, the agent is closed so that the
     * thread does not keep running.
     *
     * @return the started looper thread
     * @throws RuntimeException if the worker could not be registered, or if reconnect(String)
     *         gave up
     */
    public Thread startThreaded() {
        init();

        Thread looperThread = new Thread(this::startLooping);
        looperThread.setName("JM-Agent-WorkerID " + this.thisWorker.getWorkerID());
        looperThread.setDaemon(true);
        looperThread.start();

        final int maxReconnectAttempts = 2;
        final long pauseBeforeReconnectMillis = 100L;

        boolean registered;
        try {
            registered = registerWorker();
            for (int attempt = 0;
                 attempt < maxReconnectAttempts && !registered && this.disconnected;
                 attempt++) {
                try {
                    Thread.sleep(pauseBeforeReconnectMillis);
                } catch (InterruptedException e) {
                    // Same policy as the other waits in this class: note it and carry on.
                    LOG.warning("Sleep interrupted. Will try to reconnect now.");
                }
                registered = reconnect(this.jmAddress);
            }
        } catch (RuntimeException e) {
            // reconnect() throws when it gives up; stop the looper thread before propagating.
            close();
            throw e;
        }

        if (!registered) {
            close();
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
        return looperThread;
    }

    /**
     * Initializes the agent and runs the looper on the calling thread, then attempts to
     * register the worker.
     *
     * <p>NOTE(review): startLooping() only returns after the stop flag has been set and the
     * client has been disconnected, so the registration below is attempted on a disconnected
     * client. Confirm whether this ordering is intended.
     *
     * @throws RuntimeException if the worker could not be registered
     */
    public void startBlocking() {
        init();
        startLooping();

        boolean registered = registerWorker();
        if (!registered) {
            close();
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
    }

    /**
     * Repeatedly tries to connect the client to the Job Master until it is connected or the
     * time limit has elapsed. The pause between attempts starts at 100 ms and doubles up to
     * one second; a progress message is logged roughly once per second.
     *
     * @param j time limit in milliseconds
     * @return {@code true} if the client is connected when this method returns (including
     *         the case where it was already connected on entry), {@code false} otherwise
     */
    public boolean tryUntilConnected(long j) {
        final long maxSleepMillis = 1000L;
        final long logIntervalMillis = 1000L;

        long startTime = System.currentTimeMillis();
        long elapsed = 0;
        long sleepInterval = 100;
        long nextLogTime = logIntervalMillis;

        while (elapsed < j && !this.rrClient.isConnected()) {
            this.rrClient.tryConnecting();
            try {
                looper.loop();
            } catch (CancelledKeyException e) {
                // CancelledKeyException carries no message of its own, so describe the
                // situation explicitly and attach the exception for its stack trace.
                LOG.log(Level.SEVERE,
                    "Selection key cancelled while connecting to the Job Master."
                        + " Will try to reconnect ...", e);
                this.rrClient.disconnect();
            }
            if (this.rrClient.isConnected()) {
                return true;
            }

            try {
                Thread.sleep(sleepInterval);
            } catch (InterruptedException e) {
                // The interrupt status is deliberately not re-asserted: this method also runs
                // on the looper thread, which goes straight back into a blocking loop call.
                LOG.warning("Sleep interrupted.");
            }
            sleepInterval = Math.min(sleepInterval * 2, maxSleepMillis);

            elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > nextLogTime) {
                LOG.info("Still trying to connect to the Job Master: " + this.jmAddress + ":" + this.jmPort);
                nextLogTime += logIntervalMillis;
            }
        }

        // Report the actual state instead of a hard-coded false, so that a client which was
        // already connected on entry is not reported as a failure.
        return this.rrClient.isConnected();
    }

    /**
     * Asks the looper thread to drop the current connection and connect to the given Job
     * Master address, waits for that to complete, and then registers the worker again.
     *
     * @param str the Job Master address to connect to
     * @return the result of the re-registration
     * @throws RuntimeException if the looper thread did not report a successful reconnect
     *         within CONNECTION_TRY_TIME_LIMIT
     */
    public boolean reconnect(String str) {
        final long pollIntervalMillis = 100L;

        this.jmAddress = str;
        // Clear the completion flag BEFORE raising the request. In the opposite order the
        // looper thread could finish reconnecting and set the flag between the two writes,
        // and this thread would then erase that success and wait for the full time limit.
        this.reconnected = false;
        this.reconnect = true;
        looper.wakeup();

        long startTime = System.currentTimeMillis();
        long elapsed = 0;
        while (!this.reconnected && elapsed < CONNECTION_TRY_TIME_LIMIT) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                LOG.warning("Sleep interrupted. Will try again.");
            }
            elapsed = System.currentTimeMillis() - startTime;
        }

        if (!this.reconnected) {
            throw new RuntimeException("Could not reconnect Worker with JobMaster. Exiting .....");
        }
        LOG.info("Worker re-registering with JobMaster to initialize things.");
        return registerWorker();
    }

    /**
     * Returns the worker description this agent was created with.
     *
     * @return the WorkerInfo passed to the constructor
     */
    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return this.thisWorker;
    }

    /**
     * Returns the worker controller.
     *
     * @return the controller created in init(); {@code null} before init() has run
     */
    public JMWorkerController getJMWorkerController() {
        return this.workerController;
    }

    /**
     * Returns the driver agent.
     *
     * @return the driver agent created in init(); {@code null} before init() has run
     */
    public JMDriverAgent getDriverAgent() {
        return this.driverAgent;
    }

    /**
     * Returns the worker status updater.
     *
     * @return the status updater created in init(); {@code null} before init() has run
     */
    public JMWorkerStatusUpdater getStatusUpdater() {
        return this.statusUpdater;
    }

    /**
     * Returns the checkpointing client.
     *
     * @return the checkpointing client created in init(); {@code null} before init() has run
     */
    public CheckpointingClientImpl getCheckpointClient() {
        return this.checkpointClient;
    }

    /**
     * Installs the scaler listener on the agent instance if none is installed yet, and
     * immediately delivers any scaling events that were buffered in the meantime.
     *
     * @param iScalerListener the listener to install
     * @return {@code true} if the listener was installed, {@code false} if one was already
     *         present
     */
    public static boolean addScalerListener(IScalerListener iScalerListener) {
        JMWorkerAgent agent = workerAgent;
        if (agent.scalerListener == null) {
            agent.scalerListener = iScalerListener;
            agent.deliverBufferedScaledEvents();
            return true;
        }
        return false;
    }

    /**
     * Installs the all-joined listener on the agent instance if none is installed yet, and
     * immediately delivers any all-joined events that were buffered in the meantime.
     *
     * @param iAllJoinedListener the listener to install
     * @return {@code true} if the listener was installed, {@code false} if one was already
     *         present
     */
    public static boolean addAllJoinedListener(IAllJoinedListener iAllJoinedListener) {
        JMWorkerAgent agent = workerAgent;
        if (agent.allJoinedListener == null) {
            agent.allJoinedListener = iAllJoinedListener;
            agent.deliverBufferedAllJoinedEvents();
            return true;
        }
        return false;
    }

    /**
     * Forwards the receiver to the driver agent of the agent instance.
     *
     * @param iReceiverFromDriver the receiver to register
     * @return whatever the driver agent reports for the registration
     */
    public static boolean addReceiverFromDriver(IReceiverFromDriver iReceiverFromDriver) {
        JMDriverAgent agentForDriver = workerAgent.getDriverAgent();
        return agentForDriver.addReceiverFromDriver(iReceiverFromDriver);
    }

    /**
     * Forwards the failure listener to the status updater of the agent instance.
     *
     * @param iWorkerFailureListener the listener to register
     * @return whatever the status updater reports for the registration
     */
    public static boolean addWorkerFailureListener(IWorkerFailureListener iWorkerFailureListener) {
        JMWorkerStatusUpdater updater = workerAgent.getStatusUpdater();
        return updater.addWorkerFailureListener(iWorkerFailureListener);
    }

    /**
     * Sends a RegisterWorker request for this worker and waits for the response.
     *
     * @return the registration result recorded by the response handler, or {@code false} if
     *         the blocking send failed
     */
    private boolean registerWorker() {
        JobMasterAPI.RegisterWorker request = JobMasterAPI.RegisterWorker.newBuilder()
            .setWorkerID(this.thisWorker.getWorkerID())
            .setWorkerInfo(this.thisWorker)
            .setRestartCount(this.restartCount)
            .build();
        LOG.fine("Sending RegisterWorker message: \n" + request);

        try {
            this.rrClient.sendRequestWaitResponse(request, JobMasterContext.responseWaitDuration(this.config));
        } catch (BlockingSendException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return false;
        }

        if (!this.registrationSucceeded) {
            return false;
        }
        LOG.info("Registered worker[" + this.thisWorker.getWorkerID() + "] with JobMaster.");
        return true;
    }

    /**
     * Passes the given worker state to the status updater.
     *
     * @param workerState the state to report
     * @return the result returned by the status updater
     */
    public boolean sendWorkerCompletedMessage(JobMasterAPI.WorkerState workerState) {
        return this.statusUpdater.updateWorkerStatus(workerState);
    }

    /**
     * Requests the loop in startLooping() to stop. That loop disconnects the client once it
     * observes the flag.
     */
    public void close() {
        // Set the flag first, then wake the looper so a blocked loop call can return and
        // the loop condition is re-evaluated.
        this.stopLooper = true;
        looper.wakeup();
    }

    /** Drains the buffered scaling events, oldest first, into the scaler listener. */
    private void deliverBufferedScaledEvents() {
        JobMasterAPI.JobScaled pending;
        while ((pending = this.scaledEventBuffer.poll()) != null) {
            deliverToScalerListener(pending);
        }
    }

    /** Drains the buffered all-joined events, oldest first, into the all-joined listener. */
    private void deliverBufferedAllJoinedEvents() {
        JobMasterAPI.AllJoined pending;
        while ((pending = this.allJoinedEventBuffer.poll()) != null) {
            deliverToAllJoinedListener(pending);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Translates a scaling event into the matching listener callback: a positive change is
     * reported as a scale-up, a negative change as a scale-down with the magnitude of the
     * change, and a zero change is not reported.
     *
     * @param jobScaled the scaling event to deliver
     */
    public void deliverToScalerListener(JobMasterAPI.JobScaled jobScaled) {
        int change = jobScaled.getChange();
        if (change == 0) {
            return;
        }
        if (change > 0) {
            this.scalerListener.workersScaledUp(change);
        } else {
            this.scalerListener.workersScaledDown(-change);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands the worker list carried by the event to the all-joined listener.
     *
     * @param allJoined the all-joined event to deliver
     */
    public void deliverToAllJoinedListener(JobMasterAPI.AllJoined allJoined) {
        this.allJoinedListener.allWorkersJoined(allJoined.getWorkerInfoList());
    }
}
