package io.nosqlbench.virtdata.userlibs.apps.summarizer;

import io.nosqlbench.virtdata.userlibs.apps.valuechecker.IndexedThreadFactory;
import java.lang.Thread;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/virtdata/userlibs/apps/summarizer/StageManager.class */
/**
 * Coordinates a fixed number of worker tasks through two rendezvous points:
 * one before the tasks start running and one after they have finished.
 *
 * <p>Each task is wrapped in a {@link RunBox}, which checks in with this
 * manager via {@link #OnYourMarkGetSet(Object)} and then blocks on a shared
 * {@link Condition}. The coordinating thread (the one calling {@link #run()})
 * polls the check-in queue in {@link #coordinateFor(int, String)} and releases
 * all waiters at once when the expected number of check-ins has been seen.
 *
 * <p>Failures reported through {@link #uncaughtException(Thread, Throwable)}
 * are collected and re-raised on the coordinating thread the next time it polls.
 */
public class StageManager implements Thread.UncaughtExceptionHandler, Runnable {
    private static final Logger logger = LogManager.getLogger(StageManager.class);

    /** Longest time, in milliseconds, the coordinator polls for check-ins before giving up. */
    private static final long SYNC_TIMEOUT_MILLIS = 10000L;

    /** Upper bound, in milliseconds, for the exponentially growing poll delay. */
    private static final long MAX_POLL_DELAY_MILLIS = 1024L;

    private final ExecutorService pool;
    private final IntFunction<Runnable> tasks;
    private final int threads;
    private final ConcurrentLinkedDeque<Throwable> errors = new ConcurrentLinkedDeque<>();
    Lock lock = new ReentrantLock();
    Condition goTime = this.lock.newCondition();
    private final ConcurrentLinkedQueue<Object> readyQueue = new ConcurrentLinkedQueue<>();
    private final IndexedThreadFactory tf = new IndexedThreadFactory("values-checker", this);

    /**
     * Wraps a task so that it checks in with its {@link StageManager} and
     * waits for the release signal both before and after running.
     */
    private static final class RunBox implements Runnable {
        private final Runnable inner;
        private final StageManager stage;

        public RunBox(Runnable runnable, StageManager stageManager) {
            this.inner = runnable;
            this.stage = stageManager;
        }

        @Override
        public void run() {
            StageManager.logger.debug("blocking for start");
            this.stage.OnYourMarkGetSet(this);
            StageManager.logger.debug("running");
            try {
                this.inner.run();
            } catch (Throwable failure) {
                // Tasks are handed to the pool with submit(), which stores a
                // thrown exception in the returned Future instead of passing it
                // to the thread's uncaught-exception handler. Report it here so
                // the coordinating thread sees the failure on its next poll
                // rather than only timing out.
                this.stage.uncaughtException(Thread.currentThread(), failure);
                throw failure;
            }
            StageManager.logger.debug("blocking for completion");
            this.stage.OnYourMarkGetSet(this);
            StageManager.logger.debug("returning");
        }
    }

    /**
     * @param i           number of worker threads, and therefore tasks, to run
     * @param intFunction supplies the task for each worker index in {@code [0, i)}
     */
    public StageManager(int i, IntFunction<Runnable> intFunction) {
        this.threads = i;
        this.pool = Executors.newFixedThreadPool(i, this.tf);
        this.tasks = intFunction;
    }

    /**
     * Submits one wrapped task per worker, releases them all together once
     * every one has checked in, then waits for all of them to check in again
     * after finishing and releases them a second time.
     *
     * <p>NOTE(review): the pool is never shut down here, so its idle threads
     * outlive this call — confirm whether callers rely on invoking
     * {@code run()} more than once before adding a shutdown.
     */
    @Override
    public void run() {
        for (int i = 0; i < this.threads; i++) {
            this.pool.submit(new RunBox(this.tasks.apply(i), this));
        }
        coordinateFor(this.threads, "tasks");
        coordinateFor(this.threads, "completion");
    }

    /**
     * Registers {@code obj} as ready and blocks the calling thread until the
     * coordinator signals {@link #goTime}.
     *
     * <p>The check-in is added while holding {@link #lock}, and
     * {@code await()} releases that lock only as it starts waiting. The
     * coordinator must take the same lock before signalling, so a check-in it
     * has observed always belongs to a thread that is already waiting.
     *
     * @param obj the party checking in; only its presence in the queue matters
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *                          wait is interrupted (the interrupt flag is restored)
     */
    public void OnYourMarkGetSet(Object obj) {
        this.lock.lock();
        try {
            this.readyQueue.add(obj);
            logger.trace(() -> {
                return "awaiting signal for " + obj;
            });
            this.goTime.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Polls until {@code i} parties have checked in, then clears the check-in
     * queue and wakes every thread waiting on {@link #goTime}.
     *
     * <p>The poll delay starts at 1 ms and doubles up to
     * {@value #MAX_POLL_DELAY_MILLIS} ms. After each delay any failures
     * recorded by workers are re-raised on the calling thread.
     *
     * @param i   number of check-ins to wait for
     * @param str label of the stage being coordinated, used in log and error messages
     * @throws RuntimeException if the check-ins do not arrive within
     *                          {@value #SYNC_TIMEOUT_MILLIS} ms, if a worker
     *                          reported a failure, or if the calling thread is
     *                          interrupted while polling
     */
    public void coordinateFor(int i, String str) {
        logger.trace(() -> {
            return "coordinating " + i + " threads for " + str;
        });
        try {
            long delayMillis = 1;
            long startedAt = System.currentTimeMillis();
            while (this.readyQueue.size() < i) {
                long waitedMillis = System.currentTimeMillis() - startedAt;
                if (waitedMillis > SYNC_TIMEOUT_MILLIS) {
                    throw new RuntimeException("Waited for " + waitedMillis + " millis and not synchronized yet for " + str);
                }
                logger.debug("threads ready for " + str + ": " + this.readyQueue.size() + ", delaying " + delayMillis + "ms");
                Thread.sleep(delayMillis);
                delayMillis = Math.min(MAX_POLL_DELAY_MILLIS, delayMillis * 2);
                throwInjectedExceptions();
            }
            this.readyQueue.clear();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("Error while signaling threads:", e);
            throw new RuntimeException(e);
        }

        // The lock is taken only around the signal, so a failure while polling
        // never reaches an unlock() for a lock this thread does not hold.
        this.lock.lock();
        try {
            this.goTime.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Prints the message of every recorded worker failure to standard output
     * and throws a {@link RuntimeException} wrapping the first one. Does
     * nothing when no failure has been recorded.
     */
    private synchronized void throwInjectedExceptions() {
        Throwable first = this.errors.peekFirst();
        if (first == null) {
            return;
        }
        int count = 0;
        for (Throwable error : this.errors) {
            System.out.print("EXCEPTION " + count + ": ");
            System.out.println(error.getMessage());
            count++;
        }
        throw new RuntimeException(first);
    }

    /**
     * Records a worker failure so that the coordinating thread re-raises it
     * from its next poll in {@link #coordinateFor(int, String)}.
     *
     * @param thread the thread on which the failure occurred
     * @param th     the failure
     */
    @Override
    public void uncaughtException(Thread thread, Throwable th) {
        logger.error("Uncaught exception in thread " + thread, th);
        this.errors.add(th);
    }
}
