package edu.iu.dsc.tws.examples.internal.batchscheduler;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.IONames;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.BaseCompute;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.resource.Twister2Worker;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner;
import edu.iu.dsc.tws.data.fs.io.InputSplit;
import edu.iu.dsc.tws.dataset.DataSource;
import edu.iu.dsc.tws.dataset.partition.EntityPartition;
import edu.iu.dsc.tws.examples.Utils;
import edu.iu.dsc.tws.examples.batch.cdfw.CDFConstants;
import edu.iu.dsc.tws.examples.ml.svm.constant.Constants;
import edu.iu.dsc.tws.executor.core.ExecutionRuntime;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import mpi.MPI;
import mpi.MPIException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/* loaded from: input_file:edu/iu/dsc/tws/examples/internal/batchscheduler/ConstraintTaskExample.class */
public class ConstraintTaskExample implements Twister2Worker {
    private static final Logger LOG = Logger.getLogger(ConstraintTaskExample.class.getName());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/batchscheduler/ConstraintTaskExample$FirstSinkTask.class */
    public static class FirstSinkTask extends BaseCompute implements Collector {
        private static final long serialVersionUID = -5190777711234234L;
        private int length;
        private String inputKey;
        private double[][] dataPointsLocal;

        FirstSinkTask(int i, String str) {
            this.length = i;
            this.inputKey = str;
        }

        public boolean execute(IMessage iMessage) {
            ArrayList arrayList = new ArrayList();
            while (((Iterator) iMessage.getContent()).hasNext()) {
                arrayList.add(String.valueOf(((Iterator) iMessage.getContent()).next()));
            }
            this.dataPointsLocal = new double[arrayList.size()][this.length];
            for (int i = 0; i < arrayList.size(); i++) {
                String[] split = ((String) arrayList.get(i)).split(Constants.SimpleGraphConfig.DELIMITER);
                for (int i2 = 0; i2 < this.length; i2++) {
                    this.dataPointsLocal[i][i2] = Double.parseDouble(split[i2].trim());
                }
            }
            return true;
        }

        public DataPartition<double[][]> get(String str) {
            if (str.equals(this.inputKey)) {
                return new EntityPartition(this.dataPointsLocal);
            }
            throw new RuntimeException("Requesting an unrelated partition " + str);
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
        }

