package ml.shifu.shifu.core.yarn.appmaster;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import ml.shifu.guagua.coordinator.zk.GuaguaZooKeeper;
import ml.shifu.guagua.coordinator.zk.ZooKeeperUtils;
import ml.shifu.shifu.core.TrainingIntermediateResult;
import ml.shifu.shifu.core.yarn.util.CommonUtils;
import ml.shifu.shifu.core.yarn.util.Constants;
import ml.shifu.shifu.core.yarn.util.GlobalConfigurationKeys;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:ml/shifu/shifu/core/yarn/appmaster/TensorflowSession.class */
public class TensorflowSession implements Watcher {
    // Configuration handed to the Configuration-based constructor; also passed to every TensorflowTask.
    private Configuration globalConf;
    // Resource request description per job name, produced by CommonUtils.parseContainerRequests.
    private Map<String, TensorFlowContainerRequest> containerRequests;
    // YARN container requests submitted by scheduleTasks, grouped by job name.
    private Map<String, List<AMRMClient.ContainerRequest>> jobNameToContainerRequests;
    // Container id (ContainerId.toString()) -> task running in that container (primary or backup).
    private Map<String, TensorflowTask> containerIdToTask;
    // Primary tasks per job name; each array has numInstances slots that start out null.
    private Map<String, TensorflowTask[]> jobNameToTasks;
    // Backup tasks per job name, in the order their containers were assigned.
    private Map<String, ConcurrentLinkedQueue<TensorflowTask>> jobNameToBackupTask;
    // Host/port registry filled by process() while containers register themselves.
    private TensorflowClusterSpec tensorflowClusterSpec;
    // Number of primary tasks per job name that have not been given a container yet.
    private Map<String, Integer> jobNameToPendingTaskNumber;
    // Number of backup tasks per job name that have not been given a container yet.
    private Map<String, Integer> jobNameToPendingBackupTaskNumber;
    // Count of container requests sent to the resource manager by scheduleTasks.
    private int numRequestedContainers;
    // Training data split returned by TrainingDataSet.getSplitedFilePaths; entry i is handed to worker slot i.
    private List<StringBuilder> splitedTrainingData;
    // Set to true by onTaskCompleted when the chief worker exits with status 0.
    private boolean isChiefWorkerComplete;
    // Number of worker tasks that exited with status 0.
    private AtomicInteger numCompletedWorkerTasks;
    // Configured number of primary instances per job name.
    private Map<String, Integer> jobNameToTaskNum;
    // Configured number of backup instances per job name.
    private Map<String, Integer> jobNameToBackupTaskNum;
    // Array indices (TensorflowTask.getArrayIndex()) of worker tasks that exited non-zero.
    private ConcurrentLinkedQueue<Integer> failedWorkers;
    // Task indices of non-worker tasks that exited non-zero.
    private ConcurrentLinkedQueue<String> failedPs;
    // Final status/message reported for the session; see setFinalStatus and updateSessionStatus.
    private FinalApplicationStatus sessionFinalStatus;
    private String sessionFinalMessage;
    // Starts as true; cleared by onTaskCompleted when the chief worker exits non-zero.
    private boolean chiefWorkerSuccess;
    // Read from GlobalConfigurationKeys.SHIFU_APPLICATION_EPOCHS (-1 when not configured).
    private int totalEpochs;
    // Highest epoch step seen so far in worker intermediate results.
    private AtomicInteger globalEpoch;
    // Intermediate results collected for the current global epoch, keyed by the event path suffix.
    private ConcurrentHashMap<String, TrainingIntermediateResult> intermediateResults;
    // Current lifecycle state of the session.
    private SessionState state;
    // Initialised to Long.MAX_VALUE; only read/written through its accessor pair in this class.
    private long startTimeOfRegisteringCluster;
    // Created in the Configuration-based constructor; not otherwise used in this class.
    private CommonUtils.ClientConsoleBoard clientConsoleBoard;
    private static final Log LOG = LogFactory.getLog(TensorflowSession.class);
    // Session id passed to every TensorflowTask; shared by all sessions because it is static.
    public static int sessionId = 0;
    // "host:port" of the embedded ZooKeeper server, started on first use by the Configuration-based constructor.
    private static String zookeeperServerHostPort = null;
    // Shared ZooKeeper client; created once and reused by later sessions.
    private static GuaguaZooKeeper zookeeperServer = null;

    /* loaded from: input_file:ml/shifu/shifu/core/yarn/appmaster/TensorflowSession$SessionState.class */
    /**
     * Lifecycle states of a {@link TensorflowSession}.
     */
    public enum SessionState {
        // Set by both constructors.
        INIT,
        // Set by scheduleTasks once all container requests have been submitted.
        STARTING_CONTAINER,
        // process() only accepts cluster registrations while the session is in this state.
        // (The spelling is part of the public constant name and must stay as is.)
        REGESTERING_CLUSTER,
        // Set by process() after every requested container has registered.
        TRAINING,
        // Not assigned anywhere in this class; presumably set by the owner of the session.
        FINISH
    }

