package edu.iu.dsc.tws.executor.threading;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.ExecutorContext;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.executor.IExecutionHook;
import edu.iu.dsc.tws.api.compute.executor.IExecutor;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.executor.core.ExecutionRuntime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/threading/StreamingAllSharingExecutor.class */
public class StreamingAllSharingExecutor implements IExecutor {
    private static final Logger LOG = Logger.getLogger(StreamingSharingExecutor.class.getName());
    private int numThreads;
    private ExecutorService threads;
    private TWSChannel channel;
    private Config config;
    private int workerId;
    protected boolean notStopped = true;
    private boolean cleanUpCalled = false;
    private CountDownLatch doneSignal;
    private ExecutionPlan plan;
    protected IExecutionHook executionHook;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/StreamingAllSharingExecutor$CommunicationWorker.class */
    public class CommunicationWorker implements Runnable {
        private BlockingQueue<INodeInstance> tasks;

        public CommunicationWorker(BlockingQueue<INodeInstance> blockingQueue) {
            this.tasks = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (StreamingAllSharingExecutor.this.isNotStopped()) {
                runChannelComplete();
            }
            StreamingAllSharingExecutor.this.doneSignal.countDown();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void runChannelComplete() {
            try {
                INodeInstance poll = this.tasks.poll();
                if (poll != null && !poll.isComplete()) {
                    this.tasks.offer(poll);
                }
            } catch (Throwable th) {
                StreamingAllSharingExecutor.LOG.log(Level.SEVERE, String.format("%d Error in executor", Integer.valueOf(StreamingAllSharingExecutor.this.workerId)), th);
                throw new RuntimeException("Error occurred in execution of task", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/StreamingAllSharingExecutor$StreamExecution.class */
    public class StreamExecution implements IExecution {
        private Map<Integer, INodeInstance> nodeMap;
        private ExecutionPlan executionPlan;
        private boolean taskExecution = true;
        private StreamWorker mainWorker;
        private CommunicationWorker worker;

        StreamExecution(ExecutionPlan executionPlan, Map<Integer, INodeInstance> map, StreamWorker streamWorker) {
            this.nodeMap = map;
            this.executionPlan = executionPlan;
            this.mainWorker = streamWorker;
        }

        public boolean waitForCompletion() {
            while (StreamingAllSharingExecutor.this.isNotStopped()) {
                StreamingAllSharingExecutor.this.channel.progress();
                this.mainWorker.runExecution();
            }
            StreamingAllSharingExecutor.this.cleanUp(this.nodeMap);
            StreamingAllSharingExecutor.this.closeExecution();
            return true;
        }

        public boolean progress() {
            if (this.taskExecution) {
                if (StreamingAllSharingExecutor.this.isNotStopped()) {
                    StreamingAllSharingExecutor.this.channel.progress();
                    this.mainWorker.runExecution();
                    return true;
                }
                StreamingAllSharingExecutor.this.cleanUp(this.nodeMap);
                StreamingAllSharingExecutor.this.cleanUpCalled = false;
                this.worker = StreamingAllSharingExecutor.this.scheduleWaitFor(this.nodeMap)[0];
                this.taskExecution = false;
            }
            StreamingAllSharingExecutor.this.channel.progress();
            this.worker.runChannelComplete();
            return false;
        }

        public void close() {
            if (StreamingAllSharingExecutor.this.isNotStopped()) {
                throw new RuntimeException("We need to stop the execution before close");
            }
            if (StreamingAllSharingExecutor.this.cleanUpCalled) {
                throw new RuntimeException("Close is called on a already closed execution");
            }
            StreamingAllSharingExecutor.this.close(this.executionPlan, this.nodeMap);
            StreamingAllSharingExecutor.this.executionHook.onClose(StreamingAllSharingExecutor.this);
            StreamingAllSharingExecutor.this.cleanUpCalled = true;
        }

        public void stop() {
            StreamingAllSharingExecutor.this.notStopped = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/StreamingAllSharingExecutor$StreamWorker.class */
    public class StreamWorker implements Runnable {
        private List<INodeInstance> tasks;
        private AtomicBoolean[] ignoreIndex;
        private AtomicBoolean[] idleTasks;
        private int lastIndex;
        private AtomicInteger activeCounter;

        public StreamWorker(List<INodeInstance> list, AtomicBoolean[] atomicBooleanArr, AtomicBoolean[] atomicBooleanArr2, AtomicInteger atomicInteger) {
            this.tasks = list;
            this.ignoreIndex = atomicBooleanArr;
            this.idleTasks = atomicBooleanArr2;
            this.activeCounter = atomicInteger;
        }

        private int getNext() {
            if (this.lastIndex == this.tasks.size()) {
                this.lastIndex = 0;
            }
            if (!this.ignoreIndex[this.lastIndex].compareAndSet(false, true)) {
                this.lastIndex++;
                return -1;
            }
            int i = this.lastIndex;
            this.lastIndex = i + 1;
            return i;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (StreamingAllSharingExecutor.this.isNotStopped()) {
                runExecution();
            }
            StreamingAllSharingExecutor.this.doneSignal.countDown();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void runExecution() {
            try {
                int next = getNext();
                if (next != -1) {
                    if (this.tasks.get(next).execute()) {
                        if (this.idleTasks[next].compareAndSet(true, false)) {
                            this.activeCounter.getAndIncrement();
                        }
                    } else if (this.idleTasks[next].compareAndSet(false, true)) {
                        if (this.activeCounter.decrementAndGet() == 0) {
                            LockSupport.parkNanos(1L);
                        }
                    } else if (this.activeCounter.get() == 0) {
                        LockSupport.parkNanos(1L);
                    }
                    this.ignoreIndex[next].set(false);
                }
            } catch (Throwable th) {
                StreamingAllSharingExecutor.LOG.log(Level.SEVERE, String.format("%d Error in executor", Integer.valueOf(StreamingAllSharingExecutor.this.workerId)), th);
                throw new RuntimeException("Error occurred in execution of task", th);
            }
        }
    }

    public StreamingAllSharingExecutor(Config config, int i, TWSChannel tWSChannel, ExecutionPlan executionPlan, IExecutionHook iExecutionHook) {
        this.workerId = i;
        this.config = config;
        this.channel = tWSChannel;
        this.numThreads = ExecutorContext.threadsPerContainer(this.config);
        if (this.numThreads > 1) {
            this.threads = Executors.newFixedThreadPool(this.numThreads - 1, new ThreadFactoryBuilder().setNameFormat("executor-%d").setDaemon(true).build());
        }
        this.plan = executionPlan;
        this.executionHook = iExecutionHook;
    }

    public boolean execute() {
        this.executionHook.beforeExecution();
        this.config = Config.newBuilder().putAll(this.config).put("_twister2.runtime_", new ExecutionRuntime(ExecutorContext.jobName(this.config), this.plan, this.channel)).build();
        return runExecution();
    }

    public boolean execute(boolean z) {
        return execute();
    }

    public ExecutionPlan getExecutionPlan() {
        return this.plan;
    }

    public IExecution iExecute() {
        this.executionHook.beforeExecution();
        this.config = Config.newBuilder().putAll(this.config).put("_twister2.runtime_", new ExecutionRuntime(ExecutorContext.jobName(this.config), this.plan, this.channel)).build();
        return runIExecution();
    }

    public void close() {
        if (this.threads != null) {
            this.threads.shutdown();
        }
    }

    public boolean isNotStopped() {
        return this.notStopped;
    }

    public boolean runExecution() {
        Map<Integer, INodeInstance> nodes = this.plan.getNodes();
        if (nodes.size() == 0) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return true;
        }
        StreamWorker streamWorker = scheduleExecution(nodes)[0];
        while (isNotStopped()) {
            this.channel.progress();
            streamWorker.runExecution();
        }
        cleanUp(nodes);
        return true;
    }

    private StreamWorker[] scheduleExecution(Map<Integer, INodeInstance> map) {
        ArrayList arrayList = new ArrayList(map.values());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((INodeInstance) it.next()).prepare(this.config);
        }
        StreamWorker[] streamWorkerArr = new StreamWorker[this.numThreads];
        AtomicBoolean[] atomicBooleanArr = new AtomicBoolean[arrayList.size()];
        AtomicBoolean[] atomicBooleanArr2 = new AtomicBoolean[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            atomicBooleanArr[i] = new AtomicBoolean(false);
            atomicBooleanArr2[i] = new AtomicBoolean(false);
        }
        this.doneSignal = new CountDownLatch(this.numThreads - 1);
        AtomicInteger atomicInteger = new AtomicInteger(arrayList.size());
        streamWorkerArr[0] = new StreamWorker(arrayList, atomicBooleanArr, atomicBooleanArr2, atomicInteger);
        for (int i2 = 1; i2 < this.numThreads; i2++) {
            StreamWorker streamWorker = new StreamWorker(arrayList, atomicBooleanArr, atomicBooleanArr2, atomicInteger);
            this.threads.submit(streamWorker);
            streamWorkerArr[i2] = streamWorker;
        }
        return streamWorkerArr;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanUp(Map<Integer, INodeInstance> map) {
        try {
            this.doneSignal.await();
            this.cleanUpCalled = true;
            this.executionHook.afterExecution();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    }

    public IExecution runIExecution() {
        Map<Integer, INodeInstance> nodes = this.plan.getNodes();
        if (nodes.size() != 0) {
            return new StreamExecution(this.plan, nodes, scheduleExecution(nodes)[0]);
        }
        LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
        return new NullExecutor();
    }

    public boolean closeExecution() {
        Map<Integer, INodeInstance> nodes = this.plan.getNodes();
        if (nodes.size() == 0) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return true;
        }
        CommunicationWorker communicationWorker = scheduleWaitFor(nodes)[0];
        while (isNotStopped()) {
            this.channel.progress();
            communicationWorker.runChannelComplete();
        }
        close(this.plan, nodes);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> map) {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(map.size() * 2);
        arrayBlockingQueue.addAll(map.values());
        CommunicationWorker[] communicationWorkerArr = new CommunicationWorker[this.numThreads];
        communicationWorkerArr[0] = new CommunicationWorker(arrayBlockingQueue);
        this.doneSignal = new CountDownLatch(this.numThreads - 1);
        for (int i = 1; i < this.numThreads; i++) {
            communicationWorkerArr[i] = new CommunicationWorker(arrayBlockingQueue);
            this.threads.submit(communicationWorkerArr[i]);
        }
        return communicationWorkerArr;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void close(ExecutionPlan executionPlan, Map<Integer, INodeInstance> map) {
        try {
            this.doneSignal.await();
            List<IParallelOperation> parallelOperations = executionPlan.getParallelOperations();
            resetNodes(map, parallelOperations);
            Iterator<INodeInstance> it = map.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            Iterator<IParallelOperation> it2 = parallelOperations.iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
            this.executionHook.onClose(this);
            this.cleanUpCalled = true;
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    }

    private void resetNodes(Map<Integer, INodeInstance> map, List<IParallelOperation> list) {
        Iterator<INodeInstance> it = map.values().iterator();
        while (it.hasNext()) {
            it.next().reset();
        }
        Iterator<IParallelOperation> it2 = list.iterator();
        while (it2.hasNext()) {
            it2.next().reset();
        }
    }
}
