package io.aleph0.yap.messaging.core;

import io.aleph0.yap.core.Source;
import io.aleph0.yap.core.worker.MeasuredConsumerWorker;
import io.aleph0.yap.messaging.core.Acknowledgeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/core/AcknowledgerConsumerWorker.class */
/**
 * Consumer worker that acknowledges every message it takes from its {@link Source} and keeps
 * counters describing the outcome of those acknowledgements.
 *
 * <p>Each message is acknowledged through {@link Acknowledgeable#ack}, with a listener that is
 * notified of success or failure. A {@link Phaser} tracks acknowledgements that have been
 * requested but not yet retired, so {@link #consume(Source)} does not return until every
 * outstanding acknowledgement has reported back.
 *
 * <p>NOTE(review): the counters are atomics and completion is coordinated through a phaser, which
 * suggests listeners may be invoked on threads other than the consuming thread; confirm against
 * the {@code Acknowledgeable} implementations in use.
 *
 * @param <T> the type of message consumed
 */
public class AcknowledgerConsumerWorker<T extends Acknowledgeable> implements MeasuredConsumerWorker<T, AcknowledgerMetrics> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AcknowledgerConsumerWorker.class);

    /** Number of acknowledgements requested since the last flush. */
    private final AtomicLong acknowledgedMetric = new AtomicLong(0L);

    /** Number of acknowledgements that reported success since the last flush. */
    private final AtomicLong retiredSuccessMetric = new AtomicLong(0L);

    /** Number of acknowledgements that reported failure since the last flush. */
    private final AtomicLong retiredFailureMetric = new AtomicLong(0L);

    /** Number of acknowledgements requested but not yet retired. Never reset by a flush. */
    private final AtomicLong awaitingMetric = new AtomicLong(0L);

    /**
     * One party is registered up front for the consuming thread; one more party is registered per
     * in-flight acknowledgement and deregistered when its listener fires.
     */
    private final Phaser phaser = new Phaser(1);

    private final AcknowledgementFailureHandler acknowledgementFailureHandler;

    /**
     * Decides what happens when an acknowledgement fails with something other than an
     * {@link Error} or an {@link InterruptedException}.
     */
    @FunctionalInterface
    public interface AcknowledgementFailureHandler {
        /**
         * Called with the cause of a failed acknowledgement. Returning normally ignores the
         * failure; throwing makes the worker fail with the thrown cause.
         *
         * @param th the cause of the acknowledgement failure
         * @throws Throwable to propagate the failure and fail the worker
         */
        void onAckFailure(Throwable th) throws Throwable;
    }

    /**
     * Returns a handler that rethrows every acknowledgement failure it is given.
     *
     * @return the default failure handler
     */
    public static AcknowledgementFailureHandler defaultAcknowledgementFailureHandler() {
        return th -> {
            throw th;
        };
    }

    /** Creates a worker that uses {@link #defaultAcknowledgementFailureHandler()}. */
    public AcknowledgerConsumerWorker() {
        this(defaultAcknowledgementFailureHandler());
    }

    /**
     * Creates a worker with the given failure handler.
     *
     * @param acknowledgementFailureHandler the handler consulted on acknowledgement failures
     * @throws NullPointerException if {@code acknowledgementFailureHandler} is {@code null}
     */
    public AcknowledgerConsumerWorker(AcknowledgementFailureHandler acknowledgementFailureHandler) {
        this.acknowledgementFailureHandler = Objects.requireNonNull(acknowledgementFailureHandler, "acknowledgementFailureHandler");
    }

    /**
     * Takes messages from {@code source} until {@code source.take()} returns {@code null},
     * requesting an acknowledgement for each one, then waits for all outstanding acknowledgements
     * to retire.
     *
     * <p>A failure recorded by an acknowledgement listener is rethrown at the next check: before
     * each acknowledgement, after the source is exhausted, and once more after all outstanding
     * acknowledgements have retired, so that late failures are not lost.
     *
     * <p>NOTE(review): if {@code ack} throws without ever invoking its listener, the party
     * registered for that message never arrives and the final wait would not complete; confirm
     * the {@code Acknowledgeable.ack} contract.
     *
     * @param source the source of messages to acknowledge
     * @throws IOException if a recorded failure is an {@code IOException}, or is a checked
     *         exception that is neither an {@code IOException} nor a {@code RuntimeException}
     * @throws InterruptedException if the worker or an acknowledgement was interrupted
     */
    public void consume(Source<T> source) throws IOException, InterruptedException {
        final AtomicReference<Throwable> failureCause = new AtomicReference<>(null);
        try {
            try {
                for (T message = source.take(); message != null; message = source.take()) {
                    throwIfPresent(failureCause);
                    acknowledgedMetric.incrementAndGet();
                    awaitingMetric.incrementAndGet();
                    // Registered before ack() so the listener's arriveAndDeregister() always has
                    // a matching party, even if it fires synchronously.
                    phaser.register();
                    message.ack(newListener(failureCause));
                }
                throwIfPresent(failureCause);
            } finally {
                // Whether we finished normally or are unwinding, wait for in-flight
                // acknowledgements to retire before leaving.
                phaser.arriveAndAwaitAdvance();
            }
            // Acknowledgements that failed while we were waiting above must still fail the task.
            throwIfPresent(failureCause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.atWarn().setCause(e).log("Interrupted while waiting for acknowledgements. Propagating...");
            throw e;
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            LOGGER.atError().setCause(cause).log("Pubsub firehose failed. Failing task...");
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (!(cause instanceof Exception)) {
                throw new AssertionError("Unexpected error", e);
            }
            throw new IOException("Pubsub firehose failed", (Exception) cause);
        }
    }

    /**
     * Builds the listener handed to {@code ack} for a single message. The listener updates the
     * counters, records any failure into {@code failureCause}, and only then retires its phaser
     * party.
     */
    private Acknowledgeable.AcknowledgementListener newListener(final AtomicReference<Throwable> failureCause) {
        return new Acknowledgeable.AcknowledgementListener() {
            @Override
            public void onSuccess() {
                retiredSuccessMetric.incrementAndGet();
                awaitingMetric.decrementAndGet();
                phaser.arriveAndDeregister();
            }

            @Override
            public void onFailure(Throwable th) {
                retiredFailureMetric.incrementAndGet();
                awaitingMetric.decrementAndGet();
                try {
                    recordFailure(th, failureCause);
                } finally {
                    // Arrive only after the failure is recorded: once this party arrives the
                    // consuming thread may be released and will read failureCause.
                    phaser.arriveAndDeregister();
                }
            }
        };
    }

    /**
     * Classifies an acknowledgement failure and stores the first task-failing cause in
     * {@code failureCause}. Failures the user handler accepts are logged and otherwise ignored.
     */
    private void recordFailure(Throwable th, AtomicReference<Throwable> failureCause) {
        if (th instanceof Error) {
            LOGGER.atError().setCause(th).log("Error while trying to acknowledge message. Failing task...");
            failureCause.compareAndSet(null, th);
            return;
        }
        if (th instanceof InterruptedException) {
            Thread.currentThread().interrupt();
            LOGGER.atWarn().setCause(th).log("Interrupted while trying to acknowledge message. Stopping...");
            failureCause.compareAndSet(null, new InterruptedException());
            return;
        }
        try {
            acknowledgementFailureHandler.onAckFailure(th);
            LOGGER.atWarn().setCause(th).log("Failed to acknowledge message, but user ignored. Continuing...");
        } catch (Throwable propagated) {
            LOGGER.atError().setCause(propagated).log("Failed to acknowledge message and user propagated. Failing task...");
            failureCause.compareAndSet(null, propagated);
        }
    }

    /**
     * Rethrows the failure stored in {@code failureCause}, if any: an {@link Error} as-is, an
     * interruption as a fresh {@link InterruptedException}, and any other exception wrapped in
     * an {@link ExecutionException}. Does nothing when no failure has been recorded.
     */
    private void throwIfPresent(AtomicReference<Throwable> failureCause) throws InterruptedException, ExecutionException {
        final Throwable th = failureCause.get();
        if (th == null) {
            return;
        }
        if (th instanceof Error) {
            throw (Error) th;
        }
        if (th instanceof InterruptedException) {
            throw new InterruptedException();
        }
        if (!(th instanceof Exception)) {
            throw new AssertionError("Unexpected error", th);
        }
        throw new ExecutionException((Exception) th);
    }

    /**
     * Returns a snapshot of the current counters without resetting them.
     *
     * <p>NOTE(review): the decompiler notes say this method was originally named
     * {@code checkMetrics}; it is presumably the {@code MeasuredConsumerWorker} method, confirm
     * against that interface.
     *
     * @return the current metrics
     */
    public AcknowledgerMetrics checkMetrics() {
        final long acknowledged = acknowledgedMetric.get();
        final long retiredSuccess = retiredSuccessMetric.get();
        final long retiredFailure = retiredFailureMetric.get();
        return new AcknowledgerMetrics(acknowledged, retiredSuccess + retiredFailure, retiredSuccess, retiredFailure, awaitingMetric.get());
    }

    /**
     * Returns the current counters and resets the acknowledged and retired counters to zero. The
     * awaiting counter is reported but not reset.
     *
     * <p>Each counter is read and cleared in a single atomic step, so increments that race with
     * a flush are carried into the next flush instead of being dropped.
     *
     * <p>NOTE(review): the decompiler notes say this method was originally named
     * {@code flushMetrics}; it is presumably the {@code MeasuredConsumerWorker} method, confirm
     * against that interface.
     *
     * @return the metrics accumulated since the previous flush
     */
    public AcknowledgerMetrics flushMetrics() {
        final long acknowledged = acknowledgedMetric.getAndSet(0L);
        final long retiredSuccess = retiredSuccessMetric.getAndSet(0L);
        final long retiredFailure = retiredFailureMetric.getAndSet(0L);
        return new AcknowledgerMetrics(acknowledged, retiredSuccess + retiredFailure, retiredSuccess, retiredFailure, awaitingMetric.get());
    }

    /**
     * Decompiler-assigned alias of {@link #checkMetrics()}, kept so existing references to this
     * name keep working.
     *
     * @return the current metrics
     */
    public AcknowledgerMetrics m2checkMetrics() {
        return checkMetrics();
    }

    /**
     * Decompiler-assigned alias of {@link #flushMetrics()}, kept so existing references to this
     * name keep working.
     *
     * @return the metrics accumulated since the previous flush
     */
    public AcknowledgerMetrics m1flushMetrics() {
        return flushMetrics();
    }
}
