package edu.iu.dsc.tws.rsched.schedulers.k8s.master;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.zk.JobZNodeManager;
import edu.iu.dsc.tws.common.zk.ZKBarrierManager;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEphemStateManager;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKPersStateManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.server.JobMaster;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.k8s.K8sEnvVariables;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesConstants;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.driver.K8sScaler;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/k8s/master/JobMasterStarter.class */
/**
 * Launcher for the Twister2 job master under the Kubernetes scheduler.
 *
 * <p>{@link #main(String[])} reads its inputs from environment variables and from the
 * job directory on the pod memory volume, optionally initializes the job's znodes on
 * ZooKeeper, and then runs a {@link JobMaster} until it returns.
 */
public final class JobMasterStarter {
    private static final Logger LOG = Logger.getLogger(JobMasterStarter.class.getName());

    /**
     * Job description used by the job master. It is set by {@link #main(String[])} from the
     * job description file and is replaced by the copy stored in the job znode when
     * {@link #initializeZooKeeper} handles a restarted job master.
     */
    public static JobAPI.Job job;

    private JobMasterStarter() {
    }

    /**
     * Starts the job master and blocks until it finishes.
     *
     * <p>Inputs are taken from the environment variables {@code JOB_ID},
     * {@code ENCODED_NODE_INFO_LIST}, {@code HOST_IP} and {@code RESTORE_JOB}
     * (names from {@link K8sEnvVariables}); the command line arguments are not used.
     *
     * @param strArr ignored
     * @throws RuntimeException if the local host address cannot be resolved
     */
    public static void main(String[] strArr) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");

        String jobId = System.getenv(K8sEnvVariables.JOB_ID.name());
        String encodedNodeInfoList = System.getenv(K8sEnvVariables.ENCODED_NODE_INFO_LIST.name());
        String hostIp = System.getenv(K8sEnvVariables.HOST_IP.name());
        boolean restoreJob = Boolean.parseBoolean(System.getenv(K8sEnvVariables.RESTORE_JOB.name()));

        // Both the configuration files and the job description file live in the same directory.
        String jobDir = KubernetesConstants.POD_MEMORY_VOLUME + File.separator + "twister2-job";
        Config fileConfig = K8sWorkerUtils.loadConfig(jobDir);
        String jobFile = jobDir + File.separator + SchedulerContext.createJobDescriptionFileName(jobId);
        job = JobUtils.readJobFile(jobFile);
        LOG.info("Job description file is loaded: " + jobFile);

        // Merge the job's own settings into the loaded configuration and record the restore flag.
        Config config = Config.newBuilder()
            .putAll(JobUtils.updateConfigs(job, JobUtils.overrideConfigs(job, fileConfig)))
            .put("twister2.checkpointing.restore.job", Boolean.valueOf(restoreJob))
            .build();

        K8sWorkerUtils.initLogger(config, "jobMaster", JobMasterContext.persistentVolumeRequested(config));
        LOG.info("JobMaster is starting. Current time: " + System.currentTimeMillis());
        LOG.info("Number of configuration parameters: " + config.size());

