package io.aleph0.yap.core.pipeline;

import io.aleph0.yap.core.Pipeline;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/core/pipeline/MonitoredPipeline.class */
/**
 * A {@link Pipeline} decorator that periodically flushes the delegate's metrics and hands them to
 * a {@link MetricsReporter} while the delegate is running.
 *
 * <p>All {@link Pipeline} operations are forwarded to the delegate. When {@link #start()} is
 * called, a lifecycle listener is registered on the delegate: reporting is scheduled when the
 * delegate signals that it has started and is cancelled when it signals completion, cancellation
 * or failure.
 */
public class MonitoredPipeline implements Pipeline {
    private static final Logger LOGGER = LoggerFactory.getLogger(MonitoredPipeline.class);

    /**
     * Scheduler shared by every instance that is not given its own. It is never shut down, so its
     * worker is a daemon thread; a non-daemon worker would keep the JVM alive after all pipelines
     * have finished.
     */
    private static final ScheduledExecutorService DEFAULT_SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread worker = new Thread(runnable, "yap-metrics-reporter");
                worker.setDaemon(true);
                return worker;
            });

    /** Reporting period used when none is supplied. */
    public static final Duration DEFAULT_PERIOD = Duration.ofMinutes(1);

    private final ScheduledExecutorService scheduler;
    private final Pipeline delegate;
    private final MetricsReporter reporter;
    private final Duration period;

    /** Handle of the currently scheduled reporting task, or {@code null} when not reporting. */
    private volatile ScheduledFuture<?> reporting;

    /** Receives the metrics snapshots flushed from the monitored pipeline. */
    @FunctionalInterface
    public interface MetricsReporter {
        /**
         * Returns a reporter that prints each metrics snapshot to {@link System#out}.
         *
         * @return a reporter writing to standard output
         */
        static MetricsReporter stdout() {
            // println(Object) prints the same text as toString() and tolerates a null snapshot.
            return metrics -> System.out.println(metrics);
        }

        /**
         * Returns a reporter that prints each metrics snapshot to {@link System#err}.
         *
         * @return a reporter writing to standard error
         */
        static MetricsReporter stderr() {
            return metrics -> System.err.println(metrics);
        }

        /**
         * Handles one metrics snapshot.
         *
         * @param metrics the metrics flushed from the pipeline
         */
        void reportMetrics(Pipeline.Metrics metrics);
    }

    /**
     * Returns a wrapper that monitors pipelines with the {@link MetricsReporter#stderr() stderr}
     * reporter and the {@link #DEFAULT_PERIOD default period}.
     *
     * @return the pipeline wrapper
     */
    public static PipelineWrapper newWrapper() {
        return pipeline -> new MonitoredPipeline(pipeline, MetricsReporter.stderr());
    }

    /**
     * Returns a wrapper that monitors pipelines with the given reporter and the
     * {@link #DEFAULT_PERIOD default period}.
     *
     * @param metricsReporter the reporter that receives metrics
     * @return the pipeline wrapper
     */
    public static PipelineWrapper newWrapper(MetricsReporter metricsReporter) {
        return pipeline -> new MonitoredPipeline(pipeline, metricsReporter);
    }

    /**
     * Returns a wrapper that monitors pipelines with the given reporter and reporting period.
     *
     * @param metricsReporter the reporter that receives metrics
     * @param duration the time between reports
     * @return the pipeline wrapper
     */
    public static PipelineWrapper newWrapper(MetricsReporter metricsReporter, Duration duration) {
        return pipeline -> new MonitoredPipeline(pipeline, metricsReporter, duration);
    }

    /**
     * Creates a monitored pipeline that reports every {@link #DEFAULT_PERIOD} on the shared
     * default scheduler.
     *
     * @param pipeline the pipeline to monitor
     * @param metricsReporter the reporter that receives metrics
     * @throws NullPointerException if any argument is {@code null}
     */
    public MonitoredPipeline(Pipeline pipeline, MetricsReporter metricsReporter) {
        this(pipeline, metricsReporter, DEFAULT_PERIOD);
    }

    /**
     * Creates a monitored pipeline that reports at the given period on the shared default
     * scheduler.
     *
     * @param pipeline the pipeline to monitor
     * @param metricsReporter the reporter that receives metrics
     * @param duration the time between reports; must be positive
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code duration} is zero or negative
     */
    public MonitoredPipeline(Pipeline pipeline, MetricsReporter metricsReporter, Duration duration) {
        this(DEFAULT_SCHEDULER, pipeline, metricsReporter, duration);
    }

    /**
     * Creates a monitored pipeline that reports at the given period on the given scheduler.
     *
     * @param scheduledExecutorService the scheduler that runs the periodic reporting task
     * @param pipeline the pipeline to monitor
     * @param metricsReporter the reporter that receives metrics
     * @param duration the time between reports; must be positive
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code duration} is zero or negative
     */
    public MonitoredPipeline(ScheduledExecutorService scheduledExecutorService, Pipeline pipeline, MetricsReporter metricsReporter, Duration duration) {
        this.scheduler = Objects.requireNonNull(scheduledExecutorService);
        this.delegate = Objects.requireNonNull(pipeline);
        this.reporter = Objects.requireNonNull(metricsReporter);
        this.period = Objects.requireNonNull(duration);
        // Fail fast: scheduleAtFixedRate rejects a non-positive period, and that failure would
        // otherwise only surface later, inside the pipeline-started callback.
        if (duration.isZero() || duration.isNegative()) {
            throw new IllegalArgumentException("period must be positive: " + duration);
        }
    }

    @Override
    public int getId() {
        return this.delegate.getId();
    }

    @Override
    public void addLifecycleListener(Pipeline.LifecycleListener lifecycleListener) {
        this.delegate.addLifecycleListener(lifecycleListener);
    }

    @Override
    public void removeLifecycleListener(Pipeline.LifecycleListener lifecycleListener) {
        this.delegate.removeLifecycleListener(lifecycleListener);
    }

    /**
     * Registers the reporting lifecycle listener on the delegate and then starts the delegate.
     */
    @Override
    public void start() {
        this.delegate.addLifecycleListener(new Pipeline.LifecycleListener() {
            @Override
            public void onPipelineStarted(int i) {
                startReporting();
            }

            private void startReporting() {
                // A positive sub-millisecond period truncates to 0 ms, which the scheduler
                // rejects, so clamp to the smallest period expressible in milliseconds.
                long periodMillis = Math.max(1L, MonitoredPipeline.this.period.toMillis());
                MonitoredPipeline.this.reporting = MonitoredPipeline.this.scheduler.scheduleAtFixedRate(
                        MonitoredPipeline.this::reportMetrics, 0L, periodMillis, TimeUnit.MILLISECONDS);
            }

            @Override
            public void onPipelineCompleted(int i) {
                stopReporting();
            }

            @Override
            public void onPipelineCancelled(int i) {
                stopReporting();
            }

            @Override
            public void onPipelineFailed(int i, Throwable th) {
                stopReporting();
            }

            private void stopReporting() {
                // Read the volatile field once so a concurrent stop cannot null it out between
                // the null check and the cancel call.
                ScheduledFuture<?> task = MonitoredPipeline.this.reporting;
                if (task != null) {
                    task.cancel(false);
                    MonitoredPipeline.this.reporting = null;
                }
            }
        });
        this.delegate.start();
    }

    @Override
    public void cancel() {
        this.delegate.cancel();
    }

    @Override
    public void await() throws InterruptedException, ExecutionException, CancellationException {
        this.delegate.await();
    }

    @Override
    public Pipeline.Metrics checkMetrics() {
        return this.delegate.checkMetrics();
    }

    @Override
    public Pipeline.Metrics flushMetrics() {
        return this.delegate.flushMetrics();
    }

    /**
     * Flushes the delegate's metrics and passes them to the reporter. Any exception is logged and
     * swallowed: an exception escaping a task scheduled with {@code scheduleAtFixedRate}
     * suppresses all of its subsequent executions.
     */
    public void reportMetrics() {
        try {
            LOGGER.atDebug().addKeyValue("pipeline", Integer.valueOf(getId())).addKeyValue("reporter", this.reporter).addKeyValue("period", this.period).log("Collecting and reporting metrics");
            this.reporter.reportMetrics(this.delegate.flushMetrics());
        } catch (Exception e) {
            LOGGER.atError().addKeyValue("pipeline", Integer.valueOf(getId())).addKeyValue("reporter", this.reporter).addKeyValue("period", this.period).setCause(e).log("Error collecting or reporting metrics");
        }
    }

    /**
     * Accessor left over from decompilation that forwards to {@link #reportMetrics()}. It is no
     * longer used inside this class and is kept only so the class's visible members stay
     * unchanged.
     */
    static /* synthetic */ void access$2(MonitoredPipeline monitoredPipeline) {
        monitoredPipeline.reportMetrics();
    }
}