    /* loaded from: input_file:ml/shifu/shifu/core/yarn/appmaster/TensorflowSession$TensorflowClusterSpec.class */
    /**
     * Registry of the addresses reported by ps and worker containers.
     *
     * <p>{@link #toString()} serialises the instance with Jackson; the {@code getPs}/{@code getWorker}
     * accessors are what ends up in the JSON. The underscore-prefixed accessors do not follow the
     * bean naming convention, presumably to keep them out of the serialised form.
     *
     * <p>All reads and writes of the ready counters and the chief flag go through the instance
     * monitor so that a thread polling the counters observes updates made by {@link #add}.
     */
    class TensorflowClusterSpec {
        private String[] ps;
        private String[] worker;
        private int readyPsCnt = 0;
        private int readyWorkerCnt = 0;
        private boolean isChiefWorkerReady = false;

        /**
         * @param i  number of ps slots
         * @param i2 number of worker slots
         */
        TensorflowClusterSpec(int i, int i2) {
            this.ps = new String[i];
            this.worker = new String[i2];
        }

        /**
         * Records the address of one task and bumps the matching ready counter.
         *
         * @param str  job name; anything other than the ps job name is stored as a worker
         * @param i    slot index inside the ps or worker array
         * @param str2 address reported by the container
         */
        public synchronized void add(String str, int i, String str2) {
            if (Constants.PS_JOB_NAME.equalsIgnoreCase(str)) {
                this.ps[i] = str2;
                this.readyPsCnt++;
            } else {
                this.worker[i] = str2;
                this.readyWorkerCnt++;
            }
            // Worker slot 0 is the chief worker.
            if (Constants.WORKER_JOB_NAME.equals(str) && i == 0) {
                this.isChiefWorkerReady = true;
            }
        }

        public String[] getPs() {
            return this.ps;
        }

        public void setPs(String[] strArr) {
            this.ps = strArr;
        }

        public String[] getWorker() {
            return this.worker;
        }

        public void setWorker(String[] strArr) {
            this.worker = strArr;
        }

        /** @return number of ps addresses recorded so far */
        public synchronized int _getReadyPsCnt() {
            return this.readyPsCnt;
        }

        /** @return number of worker addresses recorded so far */
        public synchronized int _getReadyWorkerCnt() {
            return this.readyWorkerCnt;
        }

        /** @return number of addresses recorded so far, ps and worker combined */
        public synchronized int totalWorkerAndPs() {
            return this.readyWorkerCnt + this.readyPsCnt;
        }

        /**
         * @return pretty-printed JSON of this spec, or an empty string when serialisation fails
         */
        public String toString() {
            try {
                return new ObjectMapper().writer().withDefaultPrettyPrinter().writeValueAsString(this);
            } catch (JsonProcessingException e) {
                TensorflowSession.LOG.error("transfer cluster failed", e);
                return "";
            }
        }

        /** @return true once worker slot 0 has been recorded */
        public synchronized boolean _isChiefWorkerReady() {
            return this.isChiefWorkerReady;
        }
    }

    /**
     * Creates an empty session: all bookkeeping collections are allocated, but no configuration is
     * loaded, no ZooKeeper server is started and no job is registered. The session ends up in
     * {@link SessionState#INIT}.
     */
    public TensorflowSession() {
        // Task and container bookkeeping.
        this.containerIdToTask = new HashMap<>();
        this.jobNameToContainerRequests = new HashMap<>();
        this.jobNameToTasks = new ConcurrentHashMap<>();
        this.jobNameToBackupTask = new ConcurrentHashMap<>();
        this.jobNameToTaskNum = new ConcurrentHashMap<>();
        this.jobNameToBackupTaskNum = new HashMap<>();
        this.jobNameToPendingTaskNumber = new ConcurrentHashMap<>();
        this.jobNameToPendingBackupTaskNumber = new ConcurrentHashMap<>();
        this.numRequestedContainers = 0;
        this.splitedTrainingData = null;

        // Completion and failure tracking.
        this.numCompletedWorkerTasks = new AtomicInteger(0);
        this.failedWorkers = new ConcurrentLinkedQueue<>();
        this.failedPs = new ConcurrentLinkedQueue<>();
        this.isChiefWorkerComplete = false;
        this.chiefWorkerSuccess = true;
        this.sessionFinalStatus = FinalApplicationStatus.UNDEFINED;
        this.sessionFinalMessage = null;

        // Training progress.
        this.globalEpoch = new AtomicInteger(0);
        this.intermediateResults = new ConcurrentHashMap<>();
        this.startTimeOfRegisteringCluster = Long.MAX_VALUE;

        setState(SessionState.INIT);
    }

