package streams.runtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Process;
import stream.Processor;
import stream.ProcessorList;
import stream.io.Sink;
import stream.io.Source;
import stream.runtime.Monitor;
import stream.runtime.ProcessListener;
import streams.application.ComputeGraph;

/* loaded from: input_file:streams/runtime/Supervisor.class */
public class Supervisor implements ProcessListener, Hook {
    static Logger log = LoggerFactory.getLogger(Supervisor.class);
    ComputeGraph dependencies;
    List<Process> runningProcesses = new ArrayList();
    final AtomicInteger running = new AtomicInteger(0);
    final AtomicInteger errors = new AtomicInteger(0);
    final AtomicInteger finished = new AtomicInteger(0);
    Map<Process, Set<Sink>> processOutlets = new HashMap();
    final Object lock = new Object();

    public Supervisor(ComputeGraph computeGraph) {
        this.dependencies = computeGraph;
        log.debug("Creating supervisor for graph {}", computeGraph);
        computeGraph.printShutdownStrategy();
        for (Object obj : computeGraph.getSources()) {
            if (obj instanceof Source) {
                Set targets = computeGraph.getTargets(obj);
                log.debug("  Source '{}'  is read from {} targets: {}", new Object[]{obj, Integer.valueOf(targets.size()), targets});
            }
        }
        for (Process process : computeGraph.processes().values()) {
            Set<Sink> collectSinks = collectSinks(process);
            log.debug("Process '{}' has {} outlets: {}", new Object[]{process, Integer.valueOf(collectSinks.size()), collectSinks});
        }
    }

    @Override // stream.runtime.ProcessListener
    public synchronized void processStarted(Process process) {
        if (process instanceof Monitor) {
            log.debug("Monitor #{} started", process);
            return;
        }
        log.debug("Process  #{}  started.", process);
        int incrementAndGet = this.running.incrementAndGet();
        this.runningProcesses.add(process);
        Set<Sink> collectSinks = collectSinks(process);
        log.debug("   process #{} is writing to {} sinks", process, Integer.valueOf(collectSinks.size()));
        this.processOutlets.put(process, collectSinks);
        log.debug("{} processes running.", Integer.valueOf(incrementAndGet));
    }

    @Override // stream.runtime.ProcessListener
    public synchronized void processError(Process process, Exception exc) {
        log.debug("Process {} finished with error: {}", process, exc.getMessage());
        this.errors.incrementAndGet();
        synchronized (this.lock) {
            this.lock.notify();
        }
        processFinished(process);
    }

    @Override // stream.runtime.ProcessListener
    public synchronized void processFinished(Process process) {
        log.debug("Process {} finished normally...", process);
        int decrementAndGet = this.running.decrementAndGet();
        this.finished.incrementAndGet();
        this.runningProcesses.remove(process);
        log.debug("Process  #{}  finished.", process);
        Set<Sink> set = this.processOutlets.get(process);
        if (set == null) {
            set = new HashSet();
        }
        log.debug("   process has {} outgoing targets: {}", Integer.valueOf(set.size()), set);
        Set<Sink> remove = this.processOutlets.remove(process);
        if (remove != null) {
            for (Sink sink : remove) {
                int i = 0;
                for (Process process2 : this.processOutlets.keySet()) {
                    if (process2 != null && this.processOutlets.get(process2).contains(sink)) {
                        i++;
                    }
                }
                if (i == 0) {
                    log.debug("Reference count of {} is 0, closing sink!", sink);
                    try {
                        sink.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    log.debug("Reference count for {} is: {}", sink, Integer.valueOf(i));
                }
            }
        }
        if (log.isTraceEnabled()) {
            printTargets(process, 0);
        }
        this.dependencies.remove(process);
        Set rootSources = this.dependencies.getRootSources();
        log.debug("{} root sources remaining:   {}", Integer.valueOf(rootSources.size()), rootSources);
        synchronized (this.lock) {
            this.lock.notify();
        }
        log.debug("{} processes running.", Integer.valueOf(decrementAndGet));
    }

    public void printTargets(Object obj, int i) {
        String str = "";
        for (int i2 = 0; i2 < i; i2++) {
            str = str + "  ";
        }
        for (Object obj2 : this.dependencies.getTargets(obj)) {
            log.debug(str + " " + obj2);
            if (obj2 instanceof Sink) {
                return;
            } else {
                printTargets(obj2, i + 1);
            }
        }
    }

    public int processesDone() {
        return this.finished.get() + this.errors.get();
    }

    public int processesRunning() {
        log.debug("Active processes: {}", this.runningProcesses);
        return this.running.get();
    }

    public Set<Sink> collectSinks(Object obj) {
        HashSet hashSet = new HashSet();
        for (Object obj2 : this.dependencies.getTargets(obj)) {
            if (obj2 instanceof Sink) {
                log.debug("Found sink '{}' referenced by {}", obj2, obj);
                hashSet.add((Sink) obj2);
            } else {
                hashSet.addAll(collectSinks(obj2));
            }
        }
        if (obj instanceof Process) {
            log.debug("Checking sinks referenced by elements of process '{}'", obj);
            for (ProcessorList processorList : ((Process) obj).getProcessors()) {
                Set<Sink> collectSinks = collectSinks(processorList);
                if (processorList instanceof ProcessorList) {
                    for (Processor processor : processorList.getProcessors()) {
                        Set<Sink> collectSinks2 = collectSinks(processor);
                        log.debug("  Found {} sinks referenced by child '{}': {}", new Object[]{Integer.valueOf(collectSinks2.size()), processor, collectSinks2});
                        collectSinks.addAll(collectSinks2);
                    }
                } else {
                    Set<Sink> collectSinks3 = collectSinks(processorList);
                    log.debug("  Found {} sinks referenced by child '{}': {}", new Object[]{Integer.valueOf(collectSinks3.size()), processorList, collectSinks3});
                    collectSinks.addAll(collectSinks3);
                }
                hashSet.addAll(collectSinks);
            }
        }
        return hashSet;
    }

    public void waitForProcesses() {
        if (this.running.get() == 0) {
            return;
        }
        synchronized (this.lock) {
            try {
                this.lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public synchronized void signal(int i) {
        log.debug("Shutdown signal received: '{}'", Integer.valueOf(i));
        log.debug("Closing root sources: {}", this.dependencies.getRootSources());
        synchronized (this.lock) {
            this.lock.notify();
        }
        final Set rootSources = this.dependencies.getRootSources();
        if (rootSources.isEmpty()) {
            return;
        }
        new Thread() { // from class: streams.runtime.Supervisor.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Iterator it = rootSources.iterator();
                while (it.hasNext()) {
                    Source source = (Source) it.next();
                    Set targets = Supervisor.this.dependencies.getTargets(source);
                    Supervisor.log.info("The following consumers are attached to the root {}:  {}", source, targets);
                    try {
                        source.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    for (Object obj : targets) {
                        try {
                            Supervisor.log.info("Notifying consumer {}", obj);
                            obj.notify();
                        } catch (Exception e2) {
                            e2.printStackTrace();
                        }
                    }
                    it.remove();
                }
            }
        }.start();
    }
}
