package ratpack.exec.stream.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.stream.TransformablePublisher;
import ratpack.func.Function;

/* loaded from: input_file:ratpack/exec/stream/internal/FlatMapPublisher.class */
/**
 * Publisher that passes every item received from an input publisher through a
 * promise-returning function and emits the value of each resulting promise to
 * its own subscriber.
 *
 * @param <O> type of the items emitted to subscribers of this publisher
 * @param <I> type of the items received from the input publisher
 */
public class FlatMapPublisher<O, I> implements TransformablePublisher<O> {
    private final Publisher<I> input;
    private final Function<? super I, ? extends Promise<? extends O>> function;

    /**
     * Creates a publisher that maps the items of {@code publisher} with {@code function}.
     *
     * @param publisher the source of items to transform
     * @param function  produces, for each source item, the promise whose value is emitted
     */
    public FlatMapPublisher(Publisher<I> publisher, Function<? super I, ? extends Promise<? extends O>> function) {
        this.input = publisher;
        this.function = function;
    }

    /**
     * Subscribes to the input publisher on behalf of {@code subscriber}, relaying
     * mapped values, the first error, or completion to it.
     *
     * @param subscriber the downstream subscriber that receives the mapped items
     */
    public void subscribe(final Subscriber<? super O> subscriber) {
        input.subscribe(new Subscriber<I>() {
            // Flipped exactly once by the first terminal signal (error or completion);
            // once set, further items and terminal signals are not forwarded.
            private final AtomicBoolean done = new AtomicBoolean();
            private Subscription subscription;

            public void onSubscribe(final Subscription upstream) {
                // Hand the downstream subscriber a delegate rather than the upstream
                // subscription object itself; request/cancel are forwarded unchanged.
                subscription = new Subscription() {
                    public void request(long n) {
                        upstream.request(n);
                    }

                    public void cancel() {
                        upstream.cancel();
                    }
                };
                subscriber.onSubscribe(subscription);
            }

            public void onNext(I item) {
                if (done.get()) {
                    return;
                }
                try {
                    function.apply(item)
                        .onError(this::cancelAndFail)
                        .then(value -> {
                            // The stream may have terminated while the promise was pending.
                            if (!done.get()) {
                                subscriber.onNext(value);
                            }
                        });
                } catch (Throwable failure) {
                    // Covers the mapping function itself throwing (or returning null).
                    cancelAndFail(failure);
                }
            }

            public void onError(Throwable th) {
                // NOTE(review): the error is routed through a promise instead of being
                // delivered directly — presumably so it is ordered after promise callbacks
                // already queued by onNext; confirm against Ratpack's execution model.
                Promise.value(th).then(this::innerOnError);
            }

            /**
             * Delivers {@code th} downstream unless a terminal signal was already sent.
             */
            public void innerOnError(Throwable th) {
                if (done.compareAndSet(false, true)) {
                    subscriber.onError(th);
                }
            }

            public void onComplete() {
                // NOTE(review): completion is deferred through a no-op operation —
                // presumably for the same ordering reason as onError; confirm.
                Operation.noop().then(() -> {
                    if (done.compareAndSet(false, true)) {
                        subscriber.onComplete();
                    }
                });
            }

            // Stops the upstream first, then reports the failure downstream.
            private void cancelAndFail(Throwable failure) {
                subscription.cancel();
                innerOnError(failure);
            }
        });
    }
}
