package org.deeplearning4j.iterativereduce.runtime.yarn.appmaster;

import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.deeplearning4j.iterativereduce.runtime.ComputableMaster;
import org.deeplearning4j.iterativereduce.runtime.ConfigFields;
import org.deeplearning4j.iterativereduce.runtime.Utils;
import org.deeplearning4j.iterativereduce.runtime.yarn.ContainerManagerHandler;
import org.deeplearning4j.iterativereduce.runtime.yarn.ResourceManagerHandler;
import org.deeplearning4j.iterativereduce.runtime.yarn.avro.generated.StartupConfiguration;
import org.deeplearning4j.iterativereduce.runtime.yarn.avro.generated.WorkerId;
import org.deeplearning4j.scaleout.api.ir.Updateable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/deeplearning4j/iterativereduce/runtime/yarn/appmaster/ApplicationMaster.class */
/**
 * YARN application master for an IterativeReduce job.
 *
 * <p>It splits the configured input path ({@code app.input.path}) into non-empty file splits. It
 * then requests one container per split and launches a worker for each split. An
 * {@code ApplicationMasterService} runs on a background thread, and the application waits for
 * every worker container to complete. If the master service finishes cleanly, its result is
 * written to {@code app.output.path}.
 */
public class ApplicationMaster<T extends Updateable> extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationMaster.class);

    /** Pause between allocation heartbeats while containers are still outstanding. */
    private static final long ALLOCATION_RETRY_DELAY_MS = 2500L;
    /** Pause between completion-polling heartbeats to the resource manager. */
    private static final long COMPLETION_POLL_DELAY_MS = 2000L;
    /** Per-thread join timeout for launcher threads once all containers have completed. */
    private static final long LAUNCHER_JOIN_TIMEOUT_MS = 1000L;
    /** Host placeholder for splits that report no location. */
    private static final String ANY_HOST = "*";

    private final String masterHost;
    private final int masterPort;
    private final InetSocketAddress masterAddr;
    private final ComputableMaster<T> masterComputable;
    private final Class<T> masterUpdateable;
    private final int batchSize;
    private final int iterationCount;
    private final Map<CharSequence, CharSequence> appConfig;
    private Configuration conf;
    private final ApplicationAttemptId appAttemptId;
    private final String appName;
    private final Properties props;
    /** Lazily computed by {@link #getConfigurationTuples()}; entries are removed as workers launch. */
    private Set<ConfigurationTuple> confTuples;
    @SuppressWarnings("rawtypes")
    private final Class<? extends InputFormat> inputFormatClass;

    /**
     * Ties one worker id to the startup configuration (its input split) and to the host holding
     * the split's first reported location, which is used for locality matching.
     */
    public class ConfigurationTuple {
        private final String host;
        private final String workerId;
        private final StartupConfiguration config;

        public ConfigurationTuple(String host, String workerId, StartupConfiguration config) {
            this.host = host;
            this.workerId = workerId;
            this.config = config;
            LOG.debug("Created configuration typle, host=" + this.host + ", workerId=" + this.workerId + ", startupConfiguration=" + this.config);
        }

        public String getHost() {
            return this.host;
        }

        public String getWorkerId() {
            return this.workerId;
        }

        public StartupConfiguration getConfig() {
            return this.config;
        }
    }

    /** Starts the worker process for a single worker id inside an allocated container. */
    public class LaunchContainerRunnabble implements Runnable {
        String workerId;
        Container container;
        ContainerManagerHandler cmHandler;

        public LaunchContainerRunnabble(String workerId, Container container) {
            this.workerId = workerId;
            this.container = container;
        }

        @Override
        public void run() {
            LOG.debug("Launching container for worker=" + this.workerId + ", container=" + this.container);
            this.cmHandler = new ContainerManagerHandler(ApplicationMaster.this.conf, this.container);
            this.cmHandler.getContainerManager();
            try {
                this.cmHandler.startContainer(
                        Utils.getWorkerCommand(ApplicationMaster.this.conf, ApplicationMaster.this.props,
                                ApplicationMaster.this.masterHost + ":" + ApplicationMaster.this.masterPort, this.workerId),
                        Utils.getLocalResourcesForApplication(ApplicationMaster.this.conf,
                                ApplicationMaster.this.appAttemptId.getApplicationId(), ApplicationMaster.this.appName,
                                ApplicationMaster.this.props, LocalResourceVisibility.APPLICATION),
                        Utils.getEnvironment(ApplicationMaster.this.conf, ApplicationMaster.this.props));
                this.cmHandler.getContainerStatus();
            } catch (IOException e) {
                // Previously swallowed silently, which hid why a worker never started.
                LOG.error("Failed to launch container for worker=" + this.workerId + ", container=" + this.container, e);
            }
        }
    }

    /** Process exit codes returned from {@link #run(String[])}. */
    private enum ReturnCode {
        OK(0),
        MASTER_ERROR(-1),
        CONTAINER_ERROR(1),
        /** A worker container exited non-zero and the application was killed. */
        CONTAINER_KILLED(-10);

        private final int code;

        ReturnCode(int code) {
            this.code = code;
        }

        public int getCode() {
            return this.code;
        }
    }

    /** Creates a master listening on the default port 9999. */
    public ApplicationMaster(ComputableMaster<T> computableMaster, Class<T> cls) throws IOException {
        this(9999, computableMaster, cls);
    }

    /**
     * Loads {@link ConfigFields#APP_CONFIG_FILE} and resolves this attempt's id from the
     * {@code AM_CONTAINER_ID} environment variable.
     *
     * @param port port the master service listens on (published to workers as host:port)
     * @param computableMaster master-side computation
     * @param cls updateable type exchanged with workers
     * @throws IOException if the local host name cannot be resolved or the config file cannot be read
     */
    public ApplicationMaster(int port, ComputableMaster<T> computableMaster, Class<T> cls) throws IOException {
        this.masterHost = InetAddress.getLocalHost().getCanonicalHostName();
        this.masterPort = port;
        this.masterAddr = new InetSocketAddress(this.masterHost, this.masterPort);
        this.masterComputable = computableMaster;
        this.masterUpdateable = cls;

        this.props = new Properties();
        // Close the config file once loaded; it was previously leaked.
        try (FileInputStream in = new FileInputStream(ConfigFields.APP_CONFIG_FILE)) {
            this.props.load(in);
        }
        this.appAttemptId = ConverterUtils.toContainerId(System.getenv("AM_CONTAINER_ID")).getApplicationAttemptId();
        this.appName = this.props.getProperty(ConfigFields.APP_NAME, ConfigFields.DEFAULT_APP_NAME).replace(' ', '_');
        this.batchSize = Integer.parseInt(this.props.getProperty(ConfigFields.APP_BATCH_SIZE, "200"));
        this.iterationCount = Integer.parseInt(this.props.getProperty("app.iteration.count", "1"));

        String inputFormatName = this.props.getProperty(ConfigFields.INPUT_FORMAT_CLASS, ConfigFields.INPUT_FORMAT_CLASS_DEFAULT);
        LOG.debug("Using Input Format: " + inputFormatName);
        LOG.info("IR:AppMaster > Using Input Format: " + inputFormatName);
        this.inputFormatClass = resolveInputFormat(inputFormatName);

        // Every loaded property is forwarded to workers and to the master service.
        this.appConfig = new HashMap<>();
        for (String key : this.props.stringPropertyNames()) {
            this.appConfig.put(key, this.props.getProperty(key));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Configuration entries: ");
            for (Map.Entry<CharSequence, CharSequence> entry : this.appConfig.entrySet()) {
                LOG.debug(entry.getKey() + "=" + entry.getValue());
            }
            // Use the updateable class's own name (getClass() on a Class is always java.lang.Class).
            LOG.debug("Initialized application master, masterHost=" + this.masterHost + ", masterPort=" + this.masterPort + ", masterAddress=" + this.masterAddr + ", masterComputable=" + this.masterComputable.getClass().getName() + ", masterUpdateable=" + this.masterUpdateable.getName() + ", appAttemptId=" + this.appAttemptId);
        }
    }

    /**
     * Resolves the configured input-format class.
     *
     * @return the named class if it can be loaded and implements {@link InputFormat}; otherwise
     *     {@link TextInputFormat}
     */
    @SuppressWarnings("rawtypes")
    private static Class<? extends InputFormat> resolveInputFormat(String className) {
        Class<?> candidate;
        try {
            candidate = Class.forName(className);
        } catch (ClassNotFoundException e) {
            LOG.warn("Input format class not found: " + className + ", defaulting to TextInputFormat", e);
            return TextInputFormat.class;
        }
        if (InputFormat.class.isAssignableFrom(candidate)) {
            LOG.debug("good input format: " + className);
            return candidate.asSubclass(InputFormat.class);
        }
        LOG.debug("bad input format: " + className + ", defaulting to TextInputFormat");
        return TextInputFormat.class;
    }

    /**
     * Splits {@code app.input.path} with the configured input format and builds one configuration
     * tuple (worker id "worker-N") per non-empty split. The result is cached.
     *
     * @throws IOException if the input path does not exist or the splits cannot be computed
     */
    private Set<ConfigurationTuple> getConfigurationTuples() throws IOException {
        if (this.confTuples != null) {
            return this.confTuples;
        }
        Path inputPath = new Path(this.props.getProperty("app.input.path"));
        // Fails fast (FileNotFoundException) when the input path is missing.
        FileSystem.get(this.conf).getFileStatus(inputPath);

        JobConf jobConf = new JobConf(new Configuration());
        jobConf.setInputFormat(this.inputFormatClass);
        FileInputFormat.setInputPaths(jobConf, inputPath);

        Set<ConfigurationTuple> tuples = new HashSet<>();
        int workerIndex = 0;
        for (InputSplit split : jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks())) {
            FileSplit fileSplit = (FileSplit) split;
            // getLength() is the split's size (not its end offset), so only this test detects empty splits.
            if (fileSplit.getLength() <= 0) {
                LOG.info("IR_AM: Culled out 0 length Split: " + fileSplit);
                continue;
            }
            org.deeplearning4j.iterativereduce.runtime.yarn.avro.generated.FileSplit avroSplit =
                    new org.deeplearning4j.iterativereduce.runtime.yarn.avro.generated.FileSplit();
            avroSplit.setLength(Long.valueOf(fileSplit.getLength()));
            avroSplit.setOffset(Long.valueOf(fileSplit.getStart()));
            avroSplit.setPath(fileSplit.getPath().toString());
            StartupConfiguration startupConfig = StartupConfiguration.newBuilder()
                    .setBatchSize(this.batchSize)
                    .setIterations(this.iterationCount)
                    .setOther(this.appConfig)
                    .setSplit(avroSplit)
                    .m38build();
            String workerId = "worker-" + workerIndex;
            String[] locations = fileSplit.getLocations();
            String host = (locations == null || locations.length == 0) ? ANY_HOST : locations[0];
            tuples.add(new ConfigurationTuple(host, workerId, startupConfig));
            workerIndex++;
            LOG.info("IR_AM_worker: " + workerId + " added split: " + avroSplit.toString());
        }
        LOG.info("Total Splits/Workers: " + tuples.size());
        this.confTuples = tuples;
        return tuples;
    }

    /** Maps each tuple's worker id to its startup configuration, for the master service. */
    private Map<WorkerId, StartupConfiguration> getMasterStartupConfiguration(Set<ConfigurationTuple> tuples) {
        Map<WorkerId, StartupConfiguration> result = new HashMap<>();
        for (ConfigurationTuple tuple : tuples) {
            result.put(Utils.createWorkerId(tuple.getWorkerId()), tuple.getConfig());
        }
        return result;
    }

    /** Counts how many tuples (and therefore containers) prefer each host. */
    private Map<String, Integer> getNumberContainersHostMapping(Set<ConfigurationTuple> tuples) {
        Map<String, Integer> counts = new HashMap<>();
        for (ConfigurationTuple tuple : tuples) {
            Integer current = counts.get(tuple.getHost());
            counts.put(tuple.getHost(), current == null ? 1 : current + 1);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created a host->numContainers mapping, with: ");
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                LOG.debug("host=" + entry.getKey() + ", amount=" + entry.getValue());
            }
        }
        return counts;
    }

    /**
     * Builds one resource request per distinct split host, sized at {@code yarn.memory}
     * (default 512).
     *
     * <p>NOTE(review): every request targets "*" rather than the host named in the log message.
     * Whether YARN aggregates or overwrites several "*" requests at the same priority depends on
     * {@code Utils.createResourceRequest}; this needs confirming.
     */
    private List<ResourceRequest> getRequestedContainersList(Set<ConfigurationTuple> tuples, ResourceManagerHandler rmHandler) throws YarnRemoteException {
        Map<String, Integer> perHost = getNumberContainersHostMapping(tuples);
        List<ResourceRequest> requests = new ArrayList<>();
        int memory = Integer.parseInt(this.props.getProperty(ConfigFields.YARN_MEMORY, "512"));
        rmHandler.getClientResourceManager();
        for (Map.Entry<String, Integer> entry : perHost.entrySet()) {
            LOG.debug("Creating a resource request for host " + entry.getKey() + ", with " + entry.getValue() + " containers");
            requests.add(Utils.createResourceRequest("*", entry.getValue().intValue(), memory));
        }
        return requests;
    }

    /**
     * Launches a worker thread for each container, pairing containers with splits.
     *
     * <p>Host-local matches are paired first. Any leftover containers are then paired with any
     * leftover splits. Launched entries are removed from both {@code tuples} and
     * {@code containers}.
     *
     * @return the started launcher threads
     */
    private List<Thread> launchContainers(Set<ConfigurationTuple> tuples, List<Container> containers) {
        List<Thread> launchers = new ArrayList<>();
        Iterator<Container> containerIt = containers.iterator();
        while (containerIt.hasNext()) {
            Container container = containerIt.next();
            String containerHost = container.getNodeId().getHost();
            LOG.debug("Looking to match up split for container on host " + containerHost);
            Iterator<ConfigurationTuple> tupleIt = tuples.iterator();
            // Bounded by hasNext(): the old while(true) form spun forever when no split matched.
            while (tupleIt.hasNext()) {
                ConfigurationTuple tuple = tupleIt.next();
                LOG.debug("Looking to match container host " + containerHost + ", with split host " + tuple.getHost());
                if (tuple.getHost().equals(containerHost)) {
                    LOG.debug("Found matching container for split");
                    launchers.add(startLauncher(tuple.getWorkerId(), container));
                    tupleIt.remove();
                    containerIt.remove();
                    break;
                }
            }
        }
        if (!containers.isEmpty()) {
            LOG.debug("Unable to find specific matches for some app splits, launching remainder");
            Iterator<Container> remainingContainers = containers.iterator();
            Iterator<ConfigurationTuple> remainingTuples = tuples.iterator();
            while (remainingContainers.hasNext() && remainingTuples.hasNext()) {
                Container container = remainingContainers.next();
                ConfigurationTuple tuple = remainingTuples.next();
                LOG.debug("Launching split for host " + tuple.getHost() + " on container host " + container.getNodeId().getHost());
                launchers.add(startLauncher(tuple.getWorkerId(), container));
                remainingContainers.remove();
                remainingTuples.remove();
            }
        }
        return launchers;
    }

    /** Starts and returns a thread that launches {@code workerId} in {@code container}. */
    private Thread startLauncher(String workerId, Container container) {
        Thread launcher = new Thread(new LaunchContainerRunnabble(workerId, container));
        launcher.start();
        return launcher;
    }

    /**
     * Runs the application end to end.
     *
     * @return 0 on success; -1 on a master-side error, including a non-zero exit from the master
     *     service; -10 if a worker container exits non-zero
     */
    @Override
    public int run(String[] args) throws Exception {
        this.conf = getConf();
        ResourceManagerHandler rmHandler = new ResourceManagerHandler(this.conf, this.appAttemptId);
        rmHandler.getAMResourceManager();

        try {
            rmHandler.registerApplicationMaster(this.masterHost, this.masterPort);
        } catch (YarnRemoteException e) {
            LOG.error("Error encountered while trying to register application master", e);
            return ReturnCode.MASTER_ERROR.getCode();
        }

        Set<ConfigurationTuple> tuples;
        Map<WorkerId, StartupConfiguration> workerConfigs;
        List<ResourceRequest> requests;
        try {
            tuples = getConfigurationTuples();
            workerConfigs = getMasterStartupConfiguration(tuples);
            requests = getRequestedContainersList(tuples, rmHandler);
        } catch (IOException e) {
            LOG.error("Error encountered while trying to generate configurations", e);
            return ReturnCode.MASTER_ERROR.getCode();
        }

        try {
            return runWorkers(rmHandler, tuples, workerConfigs, requests);
        } catch (YarnRemoteException e) {
            LOG.error("Encountered an error while trying to allocate containers", e);
            return ReturnCode.MASTER_ERROR.getCode();
        } catch (IOException e) {
            LOG.error("Error encountered while running the application", e);
            return ReturnCode.MASTER_ERROR.getCode();
        }
    }

    /**
     * Allocates containers, starts the master service, launches workers, waits for them, and
     * reports the final status to the resource manager.
     */
    private int runWorkers(ResourceManagerHandler rmHandler, Set<ConfigurationTuple> tuples,
            Map<WorkerId, StartupConfiguration> workerConfigs, List<ResourceRequest> requests) throws Exception {
        List<ContainerId> releases = new ArrayList<>();
        // Capture now: launchContainers removes entries from tuples.
        int needed = tuples.size();
        List<Container> containers = allocateContainers(rmHandler, requests, releases, needed);

        if (containers.size() < needed) {
            LOG.info("Unable to get required number of containers, will not continue, needed=" + needed + ", allocated=" + containers.size());
            requests.clear();
            for (Container container : containers) {
                releases.add(container.getId());
            }
            try {
                rmHandler.allocateRequest(requests, releases);
            } catch (YarnRemoteException e) {
                LOG.warn("Encountered an error while trying to release unwanted containers", e);
            }
            rmHandler.finishApplication("Unable to allocate containers, needed " + needed + ", but got " + containers.size(), FinalApplicationStatus.FAILED);
            return ReturnCode.MASTER_ERROR.getCode();
        }

        LOG.info("Starting master service");
        ApplicationMasterService masterService = new ApplicationMasterService(this.masterAddr, workerConfigs, this.masterComputable, this.masterUpdateable, this.appConfig, this.conf);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> masterFuture = executor.submit(masterService);
            LOG.info("Launching child containers");
            List<Thread> launchers = launchContainers(tuples, containers);
            // Everything is allocated; later heartbeats must not ask for more containers.
            requests.clear();

            LOG.info("Waiting for containers to complete...");
            int waitCode = awaitContainerCompletion(rmHandler, requests, releases, needed, masterService);
            if (waitCode != ReturnCode.OK.getCode()) {
                return waitCode;
            }
            LOG.info("Containers completed");

            boolean interrupted = joinLaunchers(launchers);
            try {
                return finishMaster(rmHandler, masterService, masterFuture);
            } finally {
                if (interrupted) {
                    // Restore the interrupt swallowed while joining launchers.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Heartbeats until {@code needed} containers are allocated or
     * {@code app.allocation.max.attempts} (default 10) heartbeats have been made.
     */
    private List<Container> allocateContainers(ResourceManagerHandler rmHandler, List<ResourceRequest> requests,
            List<ContainerId> releases, int needed) throws YarnRemoteException, InterruptedException {
        int maxAttempts = Integer.parseInt(this.props.getProperty(ConfigFields.APP_ALLOCATION_MAX_ATTEMPTS, "10"));
        List<Container> containers = new ArrayList<>();
        int attempts = 0;
        while (containers.size() < needed && attempts < maxAttempts) {
            LOG.info("Requesting containers, got=" + containers.size() + ", needed=" + needed + ", attempts=" + attempts + ", maxAttempts=" + maxAttempts);
            List<Container> allocated = rmHandler.allocateRequest(requests, releases).getAllocatedContainers();
            attempts++;
            containers.addAll(allocated);
            // Log before anything can clear the list (the old code always logged 0).
            LOG.info("Got allocation response, allocatedContainers=" + allocated.size());
            if (containers.size() < needed) {
                Thread.sleep(ALLOCATION_RETRY_DELAY_MS);
            }
        }
        return containers;
    }

    /**
     * Polls the resource manager until {@code expected} containers have completed.
     *
     * @return {@link ReturnCode#OK} when all complete with exit status 0;
     *     {@link ReturnCode#CONTAINER_KILLED} after failing the master and killing the application
     *     on the first non-zero exit; {@link ReturnCode#MASTER_ERROR} if interrupted
     */
    private int awaitContainerCompletion(ResourceManagerHandler rmHandler, List<ResourceRequest> requests,
            List<ContainerId> releases, int expected, ApplicationMasterService masterService) {
        int completed = 0;
        while (completed < expected) {
            try {
                Thread.sleep(COMPLETION_POLL_DELAY_MS);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting on completed containers", e);
                Thread.currentThread().interrupt();
                // Stop the master too, rather than leaving it running with no workers.
                masterService.fail();
                return ReturnCode.MASTER_ERROR.getCode();
            }
            List<ContainerStatus> statuses;
            try {
                statuses = rmHandler.allocateRequest(requests, releases).getCompletedContainersStatuses();
            } catch (YarnRemoteException e) {
                LOG.warn("Encountered an error while trying to heartbeat to resource manager", e);
                continue;
            }
            for (ContainerStatus status : statuses) {
                completed++;
                int exitStatus = status.getExitStatus();
                if (exitStatus != 0) {
                    masterService.fail();
                    LOG.info("At least one container failed with a non-zero exit code (" + exitStatus + "); killing application");
                    try {
                        rmHandler.finishApplication("Failing, due to at least container coming back with an non-zero exit code.", FinalApplicationStatus.KILLED);
                    } catch (YarnRemoteException e) {
                        LOG.warn("Encountered an error while trying to send final status to resource manager", e);
                    }
                    return ReturnCode.CONTAINER_KILLED.getCode();
                }
            }
        }
        return ReturnCode.OK.getCode();
    }

    /**
     * Waits up to {@link #LAUNCHER_JOIN_TIMEOUT_MS} for each launcher thread.
     *
     * @return whether an interrupt was caught (the caller restores it)
     */
    private boolean joinLaunchers(List<Thread> launchers) {
        boolean interrupted = false;
        for (Thread launcher : launchers) {
            try {
                launcher.join(LAUNCHER_JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for Launcher threads to complete", e);
                interrupted = true;
            }
        }
        return interrupted;
    }

    /**
     * Collects the master's exit code. If it is 0, writes results and reports SUCCEEDED;
     * otherwise reports FAILED.
     */
    private int finishMaster(ResourceManagerHandler rmHandler, ApplicationMasterService masterService,
            Future<?> masterFuture) throws Exception {
        if (!masterFuture.isDone()) {
            // All workers finished but the master has not, so force it to stop instead of blocking.
            masterService.fail();
        }
        int masterExitCode = ((Integer) masterFuture.get()).intValue();
        LOG.info("Master service completed with exitCode=" + masterExitCode);
        if (masterExitCode == 0) {
            writeMasterResults();
        } else {
            LOG.warn("Not writing master results, as the master came back with errors!");
        }
        try {
            if (masterExitCode == 0) {
                rmHandler.finishApplication("Completed successfully", FinalApplicationStatus.SUCCEEDED);
            } else {
                // Previously reported SUCCEEDED even though no results were written.
                rmHandler.finishApplication("Master service completed with exitCode=" + masterExitCode, FinalApplicationStatus.FAILED);
            }
        } catch (YarnRemoteException e) {
            LOG.warn("Encountered an error while trying to send final status to resource manager", e);
        }
        return masterExitCode == 0 ? ReturnCode.OK.getCode() : ReturnCode.MASTER_ERROR.getCode();
    }

    /**
     * As the {@code USER} environment user, writes the master's result to {@code app.output.path}.
     * Write failures are logged, not propagated.
     */
    private void writeMasterResults() throws IOException, InterruptedException {
        UserGroupInformation.createRemoteUser(System.getenv("USER")).doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() {
                Path outputPath = new Path(props.getProperty("app.output.path"));
                // try-with-resources closes the stream even if complete() throws.
                try (DataOutputStream out = outputPath.getFileSystem(conf).create(outputPath)) {
                    LOG.info("Writing master results to " + outputPath.toString());
                    masterComputable.complete(out);
                    out.flush();
                } catch (IOException e) {
                    LOG.error("Failed to write master results to " + outputPath, e);
                }
                return null;
            }
        });
    }
}
