package edu.iu.dsc.tws.rsched.schedulers.k8s.worker;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.IWorkerStatusUpdater;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.core.WorkerRuntime;
import edu.iu.dsc.tws.rsched.schedulers.k8s.K8sEnvVariables;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesConstants;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import edu.iu.dsc.tws.rsched.worker.WorkerManager;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/k8s/worker/K8sWorkerStarter.class */
public final class K8sWorkerStarter {
    private static JobMasterAPI.WorkerInfo workerInfo;
    private static final Logger LOG = Logger.getLogger(K8sWorkerStarter.class.getName());
    private static Config config = null;
    private static int workerID = -1;
    private static String jobID = null;
    private static JobAPI.Job job = null;
    private static JobAPI.ComputeResource computeResource = null;
    private static boolean properShutDown = false;

    private K8sWorkerStarter() {
    }

    public static void main(String[] strArr) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");
        int parseInt = Integer.parseInt(System.getenv(K8sEnvVariables.WORKER_PORT + ""));
        String str = System.getenv(K8sEnvVariables.CONTAINER_NAME + "");
        String str2 = System.getenv(K8sEnvVariables.POD_NAME + "");
        String str3 = System.getenv(K8sEnvVariables.HOST_IP + "");
        String str4 = System.getenv(K8sEnvVariables.HOST_NAME + "");
        String str5 = System.getenv(K8sEnvVariables.JOB_MASTER_IP + "");
        String str6 = System.getenv(K8sEnvVariables.ENCODED_NODE_INFO_LIST + "");
        jobID = System.getenv(K8sEnvVariables.JOB_ID + "");
        if (jobID == null) {
            throw new RuntimeException("JobID is null");
        }
        config = K8sWorkerUtils.loadConfig("/twister2-memory-dir/twister2-job");
        String str7 = "/twister2-memory-dir/twister2-job/" + SchedulerContext.createJobDescriptionFileName(jobID);
        job = JobUtils.readJobFile(null, str7);
        LOG.info("Job description file is loaded: " + str7);
        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        if (!job.getDriverClassName().isEmpty() || !ZKContext.isZooKeeperServerUsed(config)) {
            updateJobMasterIp(str5);
        }
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            JobMasterAPI.NodeInfo nodeInfo = KubernetesContext.nodeLocationsFromConfig(config) ? KubernetesContext.getNodeInfo(config, str3) : K8sWorkerUtils.getNodeInfoFromEncodedStr(str6, str3);
            LOG.info("PodName: " + str2 + ", NodeInfo: " + nodeInfo);
            workerID = K8sWorkerUtils.calculateWorkerID(job, str2, str);
            computeResource = K8sWorkerUtils.getComputeResource(job, str2);
            workerInfo = WorkerInfoUtils.createWorkerInfo(workerID, hostAddress, parseInt, nodeInfo, computeResource, K8sWorkerUtils.generateAdditionalPorts(config, parseInt));
            K8sPersistentVolume k8sPersistentVolume = null;
            if (KubernetesContext.persistentVolumeRequested(config)) {
                k8sPersistentVolume = new K8sPersistentVolume(KubernetesConstants.PERSISTENT_VOLUME_MOUNT, workerID);
            }
            K8sWorkerUtils.initWorkerLogger(workerID, k8sPersistentVolume, config);
            LOG.info("Worker information summary: \nworkerID: " + workerID + "\nPOD_IP: " + hostAddress + "\nHOSTNAME(podname): " + str2 + "\nworkerPort: " + parseInt + "\nhostName(nodeName): " + str4 + "\nhostIP(nodeIP): " + str3 + "\n");
            WorkerRuntime.init(config, job, workerInfo, K8sWorkerUtils.initialStateAndUpdate(config, jobID, workerInfo));
            IWorkerController workerController = WorkerRuntime.getWorkerController();
            IWorkerStatusUpdater workerStatusUpdater = WorkerRuntime.getWorkerStatusUpdater();
            addShutdownHook();
            startWorker(workerController, k8sPersistentVolume);
            workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.COMPLETED);
            WorkerRuntime.close();
            properShutDown = true;
            K8sWorkerUtils.waitIndefinitely();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Cannot get localHost.", e);
        }
    }

    public static String updateJobMasterIp(String str) {
        if (!JobMasterContext.jobMasterRunsInClient(config)) {
            str = K8sWorkerUtils.getJobMasterServiceIP(KubernetesContext.namespace(config), jobID);
            if (str == null) {
                str = PodWatchUtils.getJobMasterIpByWatchingPodToRunning(KubernetesContext.namespace(config), jobID, 100);
            }
            if (str == null) {
                throw new RuntimeException("Job master is running in a separate pod, but this worker can not get the job master IP address from Kubernetes master.\nJob master address: " + str);
            }
            LOG.info("Job master address: " + str);
        } else if (str == null || str.trim().length() == 0) {
            throw new RuntimeException("Job master running in the client, but this worker got job master IP as empty from environment variables.");
        }
        config = Config.newBuilder().putAll(config).put("twister2.job.master.ip", str).build();
        return str;
    }

    public static void startWorker(IWorkerController iWorkerController, IPersistentVolume iPersistentVolume) {
        String workerClassName = job.getWorkerClassName();
        try {
            IWorker iWorker = (IWorker) ReflectionUtils.newInstance(workerClassName);
            LOG.info("loaded worker class: " + workerClassName);
            K8sVolatileVolume k8sVolatileVolume = null;
            if (computeResource.getDiskGigaBytes() > 0.0d) {
                k8sVolatileVolume = new K8sVolatileVolume(jobID, workerID);
            }
            new WorkerManager(config, workerID, iWorkerController, iPersistentVolume, k8sVolatileVolume, iWorker).start();
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.severe(String.format("failed to load the worker class %s", workerClassName));
            throw new RuntimeException(e);
        }
    }

    public static void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerStarter.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                if (K8sWorkerStarter.properShutDown) {
                    return;
                }
                WorkerRuntime.close();
            }
        });
    }
}