        try {
            String jmAddress = InetAddress.getLocalHost().getHostAddress();

            JobMasterAPI.NodeInfo nodeInfo = KubernetesContext.nodeLocationsFromConfig(config)
                ? KubernetesContext.getNodeInfo(config, hostIp)
                : K8sWorkerUtils.getNodeInfoFromEncodedStr(encodedNodeInfoList, hostIp);
            LOG.info("NodeInfo for JobMaster: " + nodeInfo);

            KubernetesController controller = KubernetesController.init(KubernetesContext.namespace(config));
            JobTerminator jobTerminator = new JobTerminator(config, controller);
            K8sScaler scaler = new K8sScaler(config, job, controller);

            // A positive restart count means this job master has been started before.
            int restartCount =
                K8sWorkerUtils.initRestartFromCM(controller, jobId, KubernetesUtils.createRestartJobMasterKey());
            JobMasterAPI.JobMasterState initialState = restartCount > 0
                ? JobMasterAPI.JobMasterState.JM_RESTARTED
                : JobMasterAPI.JobMasterState.JM_STARTED;

            boolean zkUsed = ZKContext.isZooKeeperServerUsed(config);

            // A restarted job master is only continued when ZooKeeper is in use
            // (presumably because job state can not be recovered otherwise); else the job is failed.
            if (initialState == JobMasterAPI.JobMasterState.JM_RESTARTED && !zkUsed) {
                jobTerminator.terminateJob(jobId, JobAPI.JobState.FAILED);
                return;
            }

            if (zkUsed && !initializeZooKeeper(config, jobId, jmAddress, initialState)) {
                jobTerminator.terminateJob(jobId, JobAPI.JobState.FAILED);
                return;
            }

            JobMaster jobMaster =
                new JobMaster(config, jmAddress, jobTerminator, job, nodeInfo, scaler, initialState);
            JobKillWatcher jobKillWatcher =
                new JobKillWatcher(KubernetesContext.namespace(config), jobId, controller, jobMaster);
            jobKillWatcher.start();

            // On any uncaught exception: mark the job master as failed, release the watcher and
            // the controller, then rethrow so that the failure is not silently lost.
            Thread.setDefaultUncaughtExceptionHandler((thread, th) -> {
                LOG.log(Level.SEVERE, "Uncaught exception in the thread " + thread + ". Job Master FAILED...", th);
                jobMaster.jmFailed();
                jobKillWatcher.close();
                KubernetesController.close();
                throw new RuntimeException("Job master failed with the exception", th);
            });

            try {
                jobMaster.startJobMasterBlocking();
            } catch (Twister2Exception e) {
                LOG.log(Level.SEVERE, e.getMessage(), e);
            }
            KubernetesController.close();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Cannot get localHost.", e);
        }
    }

    /**
     * Prepares the job's znodes on the ZooKeeper server for a starting or restarting job master.
     *
     * <p>When {@code jobMasterState} is {@code JM_RESTARTED}, the job znode must already exist:
     * the job master status is updated, the event counter is initialized and {@link #job} is
     * reloaded from the job znode. Otherwise the znodes of the job are created from scratch,
     * using the current value of {@link #job}, which therefore has to be set beforehand.
     * An already existing job znode is deleted first when the job is starting from a checkpoint,
     * and is treated as an error when it is not.
     *
     * @param config         job configuration
     * @param str            job id
     * @param str2           job master address; {@link #main(String[])} passes the local host address
     * @param jobMasterState initial state of the job master
     * @return {@code true} when initialization succeeded; {@code false} when the znode state on the
     *         server contradicts the requested start mode, in which case the problem is logged
     * @throws NumberFormatException if a fresh start is requested and the {@code JOB_SUBMISSION_TIME}
     *         environment variable is missing or is not a valid long
     */
    public static boolean initializeZooKeeper(Config config, String str, String str2, JobMasterAPI.JobMasterState jobMasterState) {
        final String jobId = str;
        final String jmAddress = str2;

        CuratorFramework client =
            ZKUtils.connectToServer(ZKContext.serverAddresses(config), FaultToleranceContext.sessionTimeout(config));
        String rootNode = ZKContext.rootNode(config);
        boolean jobZNodeExists = JobZNodeManager.isThereJobZNode(client, rootNode, jobId);

        if (jobMasterState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
            if (!jobZNodeExists) {
                LOG.severe("Job is restarting but job znode does not exists at ZK server at: " + ZKUtils.jobDir(rootNode, jobId));
                return false;
            }
            ZKPersStateManager.updateJobMasterStatus(client, rootNode, jobId, jmAddress, JobMasterAPI.JobMasterState.JM_RESTARTED);
            ZKEventsManager.initEventCounter(client, rootNode, jobId);
            job = JobZNodeManager.readJobZNode(client, rootNode, jobId).getJob();
            return true;
        }

        boolean fromCheckpoint = CheckpointingContext.startingFromACheckpoint(config);
        if (jobZNodeExists && !fromCheckpoint) {
            LOG.severe("Job is starting for the first time, but there is an existing znode at ZK server: " + ZKUtils.jobDir(rootNode, jobId));
            return false;
        }

        // Validate the submission time before touching ZooKeeper, so that a missing or malformed
        // value does not leave a partially initialized set of znodes behind.
        long submissionTime = readJobSubmissionTime();

        // Reaching this point with an existing znode implies that the job starts from a checkpoint.
        if (jobZNodeExists) {
            JobZNodeManager.deleteJobZNode(client, rootNode, jobId);
        }

        JobZNodeManager.createJobZNode(client, rootNode, job);
        ZKEphemStateManager.createEphemDir(client, rootNode, job.getJobId());
        ZKEventsManager.createEventsZNode(client, rootNode, job.getJobId());
        ZKBarrierManager.createDefaultBarrierDir(client, rootNode, job.getJobId());
        ZKBarrierManager.createInitBarrierDir(client, rootNode, job.getJobId());
        ZKPersStateManager.createPersStateDir(client, rootNode, job.getJobId());
        JobZNodeManager.createJstZNode(client, rootNode, jobId, submissionTime);
        ZKPersStateManager.createJobMasterPersState(client, rootNode, jobId, jmAddress);
        return true;
    }

    /**
     * Reads the job submission time from the {@code JOB_SUBMISSION_TIME} environment variable.
     *
     * @return the parsed submission time
     * @throws NumberFormatException if the variable is not set or can not be parsed as a long;
     *         the message names the variable
     */
    private static long readJobSubmissionTime() {
        String variable = K8sEnvVariables.JOB_SUBMISSION_TIME.name();
        String value = System.getenv(variable);
        if (value == null) {
            throw new NumberFormatException("Environment variable " + variable + " is not set.");
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            NumberFormatException detailed =
                new NumberFormatException("Environment variable " + variable + " is not a valid long: " + value);
            detailed.initCause(e);
            throw detailed;
        }
    }
}