    /**
     * Creates a session from the given configuration.
     *
     * <p>Besides allocating the bookkeeping collections this constructor reads the epoch count,
     * parses the per-job container requests, starts the embedded ZooKeeper server and client if no
     * client exists yet, sizes the task tables per job, and splits the training data across the
     * configured number of workers.
     *
     * @param configuration global configuration of the application
     * @throws RuntimeException if the ZooKeeper client cannot be created or the training data
     *                          cannot be split
     * @throws NullPointerException if the parsed container requests lack the ps or worker job
     */
    public TensorflowSession(Configuration configuration) {
        // Task and container bookkeeping.
        this.containerIdToTask = new HashMap<>();
        this.jobNameToContainerRequests = new HashMap<>();
        this.jobNameToTasks = new ConcurrentHashMap<>();
        this.jobNameToBackupTask = new ConcurrentHashMap<>();
        this.jobNameToTaskNum = new ConcurrentHashMap<>();
        this.jobNameToBackupTaskNum = new HashMap<>();
        this.jobNameToPendingTaskNumber = new ConcurrentHashMap<>();
        this.jobNameToPendingBackupTaskNumber = new ConcurrentHashMap<>();
        this.numRequestedContainers = 0;
        this.splitedTrainingData = null;

        // Completion and failure tracking.
        this.numCompletedWorkerTasks = new AtomicInteger(0);
        this.failedWorkers = new ConcurrentLinkedQueue<>();
        this.failedPs = new ConcurrentLinkedQueue<>();
        this.isChiefWorkerComplete = false;
        this.chiefWorkerSuccess = true;
        this.sessionFinalStatus = FinalApplicationStatus.UNDEFINED;
        this.sessionFinalMessage = null;

        // Training progress.
        this.globalEpoch = new AtomicInteger(0);
        this.intermediateResults = new ConcurrentHashMap<>();
        this.startTimeOfRegisteringCluster = Long.MAX_VALUE;

        this.globalConf = configuration;
        this.totalEpochs = this.globalConf.getInt(GlobalConfigurationKeys.SHIFU_APPLICATION_EPOCHS, -1);
        this.containerRequests = CommonUtils.parseContainerRequests(this.globalConf);
        setState(SessionState.INIT);

        // The embedded ZooKeeper server and its client are static and therefore created only once.
        if (zookeeperServer == null) {
            zookeeperServerHostPort = startZookeeperServer();
            try {
                zookeeperServer = new GuaguaZooKeeper(zookeeperServerHostPort, Integer.MAX_VALUE, 5, GlobalConfigurationKeys.DEFAULT_TASK_HEARTBEAT_INTERVAL_MS, this);
            } catch (IOException zkFailure) {
                LOG.error("create zookeeper server fails!", zkFailure);
                throw new RuntimeException(zkFailure);
            }
        }

        // Size the per-job task tables from the parsed requests.
        for (Map.Entry<String, TensorFlowContainerRequest> request : this.containerRequests.entrySet()) {
            String jobName = request.getKey();
            TensorFlowContainerRequest spec = request.getValue();
            int primaryCount = spec.getNumInstances();
            int backupCount = spec.getNumBackupInstances();
            this.jobNameToTasks.put(jobName, new TensorflowTask[primaryCount]);
            this.jobNameToBackupTask.put(jobName, new ConcurrentLinkedQueue<>());
            this.jobNameToTaskNum.put(jobName, primaryCount);
            this.jobNameToBackupTaskNum.put(jobName, backupCount);
        }

        // The cluster spec has one slot per primary and per backup instance.
        int psSlots = this.jobNameToTaskNum.get(Constants.PS_JOB_NAME)
                + this.jobNameToBackupTaskNum.get(Constants.PS_JOB_NAME);
        int workerSlots = this.jobNameToTaskNum.get(Constants.WORKER_JOB_NAME)
                + this.jobNameToBackupTaskNum.get(Constants.WORKER_JOB_NAME);
        this.tensorflowClusterSpec = new TensorflowClusterSpec(psSlots, workerSlots);

        try {
            int workerCount = this.jobNameToTaskNum.get(Constants.WORKER_JOB_NAME);
            String trainingDataPath = configuration.get(GlobalConfigurationKeys.TRAINING_DATA_PATH);
            this.splitedTrainingData = TrainingDataSet.getInstance().getSplitedFilePaths(configuration, workerCount, trainingDataPath);
            LOG.info("splitedTrainingData: " + this.splitedTrainingData.toString());
            this.clientConsoleBoard = new CommonUtils.ClientConsoleBoard(configuration);
        } catch (Exception splitFailure) {
            LOG.error("Splitting training data fails or count file line fails!", splitFailure);
            throw new RuntimeException(splitFailure);
        }
    }

    /**
     * Starts the embedded ZooKeeper server, waits until it is reported as started and returns its
     * address.
     *
     * @return {@code "<current host name>:<port>"} of the embedded server
     * @throws RuntimeException wrapping the {@link IOException} raised while starting or checking
     *                          the server
     */
    private static String startZookeeperServer() {
        String host = CommonUtils.getCurrentHostName();
        int port;
        try {
            port = ZooKeeperUtils.startEmbedZooKeeper();
            ZooKeeperUtils.checkIfEmbedZooKeeperStarted(port);
        } catch (IOException startFailure) {
            throw new RuntimeException(startFailure);
        }
        return host + ":" + port;
    }

