package io.aleph0.yap.messaging.test;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.messaging.core.RelayMetrics;
import io.aleph0.yap.messaging.core.RelayProcessorWorker;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/test/TestRelayProcessorWorker.class */
public class TestRelayProcessorWorker<ValueT> implements RelayProcessorWorker<ValueT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestRelayProcessorWorker.class);
    private final AtomicLong submittedMetrics;
    private final AtomicLong acknowledgedMetrics;
    private final AtomicLong awaitingMetrics;
    private final ScheduledExecutorService executor;
    private final Scheduler scheduler;

    public TestRelayProcessorWorker() {
        this(Scheduler.defaultScheduler());
    }

    public TestRelayProcessorWorker(Scheduler scheduler) {
        this.submittedMetrics = new AtomicLong(0L);
        this.acknowledgedMetrics = new AtomicLong(0L);
        this.awaitingMetrics = new AtomicLong(0L);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler, "scheduler");
    }

    /* JADX WARN: Finally extract failed */
    public void process(Source<ValueT> source, Sink<ValueT> sink) throws IOException, InterruptedException {
        try {
            AtomicReference<Throwable> atomicReference = new AtomicReference<>(null);
            try {
                Object take = source.take();
                while (take != null) {
                    throwIfPresent(atomicReference);
                    Duration schedule = this.scheduler.schedule();
                    if (schedule.isNegative()) {
                        throw new IllegalArgumentException("scheduler returned negative delay");
                    }
                    Object obj = take;
                    this.executor.schedule(() -> {
                        try {
                            sink.put(obj);
                            this.acknowledgedMetrics.incrementAndGet();
                            this.awaitingMetrics.decrementAndGet();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            LOGGER.atError().setCause(e).log("Interrupted while trying to put delayed message. Failing task...");
                            atomicReference.compareAndSet(null, e);
                        } catch (Throwable th) {
                            LOGGER.atError().setCause(th).log("Failed to put delayed message. Failing task...");
                            atomicReference.compareAndSet(null, th);
                        }
                    }, schedule.toNanos(), TimeUnit.NANOSECONDS);
                    this.submittedMetrics.incrementAndGet();
                    this.awaitingMetrics.incrementAndGet();
                    take = source.take();
                }
                this.executor.shutdown();
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
                throwIfPresent(atomicReference);
            } catch (Throwable th) {
                this.executor.shutdown();
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
                throw th;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.atError().setCause(e).log("Simulated relay interrupted. Failing task...");
            throw e;
        } catch (RuntimeException e2) {
            LOGGER.atError().setCause(e2).log("Simulated relay failed. Failing task...");
            throw e2;
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            LOGGER.atError().setCause(cause).log("Simulated relay failed. Failing task...");
            if (cause instanceof Error) {
                throw ((Error) cause);
            }
            if (cause instanceof IOException) {
                throw ((IOException) cause);
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            if (!(cause instanceof Exception)) {
                throw new AssertionError("Unexpected error", e3);
            }
            throw new IOException("Simulated relay failed", (Exception) cause);
        }
    }

    private void throwIfPresent(AtomicReference<Throwable> atomicReference) throws InterruptedException, ExecutionException {
        Throwable th = atomicReference.get();
        if (th != null) {
            if (th instanceof Error) {
                throw ((Error) th);
            }
            if (th instanceof InterruptedException) {
                throw ((InterruptedException) th);
            }
            if (!(th instanceof Exception)) {
                throw new AssertionError("Unexpected error", th);
            }
            throw new ExecutionException((Exception) th);
        }
    }

    /* renamed from: checkMetrics, reason: merged with bridge method [inline-methods] */
    public RelayMetrics m5checkMetrics() {
        return new RelayMetrics(this.submittedMetrics.get(), this.acknowledgedMetrics.get(), this.awaitingMetrics.get());
    }

    /* renamed from: flushMetrics, reason: merged with bridge method [inline-methods] */
    public RelayMetrics m4flushMetrics() {
        RelayMetrics m5checkMetrics = m5checkMetrics();
        this.submittedMetrics.set(0L);
        this.acknowledgedMetrics.set(0L);
        return m5checkMetrics;
    }
}
