package org.curioswitch.gcloud.pubsub;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import com.google.auto.factory.AutoFactory;
import com.google.auto.factory.Provided;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import com.linecorp.armeria.client.ClientOptionValue;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.grpc.GrpcClientOptions;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.curioswitch.common.helpers.immutables.CurioStyle;
import org.curioswitch.gcloud.pubsub.ImmutableSubscriberOptions;
import org.immutables.value.Value;

@AutoFactory(implementing = {Factory.class})
/* loaded from: input_file:org/curioswitch/gcloud/pubsub/Subscriber.class */
public class Subscriber implements Closeable, StreamObserver<StreamingPullResponse> {
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
    private final SubscriberGrpc.SubscriberStub stub;
    private final SubscriberOptions options;
    private final Counter receivedMessages;
    private final Counter ackedMessages;
    private final Counter nackedMessages;
    private final Timer messageProcessingTime;
    private final Tracer tracer;
    private final TraceContext.Extractor<PubsubMessage> traceExtractor;
    private Duration streamReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;

    @Nullable
    private volatile StreamObserver<StreamingPullRequest> requestObserver;

    @Nullable
    private volatile RequestContext ctx;
    private volatile boolean closed;

    /* loaded from: input_file:org/curioswitch/gcloud/pubsub/Subscriber$Factory.class */
    public interface Factory {
        Subscriber create(SubscriberOptions subscriberOptions);
    }

    @CurioStyle
    @Value.Immutable
    /* loaded from: input_file:org/curioswitch/gcloud/pubsub/Subscriber$SubscriberOptions.class */
    public interface SubscriberOptions {

        /* loaded from: input_file:org/curioswitch/gcloud/pubsub/Subscriber$SubscriberOptions$Builder.class */
        public static class Builder extends ImmutableSubscriberOptions.Builder {
            @Override // org.curioswitch.gcloud.pubsub.ImmutableSubscriberOptions.Builder
            public /* bridge */ /* synthetic */ SubscriberOptions build() {
                return super.build();
            }
        }

        String getSubscription();

        MessageReceiver getMessageReceiver();

        default boolean getUnsafeWrapBuffers() {
            return false;
        }
    }

    public static SubscriberOptions.Builder newOptions(String str, MessageReceiver messageReceiver) {
        return new SubscriberOptions.Builder().subscription(str).messageReceiver(messageReceiver);
    }

    public static SubscriberOptions.Builder newOptions(ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
        return new SubscriberOptions.Builder().subscription(projectSubscriptionName.toString()).messageReceiver(messageReceiver);
    }

    public Subscriber(@Provided SubscriberGrpc.SubscriberStub subscriberStub, @Provided Optional<MeterRegistry> optional, @Provided Tracing tracing, SubscriberOptions subscriberOptions) {
        this.stub = subscriberOptions.getUnsafeWrapBuffers() ? (SubscriberGrpc.SubscriberStub) Clients.newDerivedClient(subscriberStub, new ClientOptionValue[]{(ClientOptionValue) GrpcClientOptions.UNSAFE_WRAP_RESPONSE_BUFFERS.newValue(true)}) : subscriberStub;
        this.options = subscriberOptions;
        MeterRegistry orElse = optional.orElse(NoopMeterRegistry.get());
        ImmutableList of = ImmutableList.of(Tag.of("subscription", subscriberOptions.getSubscription()));
        this.receivedMessages = orElse.counter("subscriber-received-messages", of);
        this.ackedMessages = orElse.counter("subscriber-acked-messages", of);
        this.nackedMessages = orElse.counter("subscriber-nacked-messages", of);
        orElse.gauge("reconnect-backoff-millis", of, this.streamReconnectBackoff, (v0) -> {
            return v0.toMillis();
        });
        this.messageProcessingTime = MoreMeters.newTimer(orElse, "subscriber-message-processing-time", of);
        this.tracer = tracing.tracer();
        this.traceExtractor = tracing.propagation().extractor((pubsubMessage, str) -> {
            return pubsubMessage.getAttributesOrDefault(str, (String) null);
        });
    }

    public void start() {
        open();
    }

    public void onNext(final StreamingPullResponse streamingPullResponse) {
        if (this.ctx == null) {
            this.ctx = RequestContext.current();
        }
        this.streamReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
        this.receivedMessages.increment(streamingPullResponse.getReceivedMessagesCount());
        final AtomicInteger atomicInteger = new AtomicInteger(streamingPullResponse.getReceivedMessagesCount());
        for (final ReceivedMessage receivedMessage : streamingPullResponse.getReceivedMessagesList()) {
            TraceContextOrSamplingFlags extract = this.traceExtractor.extract(receivedMessage.getMessage());
            (extract.context() != null ? this.tracer.joinSpan(extract.context()) : this.tracer.newTrace()).kind(Span.Kind.SERVER).name("google.pubsub.v1.Publisher.Publish").tag("subscription", this.options.getSubscription()).start(Timestamps.toMicros(receivedMessage.getMessage().getPublishTime())).finish();
            final StreamObserver<StreamingPullRequest> streamObserver = this.requestObserver;
            final long nanoTime = System.nanoTime();
            this.options.getMessageReceiver().receiveMessage(receivedMessage.getMessage(), new AckReplyConsumer() { // from class: org.curioswitch.gcloud.pubsub.Subscriber.1
                public void ack() {
                    releaseAndRecord();
                    Subscriber.this.ackedMessages.increment();
                    Preconditions.checkNotNull(streamObserver, "onNext called before start()");
                    streamObserver.onNext(StreamingPullRequest.newBuilder().addAckIds(receivedMessage.getAckId()).build());
                }

                public void nack() {
                    releaseAndRecord();
                    Subscriber.this.nackedMessages.increment();
                    Preconditions.checkNotNull(streamObserver, "onNext called before start()");
                    streamObserver.onNext(StreamingPullRequest.newBuilder().addModifyDeadlineAckIds(receivedMessage.getAckId()).addModifyDeadlineSeconds(0).build());
                }

                private void releaseAndRecord() {
                    if (Subscriber.this.options.getUnsafeWrapBuffers() && atomicInteger.decrementAndGet() == 0) {
                        GrpcUnsafeBufferUtil.releaseBuffer(streamingPullResponse, Subscriber.this.ctx);
                    }
                    Subscriber.this.messageProcessingTime.record(Duration.ofNanos(System.nanoTime() - nanoTime));
                }
            });
        }
    }

    public void onError(Throwable th) {
        if (this.closed || !StatusUtil.isRetryable(th)) {
            return;
        }
        Duration duration = this.streamReconnectBackoff;
        this.streamReconnectBackoff = this.streamReconnectBackoff.multipliedBy(2L);
        if (this.streamReconnectBackoff.compareTo(MAX_CHANNEL_RECONNECT_BACKOFF) > 0) {
            this.streamReconnectBackoff = MAX_CHANNEL_RECONNECT_BACKOFF;
        }
        RequestContext.current().eventLoop().schedule(this::open, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    public void onCompleted() {
        if (this.closed) {
            return;
        }
        this.streamReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
        open();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Preconditions.checkNotNull(this.requestObserver, "close called before start.");
        this.closed = true;
        this.requestObserver.onCompleted();
    }

    private void open() {
        this.ctx = null;
        this.requestObserver = this.stub.streamingPull(this);
        this.requestObserver.onNext(StreamingPullRequest.newBuilder().setSubscription(this.options.getSubscription()).setStreamAckDeadlineSeconds(60).build());
    }
}
