package edu.iu.dsc.tws.executor.threading;

import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.ExecutionState;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.executor.IExecutionHook;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.config.Config;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/threading/BatchSharingExecutor.class */
public class BatchSharingExecutor extends ThreadSharingExecutor {
    private static final Logger LOG = Logger.getLogger(BatchSharingExecutor.class.getName());
    private AtomicInteger finishedInstances;
    private int workerId;
    protected boolean notStopped;
    private boolean cleanUpCalled;
    private CountDownLatch doneSignal;

    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/BatchSharingExecutor$BatchExecution.class */
    private class BatchExecution implements IExecution {
        private Map<Integer, INodeInstance> nodeMap;
        private ExecutionPlan executionPlan;
        private BlockingQueue<INodeInstance> tasks;
        private boolean taskExecution = true;

        BatchExecution(ExecutionPlan executionPlan, Map<Integer, INodeInstance> map) {
            this.nodeMap = map;
            this.executionPlan = executionPlan;
            this.tasks = new ArrayBlockingQueue(map.size() * 2);
            this.tasks.addAll(map.values());
        }

        public boolean waitForCompletion() {
            while (BatchSharingExecutor.this.isNotStopped() && BatchSharingExecutor.this.finishedInstances.get() != this.nodeMap.size()) {
                BatchSharingExecutor.this.channel.progress();
            }
            this.executionPlan.setExecutionState(ExecutionState.EXECUTED);
            BatchSharingExecutor.this.cleanUp(this.nodeMap);
            BatchSharingExecutor.this.closeExecution();
            return true;
        }

        public boolean progress() {
            if (this.taskExecution) {
                if (BatchSharingExecutor.this.finishedInstances.get() != this.nodeMap.size()) {
                    BatchSharingExecutor.this.channel.progress();
                    return true;
                }
                this.executionPlan.setExecutionState(ExecutionState.EXECUTED);
                BatchSharingExecutor.this.cleanUp(this.nodeMap);
                BatchSharingExecutor.this.cleanUpCalled = false;
                BatchSharingExecutor.this.scheduleWaitFor(this.nodeMap);
                this.taskExecution = false;
            }
            if (!BatchSharingExecutor.this.isNotStopped() || BatchSharingExecutor.this.finishedInstances.get() == this.nodeMap.size()) {
                return false;
            }
            BatchSharingExecutor.this.channel.progress();
            return true;
        }

        public void close() {
            if (BatchSharingExecutor.this.isNotStopped()) {
                throw new RuntimeException("We need to stop the execution before close");
            }
            if (BatchSharingExecutor.this.cleanUpCalled) {
                throw new RuntimeException("Close is called on a already closed execution");
            }
            BatchSharingExecutor.this.close(this.executionPlan, this.nodeMap);
            BatchSharingExecutor.this.executionHook.onClose(BatchSharingExecutor.this);
            BatchSharingExecutor.this.cleanUpCalled = true;
        }

