package monix.reactive.internal.builders;

import monix.execution.Ack;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.Scheduler;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import scala.Function1;
import scala.concurrent.Future;
import scala.util.control.NonFatal$;

/* compiled from: UnsafeCreateObservable.scala */
/* loaded from: input_file:monix/reactive/internal/builders/UnsafeCreateObservable.class */
public final class UnsafeCreateObservable<A> extends Observable<A> {
    private final Function1<Subscriber<A>, Cancelable> f;

    /* compiled from: UnsafeCreateObservable.scala */
    /* loaded from: input_file:monix/reactive/internal/builders/UnsafeCreateObservable$SafeSubscriber.class */
    private static final class SafeSubscriber<A> implements Subscriber<A> {
        private final Subscriber<A> underlying;
        private final Scheduler scheduler;
        private boolean isDone = false;

        public <A> SafeSubscriber(Subscriber<A> subscriber) {
            this.underlying = subscriber;
            this.scheduler = subscriber.scheduler();
        }

        @Override // monix.reactive.observers.Subscriber
        public Scheduler scheduler() {
            return this.scheduler;
        }

        @Override // monix.reactive.Observer
        /* renamed from: onNext */
        public Future<Ack> mo23onNext(A a) {
            Future<Ack> future;
            if (this.isDone) {
                return Ack$Stop$.MODULE$;
            }
            try {
                future = this.underlying.mo23onNext(a);
            } catch (Throwable th) {
                if (!NonFatal$.MODULE$.apply(th)) {
                    throw th;
                }
                onError(th);
                future = Ack$Stop$.MODULE$;
            }
            Future<Ack> future2 = future;
            if (future2 == Ack$Continue$.MODULE$) {
                return Ack$Continue$.MODULE$;
            }
            if (future2 != Ack$Stop$.MODULE$) {
                return future2;
            }
            this.isDone = true;
            return Ack$Stop$.MODULE$;
        }

        @Override // monix.reactive.Observer
        public void onError(Throwable th) {
            if (this.isDone) {
                return;
            }
            this.isDone = true;
            this.underlying.onError(th);
        }

        @Override // monix.reactive.Observer
        public void onComplete() {
            if (this.isDone) {
                return;
            }
            this.isDone = true;
            this.underlying.onComplete();
        }
    }

    public <A> UnsafeCreateObservable(Function1<Subscriber<A>, Cancelable> function1) {
        this.f = function1;
    }

    @Override // monix.reactive.Observable
    public Cancelable unsafeSubscribeFn(Subscriber<A> subscriber) {
        try {
            return (Cancelable) this.f.apply(new SafeSubscriber(subscriber));
        } catch (Throwable th) {
            if (!NonFatal$.MODULE$.apply(th)) {
                throw th;
            }
            subscriber.scheduler().reportFailure(th);
            return Cancelable$.MODULE$.empty();
        }
    }
}
