package io.reactivesocket.reactivestreams.extensions.internal.publishers;

import io.reactivesocket.reactivestreams.extensions.Px;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactivesocket/reactivestreams/extensions/internal/publishers/CachingPublisher.class */
public class CachingPublisher<T> implements Px<T> {
    private T cached;
    private Throwable error;
    private Publisher<T> source;
    private boolean subscribed;
    private CopyOnWriteArrayList<Subscriber<? super T>> subscribers = new CopyOnWriteArrayList<>();

    public CachingPublisher(Publisher<T> publisher) {
        this.source = publisher;
    }

    public void subscribe(final Subscriber<? super T> subscriber) {
        synchronized (this) {
            if (this.cached == null && this.error == null) {
                subscriber.onSubscribe(new Subscription() { // from class: io.reactivesocket.reactivestreams.extensions.internal.publishers.CachingPublisher.2
                    boolean cancelled;

                    public synchronized void request(long j) {
                        if (j <= 1 || this.cancelled) {
                            return;
                        }
                        if (CachingPublisher.this.cached == null) {
                            subscriber.onError(CachingPublisher.this.error);
                        } else {
                            subscriber.onNext(CachingPublisher.this.cached);
                            subscriber.onComplete();
                        }
                    }

                    public synchronized void cancel() {
                        this.cancelled = true;
                    }
                });
            } else {
                this.subscribers.add(subscriber);
                if (!this.subscribed) {
                    this.subscribed = true;
                    this.source.subscribe(new Subscriber<T>() { // from class: io.reactivesocket.reactivestreams.extensions.internal.publishers.CachingPublisher.1
                        public void onSubscribe(Subscription subscription) {
                            subscription.request(1L);
                        }

                        public void onNext(T t) {
                            synchronized (CachingPublisher.this) {
                                CachingPublisher.this.cached = t;
                                complete();
                            }
                        }

                        public void onError(Throwable th) {
                            synchronized (CachingPublisher.this) {
                                CachingPublisher.this.error = th;
                                complete();
                            }
                        }

                        void complete() {
                            Iterator it = CachingPublisher.this.subscribers.iterator();
                            while (it.hasNext()) {
                                Subscriber subscriber2 = (Subscriber) it.next();
                                if (CachingPublisher.this.error != null) {
                                    subscriber2.onError(CachingPublisher.this.error);
                                } else {
                                    subscriber2.onNext(CachingPublisher.this.cached);
                                    subscriber2.onComplete();
                                }
                            }
                        }

                        public void onComplete() {
                            synchronized (CachingPublisher.this) {
                                if (CachingPublisher.this.error == null && CachingPublisher.this.cached == null) {
                                    subscriber.onComplete();
                                }
                            }
                        }
                    });
                }
            }
        }
    }
}
