package ml.shifu.shifu.core.yarn.appmaster;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import ml.shifu.shifu.core.yarn.appmaster.TensorflowSession;
import ml.shifu.shifu.core.yarn.util.CommonUtils;
import ml.shifu.shifu.core.yarn.util.Constants;
import ml.shifu.shifu.core.yarn.util.GlobalConfigurationKeys;
import ml.shifu.shifu.util.HDFSUtils;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

/* loaded from: input_file:ml/shifu/shifu/core/yarn/appmaster/TensorflowApplicationMaster.class */
public class TensorflowApplicationMaster extends AbstractApplicationMaster {
    private static final Log LOG = LogFactory.getLog(TensorflowApplicationMaster.class);
    private final AbstractLivelinessMonitor<TensorflowTask> hbMonitor;
    private int hbInterval;
    private int maxConsecutiveHBMiss;
    private TensorflowSession session;
    private int appTimeout;
    private ContainerId containerId;
    private String appIdString;
    private volatile boolean taskHasMissesHB = false;
    private YarnConfiguration yarnConf = new YarnConfiguration();
    private Configuration globalConf = new Configuration();
    FileSystem hdfs = HDFSUtils.getFS();
    private Map<String, String> containerEnv = new ConcurrentHashMap();

    private TensorflowApplicationMaster() {
        this.yarnConf.set("yarn.nodemanager.admin-env", "");
        this.hbMonitor = new AbstractLivelinessMonitor<TensorflowTask>("Tensorflow Task liveliness Monitor", new MonotonicClock()) { // from class: ml.shifu.shifu.core.yarn.appmaster.TensorflowApplicationMaster.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void expire(TensorflowTask tensorflowTask) {
                onTaskDeemedDead(tensorflowTask);
            }

            protected void serviceStart() throws Exception {
                setMonitorInterval(TensorflowApplicationMaster.this.hbInterval * 3);
                setExpireInterval(TensorflowApplicationMaster.this.hbInterval * Math.max(3, TensorflowApplicationMaster.this.maxConsecutiveHBMiss));
                super.serviceStart();
            }

            private void onTaskDeemedDead(TensorflowTask tensorflowTask) {
                TensorflowApplicationMaster.LOG.info("Task with id [" + tensorflowTask.getId() + "] has missed [" + TensorflowApplicationMaster.this.maxConsecutiveHBMiss + "] heartbeats.. Ending application !!");
                TensorflowApplicationMaster.LOG.error("Task with id [" + tensorflowTask.getId() + "] deemed dead!!");
                TensorflowApplicationMaster.this.taskHasMissesHB = true;
            }
        };
    }

    public static void main(String[] strArr) {
        try {
            new TensorflowApplicationMaster().run(strArr);
            LOG.info("Application Master completed successfully. Exiting");
            System.exit(0);
        } catch (Exception e) {
            LOG.error("Fail to execute Tensorflow application master", e);
            System.exit(-1);
        }
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void init(String[] strArr) {
        try {
            Options options = new Options();
            options.addOption("container_env", true, "");
            this.containerEnv.putAll(CommonUtils.parseKeyValue(new GnuParser().parse(options, strArr).getOptionValues("container_env")));
            this.containerId = ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name()));
            this.appIdString = this.containerId.getApplicationAttemptId().getApplicationId().toString();
            this.globalConf.addResource(new Path(Constants.GLOBAL_FINAL_XML));
            this.appTimeout = this.globalConf.getInt(GlobalConfigurationKeys.APPLICATION_TIMEOUT, 0);
            this.hbInterval = this.globalConf.getInt(GlobalConfigurationKeys.TASK_HEARTBEAT_INTERVAL_MS, GlobalConfigurationKeys.DEFAULT_TASK_HEARTBEAT_INTERVAL_MS);
            this.maxConsecutiveHBMiss = this.globalConf.getInt(GlobalConfigurationKeys.TASK_MAX_MISSED_HEARTBEATS, 25);
            this.hbMonitor.init(this.globalConf);
            this.session = new TensorflowSession(this.globalConf);
        } catch (ParseException e) {
            throw new IllegalStateException("Parsing app master arguments fails", e);
        }
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void registerRMCallbackHandler() {
        this.amRMClient = AMRMClientAsync.createAMRMClientAsync(GlobalConfigurationKeys.DEFAULT_TASK_HEARTBEAT_INTERVAL_MS, new AMRMCallbackHandler(this.globalConf, this.session, this.nmClientAsync, this.hbMonitor, this.containerEnv, this.appIdString));
        this.amRMClient.init(this.yarnConf);
        this.amRMClient.start();
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void registerNMCallbackHandler() {
        this.nmClientAsync = new NMClientAsyncImpl(new NMCallbackHandler());
        this.nmClientAsync.init(this.yarnConf);
        this.nmClientAsync.start();
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void prepareBeforeTaskExector() {
        this.hbMonitor.start();
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void scheduleTask() {
        this.session.scheduleTasks(this.amRMClient);
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected boolean monitor() {
        TensorflowTask taskFromNormalTasks;
        long currentTimeMillis = this.appTimeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appTimeout;
        while (true) {
            if (this.session.getState() == TensorflowSession.SessionState.STARTING_CONTAINER || this.session.getState() == TensorflowSession.SessionState.REGESTERING_CLUSTER) {
                LOG.info("Session is REGESTERING_CLUSTER, we do check.");
                int _getReadyPsCnt = this.session.getTensorflowClusterSpec()._getReadyPsCnt();
                int _getReadyWorkerCnt = this.session.getTensorflowClusterSpec()._getReadyWorkerCnt();
                int numTotalPsTasks = this.session.getNumTotalPsTasks();
                int numTotalBackupWorkerTask = this.session.getNumTotalBackupWorkerTask() + this.session.getNumTotalWorkerTasks();
                boolean _isChiefWorkerReady = this.session.getTensorflowClusterSpec()._isChiefWorkerReady();
                LOG.warn("readyPsCnt:" + _getReadyPsCnt + "totalPsCnt: " + numTotalPsTasks + "readyWorkerCnt: " + _getReadyWorkerCnt + "totalWorkerCnt: " + numTotalBackupWorkerTask);
                if (this.session.getState() == TensorflowSession.SessionState.REGESTERING_CLUSTER && _isChiefWorkerReady && _getReadyPsCnt > numTotalPsTasks * 0.95d && _getReadyWorkerCnt > numTotalBackupWorkerTask * 0.95d && numTotalBackupWorkerTask - _getReadyWorkerCnt < this.session.getNumTotalBackupWorkerTask() && System.currentTimeMillis() - this.session.getStartTimeOfRegisteringCluster() > 360000) {
                    LOG.warn("We wait cluster register too long time, we are going to ignore worker cnt: " + (numTotalBackupWorkerTask - _getReadyWorkerCnt) + " and ignore ps cnt: " + (numTotalPsTasks - _getReadyPsCnt));
                    this.session.setState(TensorflowSession.SessionState.TRAINING);
                    TensorflowSession.TensorflowClusterSpec tensorflowClusterSpec = this.session.getTensorflowClusterSpec();
                    int i = numTotalPsTasks - 1;
                    for (int i2 = 0; i2 < i; i2++) {
                        if (StringUtils.isBlank(tensorflowClusterSpec.getPs()[i2])) {
                            while (StringUtils.isBlank(tensorflowClusterSpec.getPs()[i])) {
                                i--;
                            }
                            if (i > i2) {
                                LOG.info("we are going to use ps task: " + i + " to replace " + i2);
                                TensorflowTask taskFromNormalTasks2 = this.session.getTaskFromNormalTasks(Constants.PS_JOB_NAME, Integer.toString(i));
                                tensorflowClusterSpec.getPs()[i2] = tensorflowClusterSpec.getPs()[i];
                                tensorflowClusterSpec.getPs()[i] = null;
                                TensorflowTask taskFromNormalTasks3 = this.session.getTaskFromNormalTasks(Constants.PS_JOB_NAME, Integer.toString(i2));
                                this.session.getJobNameToTasks().get(Constants.PS_JOB_NAME)[i2] = taskFromNormalTasks2;
                                taskFromNormalTasks2.setArrayIndex(taskFromNormalTasks3.getArrayIndex());
                                taskFromNormalTasks2.setTaskIndex(taskFromNormalTasks3.getTaskIndex());
                                this.session.stopContainer(this.nmClientAsync, taskFromNormalTasks3.getContainer());
                                i--;
                            }
                        }
                    }
                    int i3 = numTotalBackupWorkerTask - 1;
                    for (int i4 = 0; i4 < i3; i4++) {
                        if (StringUtils.isBlank(tensorflowClusterSpec.getWorker()[i4])) {
                            while (StringUtils.isBlank(tensorflowClusterSpec.getWorker()[i3])) {
                                i3--;
                            }
                            if (i3 > i4) {
                                LOG.info("we are going to use worker task: " + i3 + " to replace " + i4);
                                TensorflowTask taskFromBackupTasks = this.session.getTaskFromBackupTasks(Constants.WORKER_JOB_NAME, Integer.toString(i3));
                                tensorflowClusterSpec.getWorker()[i4] = tensorflowClusterSpec.getWorker()[i3];
                                tensorflowClusterSpec.getWorker()[i3] = null;
                                if (i4 + 1 > this.session.getNumTotalWorkerTasks()) {
                                    taskFromNormalTasks = this.session.getTaskFromBackupTasks(Constants.WORKER_JOB_NAME, Integer.toString(i4));
                                    this.session.getJobNameToBackupTask().get(Constants.WORKER_JOB_NAME).remove(taskFromNormalTasks);
                                } else {
                                    taskFromNormalTasks = this.session.getTaskFromNormalTasks(Constants.WORKER_JOB_NAME, Integer.toString(i4));
                                    this.session.getJobNameToTasks().get(Constants.WORKER_JOB_NAME)[i4] = taskFromBackupTasks;
                                    this.session.weakupBackup(taskFromBackupTasks, taskFromNormalTasks.getTrainingDataPaths());
                                }
                                this.session.getJobNameToBackupTask().get(Constants.WORKER_JOB_NAME).remove(taskFromBackupTasks);
                                taskFromBackupTasks.setArrayIndex(taskFromNormalTasks.getArrayIndex());
                                taskFromBackupTasks.setTaskIndex(taskFromNormalTasks.getTaskIndex());
                                taskFromBackupTasks.setTrainingDataPaths(taskFromNormalTasks.getTrainingDataPaths());
                                this.session.stopContainer(this.nmClientAsync, taskFromNormalTasks.getContainer());
                                i3--;
                            }
                        }
                    }
                    this.session.getJobNameToBackupTaskNum().put(Constants.WORKER_JOB_NAME, Integer.valueOf(_getReadyWorkerCnt - this.session.getNumTotalWorkerTasks()));
                    tensorflowClusterSpec.setWorker((String[]) Arrays.copyOfRange(tensorflowClusterSpec.getWorker(), 0, _getReadyWorkerCnt));
                    tensorflowClusterSpec.setPs((String[]) Arrays.copyOfRange(tensorflowClusterSpec.getPs(), 0, _getReadyPsCnt));
                    LOG.info("Left Backup worker : " + (_getReadyWorkerCnt - this.session.getNumTotalWorkerTasks()));
                    try {
                        TensorflowSession.getZookeeperServer().createOrSetExt(Constants.TENSORFLOW_FINAL_CLUSTER, this.session.getTensorflowClusterSpec().toString().getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                if (System.currentTimeMillis() - this.session.getStartTimeOfRegisteringCluster() > 1200000) {
                    LOG.error("Wait too long for registering cluster. Please restart training....");
                    return true;
                }
            }
            if (System.currentTimeMillis() > currentTimeMillis) {
                LOG.error("Application times out.");
                return false;
            }
            if (!this.session.isChiefWorkerSuccess()) {
                LOG.info("Chief Worker exist with non-zero exit code. Training has finished.");
                return false;
            }
            if (this.taskHasMissesHB) {
                LOG.info("Application failed due to missed heartbeats");
                return false;
            }
            if (this.session.getFailedWorkers().size() > 0) {
                LOG.info("Some workers fails");
                return false;
            }
            if (this.session.getFailedPs().size() >= this.session.getNumTotalPsTasks()) {
                LOG.info("All PS fails, training could not continue..");
                return false;
            }
            if (this.session.isChiefWorkerComplete()) {
                LOG.info("Chief worker complete and success, so training process is over...");
                return true;
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e2) {
                LOG.error("Monitor: Thread interrupted", e2);
            }
        }
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected boolean canRecovered() {
        return this.session.isChiefWorkerSuccess() && this.session.getFailedPs().size() < this.session.getNumTotalPsTasks() && ((double) this.session.getFailedWorkers().size()) < this.session.failedWorkerMaxLimit();
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void recovery() {
        ConcurrentLinkedQueue<Integer> failedWorkers = this.session.getFailedWorkers();
        ConcurrentLinkedQueue<TensorflowTask> concurrentLinkedQueue = this.session.getJobNameToBackupTask().get(Constants.WORKER_JOB_NAME);
        while (!failedWorkers.isEmpty() && !concurrentLinkedQueue.isEmpty()) {
            try {
                this.session.weakupBackup(concurrentLinkedQueue.poll(), failedWorkers.poll());
            } catch (Exception e) {
                LOG.error("error to write zookeeper", e);
            }
        }
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void updateTaskStatus() {
        this.session.updateSessionStatus();
        CommonUtils.printWorkerTasksCompleted(this.session.getNumCompletedWorkerTasks(), this.session.getNumTotalWorkerTasks());
        FinalApplicationStatus finalStatus = this.session.getFinalStatus();
        String finalMessage = this.session.getFinalMessage();
        if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
            LOG.info("tensorflow session failed: " + finalMessage);
        } else {
            LOG.info("tensorflow session is successful");
        }
    }

    @Override // ml.shifu.shifu.core.yarn.appmaster.AbstractApplicationMaster
    protected void stop() {
        try {
            this.amRMClient.unregisterApplicationMaster(this.session.getFinalStatus(), this.session.getFinalMessage(), (String) null);
        } catch (Exception e) {
            LOG.error("Failed to unregister application", e);
        }
        this.nmClientAsync.stop();
        this.amRMClient.waitForServiceToStop(5000L);
        this.amRMClient.stop();
        try {
            Thread.sleep(30000L);
        } catch (InterruptedException e2) {
            LOG.error("stop: Thread interrupted", e2);
        }
    }
}
