package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/DynamicConnection.class */
public class DynamicConnection<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicConnection.class);
    private Observable<Endpoint> changeEndpointObservable;
    private PublishSubject<Observable<T>> subject = PublishSubject.create();
    private Func1<Endpoint, Observable<T>> toObservableFunc;

    DynamicConnection(Func1<Endpoint, Observable<T>> func1, Observable<Endpoint> observable) {
        this.changeEndpointObservable = observable;
        this.toObservableFunc = func1;
    }

    public static <K, V> DynamicConnection<GroupedObservable<K, V>> create(final ConnectToGroupedObservable.Builder<K, V> builder, Observable<Endpoint> observable) {
        return new DynamicConnection<>(new Func1<Endpoint, Observable<GroupedObservable<K, V>>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.1
            public Observable<GroupedObservable<K, V>> call(Endpoint endpoint) {
                ConnectToGroupedObservable.Builder builder2 = new ConnectToGroupedObservable.Builder(ConnectToGroupedObservable.Builder.this);
                builder2.host(endpoint.getHost()).port(endpoint.getPort()).slotId(endpoint.getSlotId());
                return RemoteObservable.connect(builder2.build()).getObservable();
            }
        }, observable);
    }

    public static <T> DynamicConnection<T> create(final ConnectToObservable.Builder<T> builder, Observable<Endpoint> observable) {
        return new DynamicConnection<>(new Func1<Endpoint, Observable<T>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.2
            public Observable<T> call(Endpoint endpoint) {
                ConnectToObservable.Builder builder2 = new ConnectToObservable.Builder(ConnectToObservable.Builder.this);
                builder2.host(endpoint.getHost()).port(endpoint.getPort()).slotId(endpoint.getSlotId());
                return RemoteObservable.connect(builder2.build()).getObservable();
            }
        }, observable);
    }

    public void close() {
        this.subject.onCompleted();
    }

    public Observable<T> observable() {
        return Observable.create(new Observable.OnSubscribe<T>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.3
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.add(DynamicConnection.this.subject.flatMap(new Func1<Observable<T>, Observable<T>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.3.2
                    public Observable<T> call(Observable<T> observable) {
                        return observable;
                    }
                }).subscribe(new Observer<T>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.3.1
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    public void onError(Throwable th) {
                        subscriber.onError(th);
                    }

                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                }));
                subscriber.add(DynamicConnection.this.changeEndpointObservable.subscribe(new Action1<Endpoint>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnection.3.3
                    public void call(Endpoint endpoint) {
                        DynamicConnection.logger.debug("New endpoint: " + endpoint);
                        DynamicConnection.this.subject.onNext(DynamicConnection.this.toObservableFunc.call(endpoint));
                    }
                }));
            }
        });
    }
}