    /**
     * Submits one container request per primary and backup instance of every job, records the
     * requests and the pending counters, and moves the session to
     * {@link SessionState#STARTING_CONTAINER}.
     *
     * @param aMRMClientAsync resource-manager client the requests are added to
     */
    public void scheduleTasks(AMRMClientAsync<AMRMClient.ContainerRequest> aMRMClientAsync) {
        for (Map.Entry<String, TensorFlowContainerRequest> request : this.containerRequests.entrySet()) {
            String jobName = request.getKey();
            TensorFlowContainerRequest spec = request.getValue();

            if (!this.jobNameToContainerRequests.containsKey(jobName)) {
                this.jobNameToContainerRequests.put(jobName, new ArrayList<>());
            }
            List<AMRMClient.ContainerRequest> submitted = this.jobNameToContainerRequests.get(jobName);

            // One request per primary instance plus one per backup instance.
            for (int n = 0; n < spec.getNumInstances() + spec.getNumBackupInstances(); n++) {
                AMRMClient.ContainerRequest ask = setupContainerRequestForRM(spec);
                submitted.add(ask);
                aMRMClientAsync.addContainerRequest(ask);
                this.numRequestedContainers++;
            }

            this.jobNameToPendingTaskNumber.put(jobName, spec.getNumInstances());
            this.jobNameToPendingBackupTaskNumber.put(jobName, spec.getNumBackupInstances());
        }
        setState(SessionState.STARTING_CONTAINER);
    }

    /**
     * Builds the YARN container request for one instance of the given job: memory and vcores come
     * from the request, node and rack constraints are left unset.
     *
     * @param tensorFlowContainerRequest resource description of the job
     * @return the container request to hand to the resource manager
     */
    private AMRMClient.ContainerRequest setupContainerRequestForRM(TensorFlowContainerRequest tensorFlowContainerRequest) {
        Resource capability = Resource.newInstance(
                (int) tensorFlowContainerRequest.getMemory(),
                tensorFlowContainerRequest.getVCores());
        Priority priority = Priority.newInstance(tensorFlowContainerRequest.getPriority());
        AMRMClient.ContainerRequest ask =
                new AMRMClient.ContainerRequest(capability, (String[]) null, (String[]) null, priority);
        LOG.info("Requested container ask: " + ask.toString());
        return ask;
    }

