package edu.iu.dsc.tws.executor.threading;

import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.executor.IExecutionHook;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.config.Config;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/threading/StreamingSharingExecutor.class */
/**
 * Streaming executor that shares a fixed number of worker threads among all
 * node instances of an execution plan.
 *
 * <p>The node instances are placed in a bounded queue. Every worker repeatedly
 * takes one instance from the queue, calls {@code execute()} on it and puts it
 * back, until the executor is stopped. The calling thread drives communication
 * by invoking {@code channel.progress()}.
 */
public class StreamingSharingExecutor extends ThreadSharingExecutor {
    private static final Logger LOG = Logger.getLogger(StreamingSharingExecutor.class.getName());

    /** Id of this worker; only used in log messages. */
    private int workerId;

    /**
     * {@code true} while the execution should keep running. Written by the
     * controlling thread and polled by the worker threads, hence volatile so
     * that a stop request becomes visible to the workers.
     */
    protected volatile boolean notStopped;

    /** Set once {@link #cleanUp(Map)} has completed or the execution was closed. */
    private boolean cleanUpCalled;

    /** Counted down once by every worker when it leaves its run loop. */
    private CountDownLatch doneSignal;

    /**
     * Handle returned by {@link #runIExecution()} that lets the caller drive,
     * stop and close an execution that has already been scheduled.
     */
    private class StreamExecution implements IExecution {
        private final Map<Integer, INodeInstance> nodeMap;
        private final ExecutionPlan executionPlan;

        StreamExecution(ExecutionPlan executionPlan, Map<Integer, INodeInstance> map) {
            this.nodeMap = map;
            this.executionPlan = executionPlan;
        }

        /**
         * Progresses the channel until the executor is stopped, then cleans up.
         *
         * @return always {@code true}
         */
        public boolean waitForCompletion() {
            while (isNotStopped()) {
                StreamingSharingExecutor.this.channel.progress();
            }
            cleanUp(this.nodeMap);
            return true;
        }

        /**
         * Progresses the channel once.
         *
         * @return {@code false} if the executor is already stopped (nothing is
         *         progressed in that case), {@code true} otherwise
         */
        public boolean progress() {
            if (!isNotStopped()) {
                return false;
            }
            StreamingSharingExecutor.this.channel.progress();
            return true;
        }

        /**
         * Cleans up the execution and notifies the execution hook.
         *
         * @throws RuntimeException if the execution has not been stopped yet,
         *         or if clean-up has already happened
         */
        public void close() {
            if (isNotStopped()) {
                throw new RuntimeException("We need to stop the execution before close");
            }
            if (StreamingSharingExecutor.this.cleanUpCalled) {
                throw new RuntimeException("Close is called on a already closed execution");
            }
            cleanUp(this.nodeMap);
            StreamingSharingExecutor.this.executionHook.onClose(StreamingSharingExecutor.this);
            StreamingSharingExecutor.this.cleanUpCalled = true;
        }

        /** Requests the workers and the progress loops to stop. */
        public void stop() {
            StreamingSharingExecutor.this.notStopped = false;
        }
    }

    /**
     * Worker loop: takes a node instance from the shared queue, executes it and
     * puts it back, until the executor is stopped or the queue is found empty.
     */
    public class StreamWorker implements Runnable {
        private final BlockingQueue<INodeInstance> tasks;

        public StreamWorker(BlockingQueue<INodeInstance> blockingQueue) {
            this.tasks = blockingQueue;
        }

        /**
         * Runs the worker loop.
         *
         * @throws RuntimeException wrapping any {@link Throwable} raised while
         *         polling, executing or re-queueing a node instance
         */
        @Override
        public void run() {
            try {
                while (isNotStopped()) {
                    try {
                        INodeInstance nodeInstance = this.tasks.poll();
                        if (nodeInstance == null) {
                            // Nothing to pick up: other workers currently hold every
                            // instance, so this thread is surplus and exits.
                            LOG.log(Level.INFO,
                                    "Thread existing as more threads than tasks are been assigned");
                            break;
                        }
                        nodeInstance.execute();
                        // Hand the instance back so that any worker can run it again.
                        this.tasks.offer(nodeInstance);
                    } catch (Throwable t) {
                        LOG.log(Level.SEVERE,
                                String.format("%d Error in executor",
                                        Integer.valueOf(StreamingSharingExecutor.this.workerId)),
                                t);
                        throw new RuntimeException("Error occurred in execution of task", t);
                    }
                }
            } finally {
                // Always signal termination, including on failure; otherwise
                // cleanUp() would wait on the latch forever.
                StreamingSharingExecutor.this.doneSignal.countDown();
            }
        }
    }

