package io.aleph0.yap.messaging.gcp;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.messaging.core.RelayMetrics;
import io.aleph0.yap.messaging.core.RelayProcessorWorker;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubRelayProcessorWorker.class */
public class PubsubRelayProcessorWorker<ValueT> implements RelayProcessorWorker<ValueT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PubsubRelayProcessorWorker.class);
    private final AtomicLong submittedMetric = new AtomicLong(0);
    private final AtomicLong acknowledgedMetric = new AtomicLong(0);
    private final Phaser phaser = new Phaser(1);
    private final PublisherFactory publisherFactory;
    private final MessageExtractor<ValueT> messageExtractor;

    @FunctionalInterface
    /* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubRelayProcessorWorker$MessageExtractor.class */
    public interface MessageExtractor<T> {
        List<PubsubMessage> extractMessages(T t);
    }

    @FunctionalInterface
    /* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubRelayProcessorWorker$PublisherFactory.class */
    public interface PublisherFactory {
        Publisher newPublisher();
    }

    public PubsubRelayProcessorWorker(PublisherFactory publisherFactory, MessageExtractor<ValueT> messageExtractor) {
        this.publisherFactory = (PublisherFactory) Objects.requireNonNull(publisherFactory, "publisherFactory");
        this.messageExtractor = (MessageExtractor) Objects.requireNonNull(messageExtractor, "messageExtractor");
    }

    public void process(Source<ValueT> source, final Sink<ValueT> sink) throws IOException, InterruptedException {
        try {
            Publisher newPublisher = this.publisherFactory.newPublisher();
            try {
                final AtomicReference<Throwable> atomicReference = new AtomicReference<>(null);
                Object take = source.take();
                while (take != null) {
                    for (PubsubMessage pubsubMessage : this.messageExtractor.extractMessages(take)) {
                        throwIfPresent(atomicReference);
                        ApiFuture publish = newPublisher.publish(pubsubMessage);
                        this.submittedMetric.incrementAndGet();
                        this.phaser.register();
                        final Object obj = take;
                        ApiFutures.addCallback(publish, new ApiFutureCallback<String>(this) { // from class: io.aleph0.yap.messaging.gcp.PubsubRelayProcessorWorker.1
                            final /* synthetic */ PubsubRelayProcessorWorker this$0;

                            {
                                this.this$0 = this;
                            }

                            public void onSuccess(String str) {
                                try {
                                    this.this$0.acknowledgedMetric.incrementAndGet();
                                    this.this$0.phaser.arriveAndDeregister();
                                    sink.put(obj);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    PubsubRelayProcessorWorker.LOGGER.atWarn().addKeyValue("message", e.getMessage()).setCause(e).log("Interrupted while trying to put published message. Failing task...");
                                    atomicReference.compareAndSet(null, e);
                                } catch (Throwable th) {
                                    PubsubRelayProcessorWorker.LOGGER.atError().addKeyValue("message", th.getMessage()).setCause(th).log("Failed to put published message. Failing task...");
                                    atomicReference.compareAndSet(null, th);
                                }
                            }

                            public void onFailure(Throwable th) {
                                PubsubRelayProcessorWorker.LOGGER.atError().addKeyValue("message", th.getMessage()).setCause(th).log("Message failed to publish. Failing task...");
                                atomicReference.compareAndSet(null, th);
                                this.this$0.acknowledgedMetric.incrementAndGet();
                                this.this$0.phaser.arriveAndDeregister();
                            }
                        }, MoreExecutors.directExecutor());
                    }
                    take = source.take();
                }
                throwIfPresent(atomicReference);
                this.phaser.arriveAndAwaitAdvance();
                newPublisher.shutdown();
            } catch (Throwable th) {
                this.phaser.arriveAndAwaitAdvance();
                newPublisher.shutdown();
                throw th;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.atWarn().setCause(e).log("Pubsub relay interrupted. Propagating...");
            throw e;
        } catch (RuntimeException e2) {
            LOGGER.atError().setCause(e2).log("Pubsub relay failed. Failing task...");
            throw e2;
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            LOGGER.atError().setCause(cause).log("Pubsub relay failed. Failing task...");
            if (cause instanceof Error) {
                throw ((Error) cause);
            }
            if (cause instanceof IOException) {
                throw ((IOException) cause);
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            if (!(cause instanceof Exception)) {
                throw new AssertionError("Unexpected error", e3);
            }
            throw new IOException("Pubsub relay failed", (Exception) cause);
        }
    }

    private void throwIfPresent(AtomicReference<Throwable> atomicReference) throws InterruptedException, ExecutionException {
        Throwable th = atomicReference.get();
        if (th != null) {
            if (th instanceof Error) {
                throw ((Error) th);
            }
            if (th instanceof InterruptedException) {
                throw ((InterruptedException) th);
            }
            if (!(th instanceof Exception)) {
                throw new AssertionError("Unexpected error", th);
            }
            throw new ExecutionException((Exception) th);
        }
    }

    /* renamed from: checkMetrics, reason: merged with bridge method [inline-methods] */
    public RelayMetrics m7checkMetrics() {
        return new RelayMetrics(this.submittedMetric.get(), this.acknowledgedMetric.get(), this.phaser.getUnarrivedParties() - 1);
    }

    /* renamed from: flushMetrics, reason: merged with bridge method [inline-methods] */
    public RelayMetrics m6flushMetrics() {
        RelayMetrics m7checkMetrics = m7checkMetrics();
        this.submittedMetric.set(0L);
        this.acknowledgedMetric.set(0L);
        return m7checkMetrics;
    }
}
