package ratpack.exec.internal;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.stream.TransformablePublisher;
import ratpack.func.Action;
import ratpack.func.Block;
import ratpack.func.Exceptions;

/* loaded from: input_file:ratpack/exec/internal/ExecutionBoundPublisher.class */
public class ExecutionBoundPublisher<T> implements TransformablePublisher<T> {
    private final Publisher<T> publisher;
    private final Action<? super T> disposer;

    public ExecutionBoundPublisher(Publisher<T> publisher, Action<? super T> action) {
        this.publisher = publisher;
        this.disposer = action;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        DefaultExecution require = DefaultExecution.require();
        Objects.requireNonNull(subscriber);
        require.delimitStream(subscriber::onError, continuationStream -> {
            this.publisher.subscribe(new Subscriber<T>() { // from class: ratpack.exec.internal.ExecutionBoundPublisher.1
                private Subscription subscription;
                private final AtomicBoolean cancelled = new AtomicBoolean();
                private final AtomicBoolean pendingCancelSignal = new AtomicBoolean(true);

                /* JADX INFO: Access modifiers changed from: private */
                public boolean dispatch(Block block) {
                    if (this.cancelled.get()) {
                        return false;
                    }
                    if (!require.isBound()) {
                        return continuationStream.event(block);
                    }
                    try {
                        block.execute();
                        return true;
                    } catch (Exception e) {
                        throw Exceptions.uncheck(e);
                    }
                }

                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    Subscriber subscriber2 = subscriber;
                    DefaultExecution defaultExecution = require;
                    ContinuationStream continuationStream = continuationStream;
                    dispatch(() -> {
                        subscriber2.onSubscribe(new Subscription() { // from class: ratpack.exec.internal.ExecutionBoundPublisher.1.1
                            public void request(long j) {
                                AnonymousClass1 anonymousClass1 = AnonymousClass1.this;
                                Subscription subscription2 = subscription;
                                anonymousClass1.dispatch(() -> {
                                    subscription2.request(j);
                                });
                            }

                            public void cancel() {
                                if (AnonymousClass1.this.cancelled.compareAndSet(false, true)) {
                                    if (defaultExecution.isBound()) {
                                        subscription.cancel();
                                        continuationStream.complete(Block.noop());
                                    } else {
                                        AnonymousClass1.this.pendingCancelSignal.set(true);
                                        ContinuationStream continuationStream2 = continuationStream;
                                        Subscription subscription2 = subscription;
                                        continuationStream2.complete(() -> {
                                            if (AnonymousClass1.this.pendingCancelSignal.compareAndSet(true, false)) {
                                                subscription2.cancel();
                                            }
                                        });
                                    }
                                }
                            }
                        });
                    });
                }

                public void onNext(T t) {
                    Subscriber subscriber2 = subscriber;
                    if (dispatch(() -> {
                        if (this.cancelled.get()) {
                            dispose(t);
                        } else {
                            subscriber2.onNext(t);
                        }
                    })) {
                        return;
                    }
                    dispose(t);
                    if (this.cancelled.get() && require.isBound() && this.pendingCancelSignal.compareAndSet(true, false)) {
                        this.subscription.cancel();
                        continuationStream.complete(Block.noop());
                    }
                }

                private void dispose(T t) {
                    try {
                        ExecutionBoundPublisher.this.disposer.execute(t);
                    } catch (Exception e) {
                        DefaultExecution.LOGGER.warn("Exception raised disposing stream item will be ignored - ", e);
                    }
                }

                public void onComplete() {
                    ContinuationStream continuationStream = continuationStream;
                    Subscriber subscriber2 = subscriber;
                    continuationStream.complete(() -> {
                        if (this.cancelled.get()) {
                            return;
                        }
                        subscriber2.onComplete();
                    });
                }

                public void onError(Throwable th) {
                    if (this.cancelled.get()) {
                        return;
                    }
                    ContinuationStream continuationStream = continuationStream;
                    Subscriber subscriber2 = subscriber;
                    continuationStream.complete(() -> {
                        subscriber2.onError(th);
                    });
                }
            });
        });
    }
}