    /**
     * Creates an executor in the "not stopped" state.
     *
     * @param config configuration passed to the superclass
     * @param i id of this worker, used in log messages
     * @param tWSChannel communication channel passed to the superclass
     * @param executionPlan plan whose node instances are executed
     * @param iExecutionHook hook passed to the superclass
     */
    public StreamingSharingExecutor(Config config, int i, TWSChannel tWSChannel, ExecutionPlan executionPlan, IExecutionHook iExecutionHook) {
        super(config, tWSChannel, executionPlan, iExecutionHook);
        this.notStopped = true;
        this.cleanUpCalled = false;
        this.workerId = i;
    }

    /**
     * Schedules all node instances, progresses the channel until the executor
     * is stopped and finally cleans up.
     *
     * <p>An {@link Exception} raised while scheduling or progressing is logged
     * and not rethrown; clean-up runs in every case.
     *
     * @return {@code false} if the plan has no node instances, {@code true} otherwise
     */
    @Override // edu.iu.dsc.tws.executor.threading.ThreadSharingExecutor
    public boolean runExecution() {
        Map<Integer, INodeInstance> nodes = this.executionPlan.getNodes();
        if (nodes.isEmpty()) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return false;
        }
        try {
            schedulerExecution(nodes);
            progressStreamComm();
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Error in scheduling execution", (Throwable) e);
        } finally {
            // Stop the workers before waiting for them inside cleanUp().
            this.notStopped = false;
            cleanUp(nodes);
        }
        return true;
    }

    /**
     * Delegates to the no-argument {@code execute()}.
     *
     * @param z ignored
     * @return the result of {@code execute()}
     */
    public boolean execute(boolean z) {
        return execute();
    }

    /** Progresses the channel until the executor is stopped. */
    private void progressStreamComm() {
        while (isNotStopped()) {
            this.channel.progress();
        }
    }

    /**
     * Prepares every node instance and starts {@code numThreads} workers that
     * share them through a bounded queue.
     *
     * @param map node instances to execute; must not be empty
     */
    private void schedulerExecution(Map<Integer, INodeInstance> map) {
        BlockingQueue<INodeInstance> tasks = new ArrayBlockingQueue<>(map.size() * 2);
        tasks.addAll(map.values());
        for (INodeInstance nodeInstance : tasks) {
            nodeInstance.prepare(this.config);
        }
        this.doneSignal = new CountDownLatch(this.numThreads);
        for (int i = 0; i < this.numThreads; i++) {
            this.threads.execute(new StreamWorker(tasks));
        }
    }

    /**
     * Schedules all node instances and returns a handle for driving the
     * execution.
     *
     * @return a {@code NullExecutor} if the plan has no node instances,
     *         otherwise a handle bound to this executor
     */
    @Override // edu.iu.dsc.tws.executor.threading.ThreadSharingExecutor
    public IExecution runIExecution() {
        Map<Integer, INodeInstance> nodes = this.executionPlan.getNodes();
        if (nodes.isEmpty()) {
            LOG.warning(String.format("Worker %d has zero assigned tasks, you may have more workers than tasks", Integer.valueOf(this.workerId)));
            return new NullExecutor();
        }
        schedulerExecution(nodes);
        return new StreamExecution(this.executionPlan, nodes);
    }

    /**
     * Waits for all workers to finish, closes every node instance and parallel
     * operation, and invokes the {@code afterExecution} hook.
     *
     * @param map node instances to close
     * @throws RuntimeException if the calling thread is interrupted while
     *         waiting for the workers; the interrupt status is restored
     */
    public void cleanUp(Map<Integer, INodeInstance> map) {
        try {
            // The latch only exists once schedulerExecution() got as far as
            // creating it; if scheduling failed earlier there is nothing to wait for.
            CountDownLatch latch = this.doneSignal;
            if (latch != null) {
                latch.await();
            }
            for (INodeInstance nodeInstance : map.values()) {
                nodeInstance.close();
            }
            for (Object operation : this.executionPlan.getParallelOperations()) {
                ((IParallelOperation) operation).close();
            }
            this.executionHook.afterExecution();
            this.cleanUpCalled = true;
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    /**
     * Notifies the execution hook that this executor is being closed.
     *
     * @return always {@code false}
     */
    public boolean closeExecution() {
        this.executionHook.onClose(this);
        return false;
    }

    /**
     * @return {@code true} while the execution has not been asked to stop
     */
    public boolean isNotStopped() {
        return this.notStopped;
    }
}
