package edu.iu.dsc.tws.rsched.schedulers.aurora;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.Context;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.common.zk.ZKWorkerController;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Paths;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/aurora/AuroraWorkerStarter.class */
public final class AuroraWorkerStarter {
    public static final Logger LOG = Logger.getLogger(AuroraWorkerStarter.class.getName());
    private InetAddress workerAddress;
    private int workerPort;
    private String mesosTaskID;
    private Config config;
    private JobAPI.Job job;
    private ZKWorkerController zkWorkerController;

    private AuroraWorkerStarter() {
    }

    public static void main(String[] strArr) {
        AuroraWorkerStarter createAuroraWorker = createAuroraWorker();
        createAuroraWorker.waitAndGetAllWorkers();
        String workerClass = SchedulerContext.workerClass(createAuroraWorker.config);
        try {
            IWorker iWorker = (IWorker) ReflectionUtils.newInstance(workerClass);
            LOG.info("loaded worker class: " + workerClass);
            iWorker.execute(createAuroraWorker.config, createAuroraWorker.job, (IWorkerController) null, (IPersistentVolume) null, (IVolatileVolume) null);
            createAuroraWorker.close();
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.log(Level.SEVERE, String.format("failed to load the worker class %s", workerClass), e);
            throw new RuntimeException(e);
        }
    }

    public static AuroraWorkerStarter createAuroraWorker() {
        AuroraWorkerStarter auroraWorkerStarter = new AuroraWorkerStarter();
        String property = System.getProperty("hostname");
        String property2 = System.getProperty("tcpPort");
        auroraWorkerStarter.mesosTaskID = System.getProperty("taskID");
        try {
            auroraWorkerStarter.workerAddress = InetAddress.getByName(property);
            auroraWorkerStarter.workerPort = Integer.parseInt(property2);
            LOG.log(Level.INFO, "worker IP: " + property + " workerPort: " + property2);
            LOG.log(Level.INFO, "worker mesosTaskID: " + auroraWorkerStarter.mesosTaskID);
            auroraWorkerStarter.readJobDescFile();
            logJobInfo(auroraWorkerStarter.job);
            auroraWorkerStarter.loadConfig();
            LOG.fine("Config from files: \n" + auroraWorkerStarter.config.toString());
            auroraWorkerStarter.overrideConfigsFromJob();
            auroraWorkerStarter.initializeWithZooKeeper();
            return auroraWorkerStarter;
        } catch (UnknownHostException e) {
            LOG.log(Level.SEVERE, "worker ip address is not valid: " + property, (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    private void readJobDescFile() {
        String str = "twister2-job/" + System.getProperty("job_desc_file");
        this.job = JobUtils.readJobFile(str);
        LOG.log(Level.INFO, "Job description file is read: " + str);
    }

    public void loadConfig() {
        String path = Paths.get("", new String[0]).toAbsolutePath().toString();
        String property = System.getProperty("cluster_type");
        String str = path + "/twister2-job/" + property;
        LOG.log(Level.INFO, String.format("Loading configuration with twister2_home: %s and configuration: %s", path, str));
        this.config = Config.newBuilder().putAll(ConfigLoader.loadConfig(path, "twister2-job", property)).put(Context.TWISTER2_HOME.getKey(), path).put(Context.TWISTER2_CONF.getKey(), str).put("twister2.cluster.type", property).build();
        LOG.log(Level.INFO, "Config files are read from directory: " + str);
    }

    public void overrideConfigsFromJob() {
        Config.Builder putAll = Config.newBuilder().putAll(this.config);
        JobAPI.Config config = this.job.getConfig();
        LOG.log(Level.INFO, "Number of configs to override from job conf: " + config.getKvsCount());
        for (JobAPI.Config.KeyValue keyValue : config.getKvsList()) {
            putAll.put(keyValue.getKey(), keyValue.getValue());
            LOG.log(Level.INFO, "Overriden conf key-value pair: " + keyValue.getKey() + ": " + keyValue.getValue());
        }
        this.config = putAll.build();
    }

    public void initializeWithZooKeeper() {
        long currentTimeMillis = System.currentTimeMillis();
        this.zkWorkerController = new ZKWorkerController(this.config, this.job.getJobId(), this.job.getNumberOfWorkers(), WorkerInfoUtils.createWorkerInfo(0, this.workerAddress.getHostAddress(), this.workerPort, NodeInfoUtils.createNodeInfo((String) null, (String) null, (String) null)));
        try {
            this.zkWorkerController.initialize(0, currentTimeMillis);
        } catch (Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), (Throwable) e);
        }
        LOG.info("Initialization for the worker: " + this.zkWorkerController.getWorkerInfo() + " took: " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
    }

    public void waitAndGetAllWorkers() {
        LOG.info("Waiting for " + this.job.getNumberOfWorkers() + " workers to join .........");
        long currentTimeMillis = System.currentTimeMillis();
        try {
            List allWorkers = this.zkWorkerController.getAllWorkers();
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (allWorkers == null) {
                LOG.log(Level.SEVERE, "Could not get full worker list. timeout limit has been reached !!!!");
            } else {
                LOG.log(Level.INFO, "Waited " + currentTimeMillis2 + " ms for all workers to join.");
                LOG.info("list of all joined workers in the job: " + WorkerInfoUtils.workerListAsString(allWorkers));
            }
        } catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void close() {
        this.zkWorkerController.close();
    }

    public static void logJobInfo(JobAPI.Job job) {
        StringBuffer stringBuffer = new StringBuffer("Job Details:");
        stringBuffer.append("\nJob name: " + job.getJobName());
        stringBuffer.append("\nJob file: " + job.getJobFormat().getJobFile());
        stringBuffer.append("\nnumber of workers: " + job.getNumberOfWorkers());
        stringBuffer.append("\nCPUs: " + job.getComputeResource(0).getCpu());
        stringBuffer.append("\nRAM: " + job.getComputeResource(0).getRamMegaBytes());
        stringBuffer.append("\nDisk: " + job.getComputeResource(0).getDiskGigaBytes());
        JobAPI.Config config = job.getConfig();
        stringBuffer.append("\nnumber of key-values in job conf: " + config.getKvsCount());
        for (JobAPI.Config.KeyValue keyValue : config.getKvsList()) {
            stringBuffer.append("\n" + keyValue.getKey() + ": " + keyValue.getValue());
        }
        LOG.info(stringBuffer.toString());
    }
}
