package io.quarkus.grpc.stubs;

import io.grpc.stub.StreamObserver;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/* loaded from: input_file:io/quarkus/grpc/stubs/ManyToManyObserver.class */
/**
 * Adapter between a {@link Multi} of requests and a pair of {@link StreamObserver}s.
 *
 * <p>Items emitted by {@code source} are pushed into {@code processor} (the observer returned by the
 * function given to the constructor). Whatever is delivered to this object through its
 * {@link StreamObserver} methods is forwarded to the downstream {@link MultiSubscriber}.
 *
 * <p>Only one downstream subscriber is supported for the lifetime of an instance: the upstream
 * subscription slot is written once and then replaced by {@code Subscriptions.CANCELLED}, so a
 * second subscription could never be wired to {@code source}. Additional subscribers are rejected
 * with an {@link IllegalStateException} instead of being left without any signal.
 *
 * @param <I> type of the items read from {@code source} and handed to {@code processor}
 * @param <O> type of the items received through {@link #onNext(Object)} and emitted downstream
 */
public class ManyToManyObserver<I, O> extends AbstractMulti<O> implements StreamObserver<O> {
    private final StreamObserver<I> processor;
    private final Multi<I> source;
    private final UpstreamSubscriber subscriber = new UpstreamSubscriber();
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    // Set exactly once by the first call to subscribe(); null until then.
    private final AtomicReference<MultiSubscriber<? super O>> downstream = new AtomicReference<>();

    /**
     * Subscriber attached to {@code source}; it also acts as the {@link Flow.Subscription} handed
     * to the downstream subscriber.
     */
    class UpstreamSubscriber implements Flow.Subscriber<I>, Flow.Subscription {

        /**
         * Stores the upstream subscription, hands this object to downstream as its subscription and
         * requests an unbounded number of items. If a subscription was already stored (or the slot
         * was already marked cancelled) the incoming subscription is cancelled instead.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!ManyToManyObserver.this.upstream.compareAndSet(null, subscription)) {
                subscription.cancel();
                return;
            }
            ManyToManyObserver.this.downstream.get().onSubscribe(this);
            subscription.request(Long.MAX_VALUE);
        }

        /** Forwards a source item to the processor. */
        @Override
        public void onNext(I i) {
            ManyToManyObserver.this.processor.onNext(i);
        }

        /** Forwards a source failure to the processor. */
        @Override
        public void onError(Throwable th) {
            ManyToManyObserver.this.processor.onError(th);
        }

        /** Forwards source completion to the processor. */
        @Override
        public void onComplete() {
            ManyToManyObserver.this.processor.onCompleted();
        }

        /**
         * Intentionally a no-op: downstream demand is ignored because an unbounded request was
         * already issued upstream in {@link #onSubscribe(Flow.Subscription)}.
         */
        @Override
        public void request(long j) {
        }

        /** Downstream cancellation cancels the subscription to {@code source}. */
        @Override
        public void cancel() {
            ManyToManyObserver.this.cancelUpstream();
        }
    }

    /**
     * Creates the adapter.
     *
     * @param multi    the source of items pushed into the processor once a subscriber arrives
     * @param function receives this observer (as the sink for output items) and returns the
     *                 observer that will receive the items of {@code multi}
     */
    public ManyToManyObserver(Multi<I> multi, Function<StreamObserver<O>, StreamObserver<I>> function) {
        // Assign the source before `this` is handed to foreign code so that the escaped
        // instance is as fully initialised as possible.
        this.source = multi;
        this.processor = function.apply(this);
    }

    /**
     * Registers the downstream subscriber and subscribes to {@code source}.
     *
     * <p>Only the first subscriber is accepted. Any later subscriber receives a cancelled
     * subscription followed by an {@link IllegalStateException}; the first subscriber is left
     * untouched.
     */
    @Override
    public void subscribe(MultiSubscriber<? super O> multiSubscriber) {
        if (!this.downstream.compareAndSet(null, multiSubscriber)) {
            // Previously the field was overwritten here: the newcomer never got onSubscribe (the
            // upstream slot was already taken) and the original subscriber stopped receiving items.
            multiSubscriber.onSubscribe(Subscriptions.CANCELLED);
            multiSubscriber.onFailure(
                    new IllegalStateException("ManyToManyObserver supports a single subscriber only"));
            return;
        }
        this.source.subscribe(this.subscriber);
    }

    /** Emits an item downstream. */
    @Override
    public void onNext(O o) {
        this.downstream.get().onItem(o);
    }

    /** Cancels the subscription to {@code source}, then propagates the failure downstream. */
    @Override
    public void onError(Throwable th) {
        cancelUpstream();
        this.downstream.get().onFailure(th);
    }

    /** Cancels the subscription to {@code source}, then signals completion downstream. */
    @Override
    public void onCompleted() {
        cancelUpstream();
        this.downstream.get().onComplete();
    }

    /**
     * Marks the upstream slot as cancelled and cancels whatever subscription was stored there.
     * Nothing is cancelled when no subscription had been stored yet.
     */
    private void cancelUpstream() {
        Flow.Subscription previous = this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (previous != null) {
            previous.cancel();
        }
    }
}