    /**
     * Binds an allocated container to a task.
     *
     * <p>ZooKeeper watches are first registered on the container's cluster-registration and
     * intermediate-result paths. The container then fills the first empty primary slot of the
     * matching job while primary tasks are pending; otherwise it becomes a backup task.
     *
     * <p>Every failure inside the assignment, including the case where no slot can be found, is
     * logged as "watch container fails" and rethrown wrapped in a {@link RuntimeException}.
     *
     * @param container container allocated by the resource manager
     * @return the task now bound to the container
     * @throws RuntimeException if no job matches the container or the assignment fails
     */
    public synchronized TensorflowTask distributeTaskToContainer(Container container) {
        String jobName = getAvailableJobName(container);
        if (StringUtils.isBlank(jobName)) {
            LOG.error("Cannot distribute container: " + container.toString());
            throw new RuntimeException("couldn't find job to match container");
        }
        String containerKey = container.getId().toString();
        try {
            zookeeperServer.exists(Constants.TENSORFLOW_CLUSTER_ROOT_PATH + containerKey, this);
            zookeeperServer.exists(Constants.WORKER_INTERMEDIATE_RESULT_ROOT_PATH + containerKey, this);

            int pendingPrimary = this.jobNameToPendingTaskNumber.get(jobName);
            if (pendingPrimary > 0) {
                TensorflowTask[] slots = this.jobNameToTasks.get(jobName);
                // Locate the first slot that has no task yet.
                int free = 0;
                while (free < slots.length && slots[free] != null) {
                    free++;
                }
                if (free < slots.length) {
                    TensorflowTask primary = new TensorflowTask(jobName, String.valueOf(free), sessionId, container, zookeeperServerHostPort, this.globalConf, false, free);
                    slots[free] = primary;
                    if (Constants.WORKER_JOB_NAME.equalsIgnoreCase(jobName)) {
                        // Worker slot i trains on split i of the training data.
                        primary.setTrainingDataPaths(this.splitedTrainingData.get(free).toString());
                    }
                    this.jobNameToPendingTaskNumber.put(jobName, pendingPrimary - 1);
                    this.containerIdToTask.put(containerKey, primary);
                    return primary;
                }
            } else {
                int pendingBackup = this.jobNameToPendingBackupTaskNumber.get(jobName);
                if (pendingBackup > 0) {
                    ConcurrentLinkedQueue<TensorflowTask> backups = this.jobNameToBackupTask.get(jobName);
                    // Backup task indices continue after the primary indices.
                    String backupIndex = String.valueOf(this.jobNameToTaskNum.get(jobName).intValue() + backups.size());
                    TensorflowTask backup = new TensorflowTask(jobName, backupIndex, sessionId, container, zookeeperServerHostPort, this.globalConf, true, -1);
                    backups.offer(backup);
                    this.jobNameToPendingBackupTaskNumber.put(jobName, pendingBackup - 1);
                    this.containerIdToTask.put(containerKey, backup);
                    return backup;
                }
            }
            throw new RuntimeException("Error when distribute container to task");
        } catch (Exception failure) {
            LOG.error("watch container fails", failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Finds a job whose requested memory and vcores equal the container's resources and which still
     * has a primary or backup task waiting for a container.
     *
     * @param container allocated container
     * @return the matching job name, or {@code null} when no job matches
     */
    private synchronized String getAvailableJobName(Container container) {
        LOG.info("allocated resource: " + container.getResource().toString());
        LOG.info("remaining resource: " + this.jobNameToPendingTaskNumber);
        LOG.info("session id: " + sessionId);
        for (Map.Entry<String, TensorFlowContainerRequest> candidate : this.containerRequests.entrySet()) {
            String jobName = candidate.getKey();
            TensorFlowContainerRequest spec = candidate.getValue();
            int pendingPrimary = this.jobNameToPendingTaskNumber.get(jobName);
            int pendingBackup = this.jobNameToPendingBackupTaskNumber.get(jobName);

            if (((int) spec.getMemory()) != container.getResource().getMemory()) {
                continue;
            }
            if (spec.getVCores() != container.getResource().getVirtualCores()) {
                continue;
            }
            if (pendingPrimary > 0 || pendingBackup > 0) {
                return jobName;
            }
        }
        return null;
    }

    /**
     * Looks up the task bound to a container.
     *
     * @param containerId id of the container
     * @return the bound task, or {@code null} if the container is unknown
     */
    public TensorflowTask getTaskByContainerId(ContainerId containerId) {
        String key = containerId.toString();
        return this.containerIdToTask.get(key);
    }

    /**
     * Searches the primary task tables of all jobs for the task with the given job name and index.
     *
     * <p>Slots that have not been assigned a container yet are {@code null} and are skipped, so the
     * lookup is safe while containers are still being allocated.
     *
     * @param str  job name of the task
     * @param str2 task index of the task
     * @return the matching primary task, or {@code null} if there is none
     */
    public TensorflowTask getTaskFromNormalTasks(String str, String str2) {
        for (TensorflowTask[] tasks : this.jobNameToTasks.values()) {
            for (TensorflowTask task : tasks) {
                if (task == null) {
                    // Slot not assigned yet.
                    continue;
                }
                if (task.getJobName().equals(str) && task.getTaskIndex().equals(str2)) {
                    return task;
                }
            }
        }
        return null;
    }

    /**
     * Searches the backup queue of a job for the task with the given index.
     *
     * @param str  job name of the task
     * @param str2 task index of the task
     * @return the matching backup task, or {@code null} if the job name is {@code null}, the job
     *         has no backup queue, or no queued task matches
     */
    public TensorflowTask getTaskFromBackupTasks(String str, String str2) {
        if (str == null) {
            // The backing map is a ConcurrentHashMap, which rejects null keys.
            return null;
        }
        ConcurrentLinkedQueue<TensorflowTask> backups = this.jobNameToBackupTask.get(str);
        if (backups == null) {
            // Unknown job name: nothing can match.
            return null;
        }
        for (TensorflowTask task : backups) {
            if (task.getJobName().equals(str) && task.getTaskIndex().equals(str2)) {
                return task;
            }
        }
        return null;
    }

    /**
     * Tells whether the given job name / task index pair identifies the chief worker.
     *
     * @param str  job name
     * @param str2 task index
     * @return true for the worker job whose index equals the chief index constant
     */
    private boolean isChief(String str, String str2) {
        if (!str.equals(Constants.WORKER_JOB_NAME)) {
            return false;
        }
        // NOTE(review): the constant name looks unrelated to task indices; its value is presumably
        // the chief index ("0") — confirm against GlobalConfigurationKeys.
        return str2.equals(GlobalConfigurationKeys.DEFAULT_TARGET_COLUMN_NUM);
    }

    /**
     * Records the final status of the session together with its message.
     *
     * @param finalApplicationStatus final status to report
     * @param str                    message accompanying the status; may be {@code null}
     */
    public void setFinalStatus(FinalApplicationStatus finalApplicationStatus, String str) {
        sessionFinalMessage = str;
        sessionFinalStatus = finalApplicationStatus;
    }

    /**
     * Counts the tasks whose exit status is not zero.
     *
     * <p>If an empty slot is encountered the session is marked FAILED and 0 is returned
     * immediately.
     *
     * @param tensorflowTaskArr tasks to inspect
     * @return number of tasks with a non-zero exit status, or 0 if a slot was {@code null}
     */
    private int getFailedTasksNum(TensorflowTask[] tensorflowTaskArr) {
        int failed = 0;
        for (int slot = 0; slot < tensorflowTaskArr.length; slot++) {
            TensorflowTask task = tensorflowTaskArr[slot];
            if (task == null) {
                String message = "Job is null, this should not happen.";
                LOG.error(message);
                setFinalStatus(FinalApplicationStatus.FAILED, message);
                return 0;
            }
            if (task.exitStatus == 0) {
                continue;
            }
            failed++;
            LOG.error("Job " + task.getJobName() + " at index: " + task.getTaskIndex() + " haven't finished yet.");
        }
        return failed;
    }

    /**
     * @return 0.9 times the configured number of ps tasks plus the number of ps backup tasks
     *         currently queued
     */
    public double failedPsMaxLimit() {
        double share = 0.9d * getNumTotalPsTasks();
        int queuedBackups = getJobNameToBackupTask().get(Constants.PS_JOB_NAME).size();
        return share + queuedBackups;
    }

    /**
     * @return 0.1 times the configured number of worker tasks plus the number of worker backup
     *         tasks currently queued
     */
    public double failedWorkerMaxLimit() {
        double share = 0.1d * getNumTotalWorkerTasks();
        int queuedBackups = getJobNameToBackupTask().get(Constants.WORKER_JOB_NAME).size();
        return share + queuedBackups;
    }

    /**
     * Derives the final status from the chief worker's outcome. A status that is already FAILED is
     * left untouched.
     */
    public void updateSessionStatus() {
        if (getFinalStatus() == FinalApplicationStatus.FAILED) {
            return;
        }
        if (this.chiefWorkerSuccess) {
            LOG.info("Session completed with chief worker success, setting final status SUCCEEDED.");
            setFinalStatus(FinalApplicationStatus.SUCCEEDED, null);
            return;
        }
        setFinalStatus(FinalApplicationStatus.FAILED, "Chief worker failed");
    }

    /**
     * Records the completion of a task.
     *
     * <p>A completed backup task is simply removed from its queue. For a primary task the exit
     * status is stored; a non-zero status adds the task to the failed-ps or failed-worker queue,
     * and a zero status of a worker increments the completed-worker counter. The chief worker
     * additionally updates the chief success/complete flags.
     *
     * @param str  job name of the completed task
     * @param str2 task index of the completed task
     * @param i    exit status of the task
     * @throws NullPointerException if the task is neither a known backup nor a known primary task
     */
    public void onTaskCompleted(String str, String str2, int i) {
        LOG.info(String.format("Job %s:%s exited with %d", str, str2, i));

        TensorflowTask backup = getTaskFromBackupTasks(str, str2);
        if (backup != null) {
            this.jobNameToBackupTask.get(str).remove(backup);
            LOG.error("backup task fails!!");
            return;
        }

        TensorflowTask task = getTaskFromNormalTasks(str, str2);
        Preconditions.checkNotNull(task);
        task.setExitStatus(i);
        boolean failed = i != 0;

        if (!Constants.WORKER_JOB_NAME.equals(str)) {
            // Non-worker jobs only track failures.
            if (failed) {
                this.failedPs.offer(task.getTaskIndex());
            }
            return;
        }

        LOG.info(str + ":" + str2 + ":" + i);
        boolean chief = isChief(str, str2);
        if (failed) {
            if (chief) {
                this.chiefWorkerSuccess = false;
            }
            this.failedWorkers.offer(task.getArrayIndex());
            return;
        }
        this.numCompletedWorkerTasks.incrementAndGet();
        if (chief) {
            this.isChiefWorkerComplete = true;
        }
    }

    /**
     * Asks the node managers to stop the container of every task known to this session.
     *
     * @param nMClientAsync node-manager client used to stop the containers
     */
    public void stopAllTasks(NMClientAsync nMClientAsync) {
        for (TensorflowTask task : this.containerIdToTask.values()) {
            if (task == null) {
                continue;
            }
            Container running = task.getContainer();
            nMClientAsync.stopContainerAsync(running.getId(), running.getNodeId());
            LOG.info("Stop a task in container: containerId = " + running.getId() + ", containerNode = " + running.getNodeId().getHost());
        }
    }

    /**
     * Logs the container and asks its node manager to stop it.
     *
     * @param nMClientAsync node-manager client used to stop the container
     * @param container     container to stop
     */
    public void stopContainer(NMClientAsync nMClientAsync, Container container) {
        LOG.info("container: " + container);
        String host = container.getNodeId().getHost();
        LOG.info("Stop a task in container: containerId = " + container.getId() + ", containerNode = " + host);
        nMClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
    }

    /** @return true once the chief worker has exited with status 0 */
    public boolean isChiefWorkerComplete() {
        return isChiefWorkerComplete;
    }

    /** @return counter of worker tasks that exited with status 0 */
    public AtomicInteger getNumCompletedWorkerTasks() {
        return numCompletedWorkerTasks;
    }

    /** @param atomicInteger replacement for the completed-worker counter */
    public void setNumCompletedWorkerTasks(AtomicInteger atomicInteger) {
        numCompletedWorkerTasks = atomicInteger;
    }

    /** @return configured number of backup worker instances */
    public int getNumTotalBackupWorkerTask() {
        return jobNameToBackupTaskNum.get(Constants.WORKER_JOB_NAME);
    }

    /** @return configured number of primary worker instances */
    public int getNumTotalWorkerTasks() {
        return jobNameToTaskNum.get(Constants.WORKER_JOB_NAME);
    }

    /** @return configured number of primary ps instances */
    public int getNumTotalPsTasks() {
        return jobNameToTaskNum.get(Constants.PS_JOB_NAME);
    }

    /** @return live queue of array indices of failed worker tasks */
    public ConcurrentLinkedQueue<Integer> getFailedWorkers() {
        return failedWorkers;
    }

    /** @return live queue of task indices of failed non-worker tasks */
    public ConcurrentLinkedQueue<String> getFailedPs() {
        return failedPs;
    }

    /** @return live map of configured backup instance counts per job name */
    public Map<String, Integer> getJobNameToBackupTaskNum() {
        return jobNameToBackupTaskNum;
    }

    /**
     * Logs the averages of the intermediate results collected for the current global epoch:
     * training error, validation error, epoch time and validation time.
     *
     * <p>When nothing has been collected the averages are undefined, so a short notice is logged
     * instead of NaN values.
     */
    private void doStatistic() {
        int samples = 0;
        double trainingErrorSum = 0.0d;
        double validErrorSum = 0.0d;
        double epochTimeSum = 0.0d;
        double validTimeSum = 0.0d;
        // Count while summing so the divisor always matches the entries actually visited.
        for (TrainingIntermediateResult result : this.intermediateResults.values()) {
            trainingErrorSum += result.getTrainingError();
            validErrorSum += result.getValidError();
            epochTimeSum += result.getCurrentEpochTime();
            validTimeSum += result.getCurrentEpochValidTime();
            samples++;
        }
        if (samples == 0) {
            LOG.info("Epoch: " + getGlobalEpoch().get() + " has no intermediate results, skip statistics.");
            return;
        }
        LOG.info("Epoch: " + getGlobalEpoch().get() + " training error: " + (trainingErrorSum / samples) + " valid error: " + (validErrorSum / samples) + " training avg time: " + (epochTimeSum / samples) + " valid avg time: " + (validTimeSum / samples));
    }

    /**
     * ZooKeeper watcher callback. Events with no path are ignored; events under the cluster root
     * path are treated as container registrations, events under the worker intermediate-result
     * root path as training progress reports. Any other path is ignored.
     *
     * @param watchedEvent event delivered by ZooKeeper
     * @throws RuntimeException if reading from or writing to ZooKeeper fails
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent == null || StringUtils.isBlank(watchedEvent.getPath())) {
            return;
        }
        String path = watchedEvent.getPath();
        if (path.contains(Constants.TENSORFLOW_CLUSTER_ROOT_PATH)) {
            handleClusterRegistration(path);
        } else if (path.contains(Constants.WORKER_INTERMEDIATE_RESULT_ROOT_PATH)) {
            handleIntermediateResult(path);
        }
    }

    /**
     * Stores a worker's intermediate result. Results older than the global epoch are dropped; a
     * result from a newer epoch first triggers the statistics of the finished epoch and advances
     * the global epoch.
     */
    private void handleIntermediateResult(String path) {
        String workerKey = path.replace(Constants.WORKER_INTERMEDIATE_RESULT_ROOT_PATH, "");
        synchronized (TensorflowSession.class) {
            try {
                // Passing this as watcher re-arms the watch for the next update of the node.
                TrainingIntermediateResult result = new TrainingIntermediateResult(zookeeperServer.getData(path, this, (Stat) null));
                int reportedEpoch = result.getCurrentEpochStep();
                int currentEpoch = getGlobalEpoch().get();
                if (reportedEpoch < currentEpoch) {
                    // Stale report from an epoch that is already closed.
                    return;
                }
                if (reportedEpoch > currentEpoch) {
                    LOG.info("Epoch " + getGlobalEpoch() + " is finish..");
                    doStatistic();
                    this.intermediateResults.clear();
                    getGlobalEpoch().set(reportedEpoch);
                }
                this.intermediateResults.putIfAbsent(workerKey, result);
            } catch (Exception e) {
                LOG.error("Getting worker intermediate result fails.", e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Records the address a container registered and, once every requested container has
     * registered, publishes the final cluster spec and moves the session to TRAINING.
     */
    private void handleClusterRegistration(String path) {
        TensorflowTask task = this.containerIdToTask.get(path.replace(Constants.TENSORFLOW_CLUSTER_ROOT_PATH, ""));
        if (getState() != SessionState.REGESTERING_CLUSTER) {
            LOG.warn("Tensorflow session is not REGESTERING_CLUSTER currently but try to register, we will ingore " + task);
            return;
        }
        if (task == null) {
            // Registration from a container this session never assigned; nothing to record.
            LOG.error("No task is bound to registration path " + path + ", ignoring the event.");
            return;
        }
        LOG.info("This is tensorflow cluster...");
        try {
            // Decode with UTF-8, the charset this class uses for everything it writes to ZooKeeper.
            String address = new String(zookeeperServer.getData(path, (Watcher) null, (Stat) null), Charset.forName("UTF-8"));
            task.setRegister(true);
            this.tensorflowClusterSpec.add(task.getJobName(), Integer.parseInt(task.getTaskIndex()), address);
            int ready = this.tensorflowClusterSpec.totalWorkerAndPs();
            if (ready < this.numRequestedContainers) {
                LOG.info("Total requested containers: " + this.numRequestedContainers + ", ready containers: " + ready);
                return;
            }
            if (ready > this.numRequestedContainers) {
                LOG.error("Total requested containers: " + this.numRequestedContainers + ", ready containers: " + ready);
                return;
            }
            LOG.info("Get all tensorflow cluster host and port from containers");
            zookeeperServer.createOrSetExt(Constants.TENSORFLOW_FINAL_CLUSTER, this.tensorflowClusterSpec.toString().getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);
            setState(SessionState.TRAINING);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return true when no primary and no backup task of any job is still waiting for a container
     */
    public boolean isAllTaskAssignedContainer() {
        int waiting = 0;
        for (Integer pendingPrimary : this.jobNameToPendingTaskNumber.values()) {
            waiting += pendingPrimary;
        }
        for (Integer pendingBackup : this.jobNameToPendingBackupTaskNumber.values()) {
            waiting += pendingBackup;
        }
        return waiting == 0;
    }

    /** @return false once the chief worker has exited with a non-zero status */
    public boolean isChiefWorkerSuccess() {
        return chiefWorkerSuccess;
    }

    /** @return final status recorded for the session */
    public FinalApplicationStatus getFinalStatus() {
        return sessionFinalStatus;
    }

    /** @return message recorded with the final status; may be {@code null} */
    public String getFinalMessage() {
        return sessionFinalMessage;
    }

    /** @return configuration this session was created with */
    public Configuration getGlobalConf() {
        return globalConf;
    }

    /** @return live map of primary task tables per job name */
    public Map<String, TensorflowTask[]> getJobNameToTasks() {
        return jobNameToTasks;
    }

    /** @return live map of backup task queues per job name */
    public Map<String, ConcurrentLinkedQueue<TensorflowTask>> getJobNameToBackupTask() {
        return jobNameToBackupTask;
    }

    /**
     * Lets a backup task take over the worker slot of a failed worker: the backup inherits the
     * failed worker's training data paths and array index, the paths are published under the
     * backup container's training-data path in ZooKeeper, and the backup replaces the failed task
     * in the worker table.
     *
     * @param tensorflowTask backup task taking over
     * @param num            array index of the failed worker in the worker table
     * @throws KeeperException      if the ZooKeeper write fails
     * @throws InterruptedException if the ZooKeeper write is interrupted
     */
    public void weakupBackup(TensorflowTask tensorflowTask, Integer num) throws KeeperException, InterruptedException {
        TensorflowTask[] workers = this.jobNameToTasks.get(Constants.WORKER_JOB_NAME);
        int slot = num.intValue();
        TensorflowTask failedWorker = workers[slot];

        tensorflowTask.setTrainingDataPaths(failedWorker.getTrainingDataPaths());
        tensorflowTask.setArrayIndex(slot);
        LOG.info("failedWorkerTask.getTrainingDataPaths(): " + failedWorker.getTrainingDataPaths());

        String zkPath = Constants.getTrainingDataZookeeperPath(tensorflowTask.getContainer().getId().toString());
        byte[] payload = failedWorker.getTrainingDataPaths().getBytes(Charset.forName("UTF-8"));
        zookeeperServer.createOrSetExt(zkPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);

        workers[slot] = tensorflowTask;
    }

    /**
     * Publishes the given text under the task container's training-data path in ZooKeeper. Blank
     * input is ignored.
     *
     * @param tensorflowTask task whose container id selects the ZooKeeper path
     * @param str            content to publish, encoded as UTF-8
     * @throws RuntimeException wrapping any failure of the ZooKeeper write
     */
    public void weakupBackup(TensorflowTask tensorflowTask, String str) {
        if (!StringUtils.isBlank(str)) {
            try {
                String zkPath = Constants.getTrainingDataZookeeperPath(tensorflowTask.getContainer().getId().toString());
                byte[] payload = str.getBytes(Charset.forName("UTF-8"));
                zookeeperServer.createOrSetExt(zkPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);
            } catch (Exception writeFailure) {
                throw new RuntimeException(writeFailure);
            }
        }
    }

    /** @return holder of the highest epoch step seen so far */
    public AtomicInteger getGlobalEpoch() {
        return globalEpoch;
    }

    /** @param atomicInteger replacement for the global epoch holder */
    public void setGlobalEpoch(AtomicInteger atomicInteger) {
        globalEpoch = atomicInteger;
    }

    /** @return configured number of epochs, or -1 when not configured */
    public int getTotalEpochs() {
        return totalEpochs;
    }

    /** @param i number of epochs */
    public void setTotalEpochs(int i) {
        totalEpochs = i;
    }

    /** @return registry of the addresses registered by the containers */
    public TensorflowClusterSpec getTensorflowClusterSpec() {
        return tensorflowClusterSpec;
    }

    /** @return the shared ZooKeeper client, or {@code null} if none has been created yet */
    public static GuaguaZooKeeper getZookeeperServer() {
        return zookeeperServer;
    }

    /** @return recorded start time of cluster registration; {@code Long.MAX_VALUE} until set */
    public long getStartTimeOfRegisteringCluster() {
        return startTimeOfRegisteringCluster;
    }

    /** @param j start time of cluster registration */
    public void setStartTimeOfRegisteringCluster(long j) {
        startTimeOfRegisteringCluster = j;
    }

    /** @return current lifecycle state of the session */
    public SessionState getState() {
        return state;
    }

    /** @param sessionState new lifecycle state of the session */
    public void setState(SessionState sessionState) {
        state = sessionState;
    }
}
