package edu.iu.dsc.tws.rsched.schedulers.k8s.mpi;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesConstants;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sPersistentVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sVolatileVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import mpi.MPI;
import mpi.MPIException;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/k8s/mpi/MPIWorkerStarter.class */
public final class MPIWorkerStarter {
    private static JobMasterAPI.WorkerInfo workerInfo;
    private static JMWorkerAgent jobMasterAgent;
    private static final Logger LOG = Logger.getLogger(MPIWorkerStarter.class.getName());
    private static Config config = null;
    private static int workerID = -1;
    private static int numberOfWorkers = -1;
    private static String jobName = null;
    private static JobAPI.Job job = null;
    private static JobAPI.ComputeResource computeResource = null;

    private MPIWorkerStarter() {
    }

    public static void main(String[] strArr) {
        JobMasterAPI.NodeInfo nodeInfo;
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");
        String jobMasterIPCommandLineArgumentValue = MPIMasterStarter.getJobMasterIPCommandLineArgumentValue(strArr[0]);
        jobName = strArr[1];
        String str = strArr[2];
        if (jobMasterIPCommandLineArgumentValue == null) {
            throw new RuntimeException("JobMasterIP address is null");
        }
        if (jobName == null) {
            throw new RuntimeException("jobName is null");
        }
        String replaceAll = str.replaceAll("'", "");
        config = K8sWorkerUtils.loadConfig("/twister2-memory-dir/twister2-job");
        try {
            MPI.Init(strArr);
            workerID = MPI.COMM_WORLD.getRank();
            numberOfWorkers = MPI.COMM_WORLD.getSize();
            K8sPersistentVolume k8sPersistentVolume = null;
            if (KubernetesContext.persistentVolumeRequested(config)) {
                k8sPersistentVolume = new K8sPersistentVolume(KubernetesConstants.PERSISTENT_VOLUME_MOUNT, workerID);
            }
            K8sWorkerUtils.initWorkerLogger(workerID, k8sPersistentVolume, config);
            String str2 = "/twister2-memory-dir/twister2-job/" + SchedulerContext.createJobDescriptionFileName(jobName);
            job = JobUtils.readJobFile(null, str2);
            LOG.info("Job description file is loaded: " + str2);
            config = JobUtils.overrideConfigs(job, config);
            config = JobUtils.updateConfigs(job, config);
            config = K8sWorkerUtils.unsetWorkerIDAssigment(config);
            InetAddress inetAddress = null;
            try {
                inetAddress = InetAddress.getLocalHost();
            } catch (UnknownHostException e) {
                LOG.log(Level.SEVERE, "Cannot get localHost.", (Throwable) e);
            }
            String hostAddress = inetAddress.getHostAddress();
            String hostName = inetAddress.getHostName();
            int workerBasePort = KubernetesContext.workerBasePort(config) + (workerID * (SchedulerContext.numberOfAdditionalPorts(config) + 1));
            String nodeIP = PodWatchUtils.getNodeIP(KubernetesContext.namespace(config), jobName, hostAddress);
            if (nodeIP == null) {
                LOG.warning("Could not get nodeIP for this pod. Using podIP as nodeIP.");
                nodeInfo = NodeInfoUtils.createNodeInfo(hostAddress, (String) null, (String) null);
            } else {
                nodeInfo = KubernetesContext.nodeLocationsFromConfig(config) ? KubernetesContext.getNodeInfo(config, nodeIP) : K8sWorkerUtils.getNodeInfoFromEncodedStr(replaceAll, nodeIP);
            }
            LOG.info("NodeInfoUtils for this worker: " + nodeInfo);
            computeResource = K8sWorkerUtils.getComputeResource(job, hostName);
            workerInfo = WorkerInfoUtils.createWorkerInfo(workerID, hostAddress, workerBasePort, nodeInfo, computeResource, K8sWorkerUtils.generateAdditionalPorts(config, workerBasePort));
            LOG.info("Worker information summary: \nMPI Rank(workerID): " + workerID + "\nMPI Size(number of workers): " + numberOfWorkers + "\nPOD_IP: " + hostAddress + "\nHOSTNAME(podname): " + hostName);
            jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(config, workerInfo, jobMasterIPCommandLineArgumentValue, JobMasterContext.jobMasterPort(config), job.getNumberOfWorkers());
            jobMasterAgent.startThreaded(false);
            jobMasterAgent.sendWorkerRunningMessage();
            startWorker(jobMasterAgent, k8sPersistentVolume, hostName);
            try {
                MPI.Finalize();
            } catch (MPIException e2) {
            }
            closeWorker();
        } catch (MPIException e3) {
            LOG.log(Level.SEVERE, "Could not get rank or size from MPI.COMM_WORLD", e3);
            throw new RuntimeException(e3);
        }
    }

    public static void startWorker(JMWorkerAgent jMWorkerAgent, IPersistentVolume iPersistentVolume, String str) {
        String workerClass = SchedulerContext.workerClass(config);
        try {
            IWorker iWorker = (IWorker) ReflectionUtils.newInstance(workerClass);
            LOG.info("loaded worker class: " + workerClass);
            K8sVolatileVolume k8sVolatileVolume = null;
            if (computeResource.getDiskGigaBytes() > 0.0d) {
                k8sVolatileVolume = new K8sVolatileVolume(jobName, workerID);
            }
            iWorker.execute(config, workerID, jMWorkerAgent.getJMWorkerController(), iPersistentVolume, k8sVolatileVolume);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.severe(String.format("failed to load the worker class %s", workerClass));
            throw new RuntimeException(e);
        }
    }

    public static void closeWorker() {
        jobMasterAgent.sendWorkerCompletedMessage();
        jobMasterAgent.close();
    }
}