        public IONames getCollectibleNames() {
            return IONames.declare(new String[]{this.inputKey});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/batchscheduler/ConstraintTaskExample$FirstSourceTask.class */
    public static class FirstSourceTask extends BaseSource {
        private static final long serialVersionUID = -254264120110286748L;
        private DataSource<?, ?> source;
        private String dataDirectory;
        private int dataSize;

        FirstSourceTask(String str, int i) {
            setDataDirectory(str);
            setDataSize(i);
        }

        public String getDataDirectory() {
            return this.dataDirectory;
        }

        public void setDataDirectory(String str) {
            this.dataDirectory = str;
        }

        public int getDataSize() {
            return this.dataSize;
        }

        public void setDataSize(int i) {
            this.dataSize = i;
        }

        public void execute() {
            InputSplit nextSplit = this.source.getNextSplit(this.context.taskIndex());
            while (nextSplit != null) {
                while (!nextSplit.reachedEnd()) {
                    try {
                        Object nextRecord = nextSplit.nextRecord((Object) null);
                        if (nextRecord != null) {
                            this.context.write("direct", nextRecord);
                        }
                    } catch (IOException e) {
                        ConstraintTaskExample.LOG.log(Level.SEVERE, "Failed to read the input", (Throwable) e);
                    }
                }
                nextSplit = this.source.getNextSplit(this.context.taskIndex());
            }
            this.context.end("direct");
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.source = ((ExecutionRuntime) config.get("_twister2.runtime_")).createInput(config, taskContext, new LocalTextInputPartitioner(new Path(getDataDirectory()), taskContext.getParallelism(), config));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/batchscheduler/ConstraintTaskExample$SecondSinkTask.class */
    public static class SecondSinkTask extends BaseCompute {
        private static final long serialVersionUID = -254264120110286748L;
        private static int worldRank = 0;
        private static int worldSize = 0;
        private int length;
        private double[][] dataPointsLocal;

        SecondSinkTask(int i) {
            this.length = i;
        }

        public boolean execute(IMessage iMessage) {
            ConstraintTaskExample.LOG.info("Received message:" + iMessage.getContent().toString());
            ArrayList arrayList = new ArrayList();
            while (((Iterator) iMessage.getContent()).hasNext()) {
                arrayList.add(String.valueOf(((Iterator) iMessage.getContent()).next()));
            }
            this.dataPointsLocal = new double[arrayList.size()][this.length];
            for (int i = 0; i < arrayList.size(); i++) {
                String[] split = ((String) arrayList.get(i)).split(Constants.SimpleGraphConfig.DELIMITER);
                for (int i2 = 0; i2 < this.length; i2++) {
                    this.dataPointsLocal[i][i2] = Double.parseDouble(split[i2].trim());
                }
            }
            try {
                worldRank = MPI.COMM_WORLD.getRank();
                worldSize = MPI.COMM_WORLD.getSize();
                int[] iArr = {1, 2, 3, 4, 5, 6, 7, 8};
                MPI.COMM_WORLD.reduce(iArr, new int[iArr.length], iArr.length, MPI.INT, MPI.SUM, 0);
                return true;
            } catch (MPIException e) {
                e.printStackTrace();
                return true;
            }
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(this.config, taskContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/batchscheduler/ConstraintTaskExample$SecondSourceTask.class */
    public static class SecondSourceTask extends BaseSource implements Receptor {
        private static final long serialVersionUID = -254264120110286748L;
        private DataPartition<?> dataPointsPartition = null;
        private String inputKey;

        SecondSourceTask(String str) {
            this.inputKey = str;
        }

        public void execute() {
            double[][] dArr = (double[][]) this.dataPointsPartition.getConsumer().next();
            ConstraintTaskExample.LOG.info("Context Task Index:" + this.context.taskIndex() + "\t" + dArr.length);
            this.context.writeEnd("direct", dArr);
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(this.config, taskContext);
        }

        public void add(String str, DataPartition<?> dataPartition) {
            ConstraintTaskExample.LOG.log(Level.INFO, "Received input: " + str);
            if (this.inputKey.equals(str)) {
                this.dataPointsPartition = dataPartition;
            }
        }

        public IONames getReceivableNames() {
            return IONames.declare(new String[]{this.inputKey});
        }
    }

    public static void main(String[] strArr) throws ParseException {
        LOG.log(Level.INFO, "Constraint Task Graph Example");
        Config loadConfig = ResourceAllocator.loadConfig(new HashMap());
        HashMap hashMap = new HashMap();
        hashMap.put("twister2.exector.worker.threads", 1);
        Options options = new Options();
        options.addOption("workers", true, "Workers");
        options.addOption(CDFConstants.ARGS_PARALLELISM_VALUE, true, CDFConstants.ARGS_PARALLELISM_VALUE);
        options.addOption(CDFConstants.ARGS_DSIZE, true, CDFConstants.ARGS_DSIZE);
        options.addOption("dim", true, "dim");
        options.addOption(Utils.createOption(CDFConstants.ARGS_DINPUT, true, "Data points Input directory", true));
        CommandLine parse = new DefaultParser().parse(options, strArr);
        int parseInt = Integer.parseInt(parse.getOptionValue("workers"));
        int parseInt2 = Integer.parseInt(parse.getOptionValue(CDFConstants.ARGS_PARALLELISM_VALUE));
        int parseInt3 = Integer.parseInt(parse.getOptionValue(CDFConstants.ARGS_DSIZE));
        int parseInt4 = Integer.parseInt(parse.getOptionValue("dim"));
        String optionValue = parse.getOptionValue(CDFConstants.ARGS_DINPUT);
        hashMap.put("workers", Integer.toString(parseInt));
        hashMap.put(CDFConstants.ARGS_DSIZE, Integer.toString(parseInt3));
        hashMap.put("dim", Integer.toString(parseInt4));
        hashMap.put(CDFConstants.ARGS_PARALLELISM_VALUE, Integer.toString(parseInt2));
        hashMap.put(CDFConstants.ARGS_DINPUT, optionValue);
        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(hashMap);
        Twister2Job.Twister2JobBuilder newBuilder = Twister2Job.newBuilder();
        newBuilder.setJobName("Constraint-Example");
        newBuilder.setWorkerClass(ConstraintTaskExample.class.getName());
        newBuilder.addComputeResource(2.0d, 2048, 1.0d, parseInt);
        newBuilder.setConfig(jobConfig);
        Twister2Submitter.submitJob(newBuilder.build(), loadConfig);
    }

    public void execute(WorkerEnvironment workerEnvironment) {
        int workerId = workerEnvironment.getWorkerId();
        Config config = workerEnvironment.getConfig();
        long currentTimeMillis = System.currentTimeMillis();
        LOG.log(Level.INFO, "Task worker starting: " + workerId);
        TaskExecutor taskExecutor = ComputeEnvironment.init(workerEnvironment).getTaskExecutor();
        String valueOf = String.valueOf(config.get(CDFConstants.ARGS_DINPUT));
        int parseInt = Integer.parseInt(String.valueOf(config.get("dim")));
        int parseInt2 = Integer.parseInt(String.valueOf(config.get(CDFConstants.ARGS_PARALLELISM_VALUE)));
        int parseInt3 = Integer.parseInt(String.valueOf(config.get(CDFConstants.ARGS_DSIZE)));
        new DataGenerator(config, workerId).generate(new Path(valueOf), parseInt3, parseInt);
        ComputeGraph buildFirstGraph = buildFirstGraph(parseInt2, config, valueOf, parseInt3, parseInt, "firstgraphpoints", "1");
        ComputeGraph buildSecondGraph = buildSecondGraph(parseInt2, config, parseInt, "firstgraphpoints", "1");
        taskExecutor.execute(buildFirstGraph, taskExecutor.plan(buildFirstGraph));
        DataObject output = taskExecutor.getOutput("firstsink");
        ExecutionPlan plan = taskExecutor.plan(buildSecondGraph);
        taskExecutor.addInput("firstgraphpoints", output);
        taskExecutor.execute(buildSecondGraph, plan);
        LOG.info("Total Execution Time: " + (System.currentTimeMillis() - currentTimeMillis));
    }

    private ComputeGraph buildFirstGraph(int i, Config config, String str, int i2, int i3, String str2, String str3) {
        FirstSourceTask firstSourceTask = new FirstSourceTask(str, i2);
        FirstSinkTask firstSinkTask = new FirstSinkTask(i3, str2);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.addSource("firstsource", firstSourceTask, i);
        newBuilder.addCompute("firstsink", firstSinkTask, i).direct("firstsource").viaEdge("direct").withDataType(MessageTypes.OBJECT);
        newBuilder.setMode(OperationMode.BATCH);
        newBuilder.setTaskGraphName("firstTG");
        newBuilder.addGraphConstraints("twister2.max.task.instances.per.worker", str3);
        return newBuilder.build();
    }

    private ComputeGraph buildSecondGraph(int i, Config config, int i2, String str, String str2) {
        SecondSourceTask secondSourceTask = new SecondSourceTask(str);
        SecondSinkTask secondSinkTask = new SecondSinkTask(i2);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.addSource("secondsource", secondSourceTask, i);
        newBuilder.addCompute("secondsink", secondSinkTask, i).direct("secondsource").viaEdge("direct").withDataType(MessageTypes.OBJECT);
        newBuilder.setMode(OperationMode.BATCH);
        newBuilder.setTaskGraphName("secondTG");
        newBuilder.addGraphConstraints("twister2.max.task.instances.per.worker", str2);
        return newBuilder.build();
    }
}
