package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.EndpointChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.BooleanSubscription;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/FixedConnectionSet.class */
public class FixedConnectionSet<T> {
    private static final Logger logger = LoggerFactory.getLogger(FixedConnectionSet.class);
    private EndpointInjector endpointInjector;
    private Func1<Endpoint, Observable<T>> toObservableFunc;
    private MergedObservable<T> mergedObservable;

    public FixedConnectionSet(int i, EndpointInjector endpointInjector, Func1<Endpoint, Observable<T>> func1) {
        this.endpointInjector = endpointInjector;
        this.toObservableFunc = func1;
        this.mergedObservable = MergedObservable.create(i);
    }

    public static <K, V> FixedConnectionSet<GroupedObservable<K, V>> create(int i, final ConnectToGroupedObservable.Builder<K, V> builder, EndpointInjector endpointInjector) {
        return new FixedConnectionSet<>(i, endpointInjector, new Func1<Endpoint, Observable<GroupedObservable<K, V>>>() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.1
            public Observable<GroupedObservable<K, V>> call(Endpoint endpoint) {
                ConnectToGroupedObservable.Builder.this.host(endpoint.getHost()).port(endpoint.getPort());
                return RemoteObservable.connect(ConnectToGroupedObservable.Builder.this.build()).getObservable();
            }
        });
    }

    public static <T> FixedConnectionSet<T> create(int i, final ConnectToObservable.Builder<T> builder, EndpointInjector endpointInjector) {
        return new FixedConnectionSet<>(i, endpointInjector, new Func1<Endpoint, Observable<T>>() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.2
            public Observable<T> call(Endpoint endpoint) {
                ConnectToObservable.Builder.this.host(endpoint.getHost()).port(endpoint.getPort());
                return RemoteObservable.connect(ConnectToObservable.Builder.this.build()).getObservable();
            }
        });
    }

    public Observable<Observable<T>> getObservables() {
        return Observable.create(new Observable.OnSubscribe<Observable<T>>() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.3
            public void call(final Subscriber<? super Observable<T>> subscriber) {
                final BooleanSubscription booleanSubscription = new BooleanSubscription();
                subscriber.add(new Subscription() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.3.1
                    public void unsubscribe() {
                        FixedConnectionSet.this.mergedObservable.clear();
                        booleanSubscription.unsubscribe();
                    }

                    public boolean isUnsubscribed() {
                        return booleanSubscription.isUnsubscribed();
                    }
                });
                subscriber.add(FixedConnectionSet.this.mergedObservable.get().subscribe(new Observer<Observable<T>>() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.3.2
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    public void onError(Throwable th) {
                        subscriber.onError(th);
                    }

                    public void onNext(Observable<T> observable) {
                        subscriber.onNext(observable);
                    }
                }));
                subscriber.add(FixedConnectionSet.this.endpointInjector.deltas().subscribe(new Action1<EndpointChange>() { // from class: io.reactivex.mantis.remote.observable.FixedConnectionSet.3.3
                    public void call(EndpointChange endpointChange) {
                        String uniqueHost = Endpoint.uniqueHost(endpointChange.getEndpoint().getHost(), endpointChange.getEndpoint().getPort(), endpointChange.getEndpoint().getSlotId());
                        if (EndpointChange.Type.add == endpointChange.getType()) {
                            FixedConnectionSet.logger.info("Adding new connection to host: " + endpointChange.getEndpoint().getHost() + " at port: " + endpointChange.getEndpoint().getPort() + " with id: " + uniqueHost);
                            FixedConnectionSet.this.mergedObservable.mergeIn(uniqueHost, (Observable) FixedConnectionSet.this.toObservableFunc.call(endpointChange.getEndpoint()), endpointChange.getEndpoint().getErrorCallback(), endpointChange.getEndpoint().getCompletedCallback());
                        } else if (EndpointChange.Type.complete == endpointChange.getType()) {
                            FixedConnectionSet.logger.info("Forcing connection to complete host: " + endpointChange.getEndpoint().getHost() + " at port: " + endpointChange.getEndpoint().getPort() + " with id: " + uniqueHost);
                            FixedConnectionSet.this.mergedObservable.forceComplete(uniqueHost);
                        }
                    }
                }));
            }
        });
    }
}
