package io.aleph0.yap.messaging.gcp;

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import io.aleph0.yap.core.Sink;
import io.aleph0.yap.messaging.core.Acknowledgeable;
import io.aleph0.yap.messaging.core.FirehoseMetrics;
import io.aleph0.yap.messaging.core.FirehoseProducerWorker;
import io.aleph0.yap.messaging.core.Message;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubFirehoseProducerWorker.class */
public class PubsubFirehoseProducerWorker implements FirehoseProducerWorker<Message<String>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PubsubFirehoseProducerWorker.class);

    /**
     * Messages that have been received from the subscriber but for which no ack or nack has been
     * started yet. Backed by a {@link ConcurrentHashMap} because it is touched both by the
     * subscriber's callback threads and by the thread running {@code produce}.
     */
    private final Set<PubsubFirehoseMessage> outstanding = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Number of messages successfully handed to the sink since the last metrics flush. */
    private final AtomicLong receivedMetric = new AtomicLong(0);

    /** Creates the subscriber used by {@code produce}; never {@code null}. */
    private final SubscriberFactory subscriberFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubFirehoseProducerWorker$PubsubFirehoseMessage.class */
    public static class PubsubFirehoseMessage implements Message<String> {
        private static final int NONE = 0;
        private static final int ACKING = 10;
        private static final int ACKED = 11;
        private static final int NACKING = 20;
        private static final int NACKED = 21;
        private final PubsubMessage message;
        private final BiConsumer<PubsubFirehoseMessage, Acknowledgeable.AcknowledgementListener> acker;
        private final BiConsumer<PubsubFirehoseMessage, Acknowledgeable.AcknowledgementListener> nacker;
        static final /* synthetic */ boolean $assertionsDisabled;
        private final AtomicInteger state = new AtomicInteger(NONE);
        private volatile Throwable failureCause = null;

        public PubsubFirehoseMessage(PubsubMessage pubsubMessage, BiConsumer<PubsubFirehoseMessage, Acknowledgeable.AcknowledgementListener> biConsumer, BiConsumer<PubsubFirehoseMessage, Acknowledgeable.AcknowledgementListener> biConsumer2) {
            this.message = (PubsubMessage) Objects.requireNonNull(pubsubMessage, "message");
            this.acker = (BiConsumer) Objects.requireNonNull(biConsumer, "acker");
            this.nacker = (BiConsumer) Objects.requireNonNull(biConsumer2, "nacker");
        }

        public String id() {
            return this.message.getMessageId();
        }

        public Map<String, String> attributes() {
            return this.message.getAttributesMap();
        }

        /* renamed from: body, reason: merged with bridge method [inline-methods] */
        public String m4body() {
            return this.message.getData().toStringUtf8();
        }

        public void ack(final Acknowledgeable.AcknowledgementListener acknowledgementListener) {
            int compareAndExchange = this.state.compareAndExchange(NONE, ACKING);
            switch (compareAndExchange) {
                case NONE /* 0 */:
                    this.acker.accept(this, new Acknowledgeable.AcknowledgementListener(this) { // from class: io.aleph0.yap.messaging.gcp.PubsubFirehoseProducerWorker.PubsubFirehoseMessage.1
                        final /* synthetic */ PubsubFirehoseMessage this$0;

                        {
                            this.this$0 = this;
                        }

                        public void onSuccess() {
                            try {
                                acknowledgementListener.onSuccess();
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.ACKED);
                                    this.this$0.notifyAll();
                                }
                            } catch (Throwable th) {
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.ACKED);
                                    this.this$0.notifyAll();
                                    throw th;
                                }
                            }
                        }

                        public void onFailure(Throwable th) {
                            try {
                                acknowledgementListener.onFailure(th);
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.ACKED);
                                    this.this$0.failureCause = th;
                                    this.this$0.notifyAll();
                                }
                            } catch (Throwable th2) {
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.ACKED);
                                    this.this$0.failureCause = th;
                                    this.this$0.notifyAll();
                                    throw th2;
                                }
                            }
                        }
                    });
                    return;
                case ACKING /* 10 */:
                    synchronized (this) {
                        int i = this.state.get();
                        while (i == ACKING) {
                            try {
                                wait();
                                i = this.state.get();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                acknowledgementListener.onFailure(e);
                                return;
                            }
                        }
                        if (!$assertionsDisabled && i != ACKED) {
                            throw new AssertionError();
                        }
                    }
                    break;
                case ACKED /* 11 */:
                    break;
                case NACKING /* 20 */:
                case NACKED /* 21 */:
                    acknowledgementListener.onFailure(new IllegalStateException("message already nacked"));
                    return;
                default:
                    throw new AssertionError("unexpected state state: " + compareAndExchange);
            }
            if (this.failureCause == null) {
                acknowledgementListener.onSuccess();
            } else {
                acknowledgementListener.onFailure(this.failureCause);
            }
        }

        public void nack(final Acknowledgeable.AcknowledgementListener acknowledgementListener) {
            int compareAndExchange = this.state.compareAndExchange(NONE, NACKING);
            switch (compareAndExchange) {
                case NONE /* 0 */:
                    this.nacker.accept(this, new Acknowledgeable.AcknowledgementListener(this) { // from class: io.aleph0.yap.messaging.gcp.PubsubFirehoseProducerWorker.PubsubFirehoseMessage.2
                        final /* synthetic */ PubsubFirehoseMessage this$0;

                        {
                            this.this$0 = this;
                        }

                        public void onSuccess() {
                            try {
                                acknowledgementListener.onSuccess();
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.NACKED);
                                    this.this$0.notifyAll();
                                }
                            } catch (Throwable th) {
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.NACKED);
                                    this.this$0.notifyAll();
                                    throw th;
                                }
                            }
                        }

                        public void onFailure(Throwable th) {
                            try {
                                acknowledgementListener.onFailure(th);
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.NACKED);
                                    this.this$0.failureCause = th;
                                    this.this$0.notifyAll();
                                }
                            } catch (Throwable th2) {
                                synchronized (this.this$0) {
                                    this.this$0.state.set(PubsubFirehoseMessage.NACKED);
                                    this.this$0.failureCause = th;
                                    this.this$0.notifyAll();
                                    throw th2;
                                }
                            }
                        }
                    });
                    return;
                case ACKING /* 10 */:
                case ACKED /* 11 */:
                    acknowledgementListener.onFailure(new IllegalStateException("message already acked"));
                    return;
                case NACKING /* 20 */:
                    synchronized (this) {
                        int i = this.state.get();
                        while (i == NACKING) {
                            try {
                                wait();
                                i = this.state.get();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                acknowledgementListener.onFailure(e);
                                return;
                            }
                        }
                        if (!$assertionsDisabled && i != NACKED) {
                            throw new AssertionError();
                        }
                    }
                    break;
                case NACKED /* 21 */:
                    break;
                default:
                    throw new AssertionError("unexpected state state: " + compareAndExchange);
            }
            if (this.failureCause == null) {
                acknowledgementListener.onSuccess();
            } else {
                acknowledgementListener.onFailure(this.failureCause);
            }
        }

        public int hashCode() {
            return System.identityHashCode(this);
        }

        public boolean equals(Object obj) {
            return this == obj;
        }

        static {
            $assertionsDisabled = !PubsubFirehoseProducerWorker.class.desiredAssertionStatus();
        }
    }

    /**
     * Factory for the {@link Subscriber} that {@code produce} starts and stops.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/aleph0/yap/messaging/gcp/PubsubFirehoseProducerWorker$SubscriberFactory.class */
    public interface SubscriberFactory {
        /**
         * Creates a new subscriber for the given receiver.
         *
         * @param messageReceiverWithAckResponse the receiver supplied by the worker; the returned
         *        subscriber is presumably expected to deliver its messages to it (the wiring is up
         *        to the implementation)
         * @return the subscriber to run
         */
        Subscriber newSubscriber(MessageReceiverWithAckResponse messageReceiverWithAckResponse);
    }

    /**
     * Creates a worker that obtains its subscriber from the given factory.
     *
     * @param subscriberFactory source of the subscriber; must not be {@code null}
     * @throws NullPointerException if {@code subscriberFactory} is {@code null}
     */
    public PubsubFirehoseProducerWorker(SubscriberFactory subscriberFactory) {
        Objects.requireNonNull(subscriberFactory, "subscriberFactory");
        this.subscriberFactory = subscriberFactory;
    }

    /**
     * Runs a subscriber and forwards every received message to {@code sink} until something fails.
     *
     * <p>This method never returns normally. It blocks until the thread is interrupted, a message
     * cannot be put into the sink, or the subscriber itself fails. In every case the subscriber is
     * stopped, messages still outstanding are nacked, and the triggering failure is rethrown.
     *
     * @param sink destination for received messages
     * @throws IOException if the subscriber or the sink failed with a checked exception
     * @throws InterruptedException if the calling thread, or a thread delivering a message to the
     *         sink, was interrupted
     */
    public void produce(final Sink<Message<String>> sink) throws IOException, InterruptedException {
        try {
            // Capacity 1: only the first failure matters, later offers are silently dropped.
            final ArrayBlockingQueue<Throwable> failures = new ArrayBlockingQueue<>(1);

            final Subscriber subscriber = this.subscriberFactory.newSubscriber(new MessageReceiverWithAckResponse() {
                @Override
                public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumerWithResponse ackReplyConsumerWithResponse) {
                    final PubsubFirehoseMessage received = new PubsubFirehoseMessage(pubsubMessage, (acked, listener) -> {
                        PubsubFirehoseProducerWorker.this.outstanding.remove(acked);
                        ApiFutures.addCallback(ackReplyConsumerWithResponse.ack(), replyCallback("ack", listener), MoreExecutors.directExecutor());
                    }, (nacked, listener) -> {
                        PubsubFirehoseProducerWorker.this.outstanding.remove(nacked);
                        ApiFutures.addCallback(ackReplyConsumerWithResponse.nack(), replyCallback("nack", listener), MoreExecutors.directExecutor());
                    });
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.atTrace().addKeyValue("identity", Integer.valueOf(received.hashCode())).log("Received message from PubSub");
                    }
                    // Track the message before handing it over so a stop can nack it.
                    PubsubFirehoseProducerWorker.this.outstanding.add(received);
                    try {
                        sink.put(received);
                        PubsubFirehoseProducerWorker.this.receivedMetric.incrementAndGet();
                    } catch (InterruptedException e) {
                        nackUndelivered(received);
                        Thread.currentThread().interrupt();
                        LOGGER.atWarn().setCause(e).log("Interrupted while trying to put message. Stopping...");
                        failures.offer(e);
                    } catch (Throwable th) {
                        nackUndelivered(received);
                        LOGGER.atError().setCause(th).log("Failed to put message. Stopping...");
                        failures.offer(th);
                    }
                }
            });

            subscriber.addListener(new ApiService.Listener() {
                @Override
                public void stopping(ApiService.State state) {
                    LOGGER.atDebug().addKeyValue("from", state).log("Subscriber stopping");
                }

                @Override
                public void terminated(ApiService.State state) {
                    LOGGER.atDebug().addKeyValue("from", state).log("Subscriber terminated");
                }

                @Override
                public void failed(ApiService.State state, Throwable th) {
                    LOGGER.atError().setCause(th).addKeyValue("from", state).log("Subscriber failed");
                    failures.offer(th);
                }
            }, MoreExecutors.directExecutor());

            subscriber.startAsync().awaitRunning();
            try {
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                LOGGER.atInfo().log("Subscriber connected");

                // Park here until a callback reports the first failure.
                final Throwable failure = failures.take();
                if (failure instanceof Error) {
                    throw (Error) failure;
                }
                if (failure instanceof InterruptedException) {
                    throw (InterruptedException) failure;
                }
                if (!(failure instanceof Exception)) {
                    throw new AssertionError("Unexpected error", failure);
                }
                throw new ExecutionException((Exception) failure);
            } finally {
                try {
                    LOGGER.atDebug().log("Stopping subscriber");
                    subscriber.stopAsync();
                    // Settle outstanding messages between stopAsync and awaitTerminated.
                    nackOutstandingMessages();
                    subscriber.awaitTerminated();
                } catch (IllegalStateException e) {
                    LOGGER.atWarn().setCause(e).log("Failed to stop subscriber because subscriber was already stopped. Ignoring...");
                }
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                LOGGER.atDebug().log("Subscriber stopped");
            }
        } catch (InterruptedException e) {
            LOGGER.atError().setCause(e).log("Pubsub firehose interrupted. Propagating...");
            Thread.currentThread().interrupt();
            // Rethrow the original so its stack trace is not lost.
            throw e;
        } catch (RuntimeException e) {
            LOGGER.atError().setCause(e).log("Pubsub firehose failed. Failing task...");
            throw e;
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            LOGGER.atError().setCause(cause).log("Pubsub firehose failed. Failing task...");
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (!(cause instanceof Exception)) {
                throw new AssertionError("Unexpected error", e);
            }
            throw new IOException("Pubsub firehose failed", (Exception) cause);
        }
    }

    /**
     * Adapts the future returned by an ack/nack reply to an acknowledgement listener: only
     * {@link AckResponse#SUCCESSFUL} counts as success, any other response is reported as an
     * {@link IOException}.
     */
    private static ApiFutureCallback<AckResponse> replyCallback(final String operation, final Acknowledgeable.AcknowledgementListener listener) {
        return new ApiFutureCallback<AckResponse>() {
            @Override
            public void onSuccess(AckResponse ackResponse) {
                if (ackResponse == AckResponse.SUCCESSFUL) {
                    listener.onSuccess();
                } else {
                    onFailure(new IOException(operation + " failed with response " + String.valueOf(ackResponse)));
                }
            }

            @Override
            public void onFailure(Throwable th) {
                listener.onFailure(th);
            }
        };
    }

    /**
     * Nacks a message that could not be put into the sink. A receiver has to arrange for every
     * message to be acked or nacked; merely forgetting the message would leave it unsettled.
     * Never throws for runtime failures, so the caller can still report the original failure.
     */
    private void nackUndelivered(final PubsubFirehoseMessage undelivered) {
        try {
            undelivered.nack(new Acknowledgeable.AcknowledgementListener() {
                @Override
                public void onSuccess() {
                    LOGGER.atTrace().addKeyValue("identity", Integer.valueOf(undelivered.hashCode())).log("Successfully nacked undelivered message");
                }

                @Override
                public void onFailure(Throwable th) {
                    LOGGER.atTrace().setCause(th).addKeyValue("identity", Integer.valueOf(undelivered.hashCode())).log("Failed to nack undelivered message");
                }
            });
        } catch (RuntimeException e) {
            LOGGER.atWarn().setCause(e).log("Failed to nack undelivered message");
        } finally {
            // Whatever happened above, the message must not stay in the outstanding set.
            this.outstanding.remove(undelivered);
        }
    }

    /**
     * Nacks every message still in the outstanding set and removes it from the set. The outer
     * loop repeats until the set is observed empty, so messages added while a pass is running are
     * picked up by the next pass. Nack outcomes are only logged.
     */
    private void nackOutstandingMessages() {
        while (!this.outstanding.isEmpty()) {
            LOGGER.atDebug().log("Nacking outstanding messages...");
            final Iterator<PubsubFirehoseMessage> pending = this.outstanding.iterator();
            while (pending.hasNext()) {
                final PubsubFirehoseMessage message = pending.next();
                message.nack(new Acknowledgeable.AcknowledgementListener() {
                    @Override
                    public void onSuccess() {
                        LOGGER.atTrace().addKeyValue("identity", Integer.valueOf(message.hashCode())).log("Successfully nacked message at stop time");
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        LOGGER.atTrace().setCause(th).addKeyValue("identity", Integer.valueOf(message.hashCode())).log("Failed to nack message at stop time");
                    }
                });
                // Remove explicitly as well: nack() only invokes the nacker (which removes the
                // message) when this call is the first ack/nack on the message.
                pending.remove();
            }
        }
    }

    /* renamed from: checkMetrics, reason: merged with bridge method [inline-methods] */
    /**
     * Returns a snapshot of the metrics without resetting them.
     *
     * @return metrics carrying the current received-message count
     */
    public FirehoseMetrics m2checkMetrics() {
        final long receivedSoFar = this.receivedMetric.get();
        return new FirehoseMetrics(receivedSoFar);
    }

    /* renamed from: flushMetrics, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the metrics accumulated since the previous flush and resets them to zero.
     *
     * <p>The counter is read and reset in one atomic step. Reading it and then calling
     * {@code set(0)} would silently drop any message counted by a subscriber thread between the
     * two calls.
     *
     * @return metrics carrying the received-message count at the moment of the reset
     */
    public FirehoseMetrics m1flushMetrics() {
        return new FirehoseMetrics(this.receivedMetric.getAndSet(0L));
    }
}