        public void stop() {
            BatchSharingExecutor.this.notStopped = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/BatchSharingExecutor$BatchWorker.class */
    public class BatchWorker implements Runnable {
        private List<INodeInstance> tasks;
        private AtomicBoolean[] ignoreIndex;
        private int lastIndex;
        private boolean bindTaskToThread;
        private INodeInstance task;

        public BatchWorker(List<INodeInstance> list, AtomicBoolean[] atomicBooleanArr) {
            this.tasks = list;
            this.ignoreIndex = atomicBooleanArr;
        }

        public BatchWorker(INodeInstance iNodeInstance) {
            this.bindTaskToThread = true;
            this.task = iNodeInstance;
        }

        private int getNext() {
            if (this.lastIndex == this.tasks.size()) {
                this.lastIndex = 0;
            }
            if (!this.ignoreIndex[this.lastIndex].compareAndSet(false, true)) {
                this.lastIndex++;
                return -1;
            }
            int i = this.lastIndex;
            this.lastIndex = i + 1;
            return i;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!this.bindTaskToThread) {
                while (BatchSharingExecutor.this.isNotStopped() && BatchSharingExecutor.this.finishedInstances.get() != this.tasks.size()) {
                    try {
                        int next = getNext();
                        if (next != -1) {
                            if (this.tasks.get(next).execute()) {
                                this.ignoreIndex[next].set(false);
                            } else {
                                BatchSharingExecutor.this.finishedInstances.incrementAndGet();
                            }
                        }
                    } catch (Throwable th) {
                        BatchSharingExecutor.LOG.log(Level.SEVERE, String.format("%d Error in executor", Integer.valueOf(BatchSharingExecutor.this.workerId)), th);
                        throw new RuntimeException("Error occurred in execution of task", th);
                    }
                }
                BatchSharingExecutor.this.doneSignal.countDown();
            }
            while (true) {
                if (!BatchSharingExecutor.this.isNotStopped()) {
                    break;
                } else if (!this.task.execute()) {
                    BatchSharingExecutor.this.finishedInstances.incrementAndGet();
                    break;
                }
            }
            BatchSharingExecutor.this.doneSignal.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:edu/iu/dsc/tws/executor/threading/BatchSharingExecutor$CommunicationWorker.class */
    public class CommunicationWorker implements Runnable {
        private BlockingQueue<INodeInstance> tasks;

        public CommunicationWorker(BlockingQueue<INodeInstance> blockingQueue) {
            this.tasks = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (BatchSharingExecutor.this.isNotStopped()) {
                try {
                    INodeInstance poll = this.tasks.poll();
                    if (poll == null) {
                        break;
                    } else if (poll.isComplete()) {
                        BatchSharingExecutor.this.finishedInstances.incrementAndGet();
                    } else {
                        this.tasks.offer(poll);
                    }
                } catch (Throwable th) {
                    BatchSharingExecutor.LOG.log(Level.SEVERE, String.format("%d Error in executor", Integer.valueOf(BatchSharingExecutor.this.workerId)), th);
                    throw new RuntimeException("Error occurred in execution of task", th);
                }
            }
            BatchSharingExecutor.this.doneSignal.countDown();
        }
    }

    public BatchSharingExecutor(Config config, int i, TWSChannel tWSChannel, ExecutionPlan executionPlan, IExecutionHook iExecutionHook) {
        super(config, tWSChannel, executionPlan, iExecutionHook);
        this.finishedInstances = new AtomicInteger(0);
        this.notStopped = true;
        this.cleanUpCalled = false;
        this.workerId = i;
    }

    @Override // edu.iu.dsc.tws.executor.threading.ThreadSharingExecutor
    public boolean runExecution() {
        Map<Integer, INodeInstance> nodes = this.executionPlan.getNodes();
        if (nodes.size() == 0) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return true;
        }
        if (this.executionPlan.getExecutionState() == ExecutionState.EXECUTED) {
            resetNodes(this.executionPlan.getNodes(), this.executionPlan.getParallelOperations());
        }
        scheduleExecution(nodes);
        while (this.finishedInstances.get() != nodes.size()) {
            this.channel.progress();
        }
        cleanUp(nodes);
        return true;
    }

    public boolean execute(boolean z) {
        boolean execute = execute();
        if (z) {
            closeExecution();
        }
        return execute;
    }

    public boolean isNotStopped() {
        return this.notStopped;
    }

    private void scheduleExecution(Map<Integer, INodeInstance> map) {
        ArrayList arrayList = new ArrayList(map.values());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((INodeInstance) it.next()).prepare(this.config);
        }
        if (this.numThreads >= arrayList.size()) {
            this.doneSignal = new CountDownLatch(arrayList.size());
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                this.threads.submit(new BatchWorker((INodeInstance) it2.next()));
            }
            return;
        }
        AtomicBoolean[] atomicBooleanArr = new AtomicBoolean[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            atomicBooleanArr[i] = new AtomicBoolean(false);
        }
        this.doneSignal = new CountDownLatch(this.numThreads);
        for (int i2 = 0; i2 < this.numThreads; i2++) {
            this.threads.submit(new BatchWorker(arrayList, atomicBooleanArr));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanUp(Map<Integer, INodeInstance> map) {
        try {
            this.doneSignal.await();
            this.executionPlan.setExecutionState(ExecutionState.EXECUTED);
            this.finishedInstances.set(0);
            this.cleanUpCalled = true;
            this.executionHook.afterExecution();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    }

    @Override // edu.iu.dsc.tws.executor.threading.ThreadSharingExecutor
    public IExecution runIExecution() {
        Map<Integer, INodeInstance> nodes = this.executionPlan.getNodes();
        if (nodes.size() == 0) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return new NullExecutor();
        }
        if (this.executionPlan.getExecutionState() == ExecutionState.EXECUTED) {
            resetNodes(this.executionPlan.getNodes(), this.executionPlan.getParallelOperations());
        }
        scheduleExecution(nodes);
        return new BatchExecution(this.executionPlan, nodes);
    }

    public boolean closeExecution() {
        Map<Integer, INodeInstance> nodes = this.executionPlan.getNodes();
        if (nodes.size() == 0) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return true;
        }
        scheduleWaitFor(nodes);
        while (isNotStopped() && this.finishedInstances.get() != nodes.size()) {
            this.channel.progress();
        }
        close(this.executionPlan, nodes);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleWaitFor(Map<Integer, INodeInstance> map) {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(map.size() * 2);
        arrayBlockingQueue.addAll(map.values());
        int size = arrayBlockingQueue.size();
        CommunicationWorker[] communicationWorkerArr = new CommunicationWorker[size];
        this.doneSignal = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            communicationWorkerArr[i] = new CommunicationWorker(arrayBlockingQueue);
            this.threads.submit(communicationWorkerArr[i]);
        }
    }

    private void resetNodes(Map<Integer, INodeInstance> map, List<IParallelOperation> list) {
        Iterator<INodeInstance> it = map.values().iterator();
        while (it.hasNext()) {
            it.next().reset();
        }
        Iterator<IParallelOperation> it2 = list.iterator();
        while (it2.hasNext()) {
            it2.next().reset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void close(ExecutionPlan executionPlan, Map<Integer, INodeInstance> map) {
        try {
            this.doneSignal.await();
            List<IParallelOperation> parallelOperations = executionPlan.getParallelOperations();
            resetNodes(map, parallelOperations);
            Iterator<INodeInstance> it = map.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            Iterator<IParallelOperation> it2 = parallelOperations.iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
            this.executionHook.onClose(this);
            this.finishedInstances.set(0);
            this.cleanUpCalled = true;
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    }
}
