package io.mantisrx.runtime.source.http.impl;

import com.mantisrx.common.utils.MantisMetricStringConstants;
import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.runtime.source.http.ClientResumePolicy;
import io.mantisrx.runtime.source.http.HttpClientFactory;
import io.mantisrx.runtime.source.http.HttpRequestFactory;
import io.mantisrx.runtime.source.http.HttpServerProvider;
import io.mantisrx.server.core.ServiceRegistry;
import io.netty.util.ReferenceCountUtil;
import io.reactivx.mantis.operators.DropOperator;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import mantis.io.reactivex.netty.client.RxClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl.class */
public class HttpSourceImpl<R, E, T> implements Source<T> {
    // Passed as the second argument when reading the "httpSource.buffer.size" property
    // (presumably the fallback when the property is unset — confirm against the properties service).
    private static final String DEFAULT_BUFFER_SIZE = "0";
    // Shared logger; also used by the nested ConnectionManager and UnsubscriptionLoggingOperator.
    private static Logger logger = LoggerFactory.getLogger(HttpSourceImpl.class);
    // Handed to every ServerClientContext created for a discovered server.
    private final HttpRequestFactory<R> requestFactory;
    // Supplies the stream of servers to connect to and the stream of servers to drop.
    private final HttpServerProvider serverProvider;
    // Creates one client per discovered server.
    private final HttpClientFactory<R, E> clientFactory;
    // Receives an HttpSourceEvent for every lifecycle transition this source reports.
    private final Observer<HttpSourceEvent> observer;
    // Converts each content item (together with its server context) into the emitted type T.
    private final Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor;
    // Consulted when a response stream errors or completes to decide how to resume.
    private final ClientResumePolicy<R, E> resumePolicy;
    // Re-publishes the provider's removed servers so several takeUntil(...) chains can observe them.
    private final PublishSubject<RxClient.ServerInfo> serversToRemove;
    // Gauges mirror the sizes of the ConnectionManager's three server sets.
    private final Gauge connectionGauge;
    private final Gauge retryListGauge;
    private final Gauge connectionAttemptedGauge;
    // Counters are incremented alongside the corresponding HttpSourceEvent.
    private final Counter connectionEstablishedCounter;
    private final Counter connectionUnsubscribedCounter;
    private final Counter sourceCompletedCounter;
    private final Counter subscriptionEndedCounter;
    private final Counter subscriptionEstablishedCounter;
    private final Counter subscriptionFailedCounter;
    private final Counter serverFoundCounter;
    private final Counter subscriptionCancelledCounter;
    // "dropped" counter of incomingDataMetrics; assigned in the constructor but not read in this class.
    private final Counter dropped;
    // Metrics group handed to the DropOperator applied to each server's content stream.
    private final Metrics incomingDataMetrics;
    // Tracks which servers are connected, pending a connection attempt, or awaiting retry.
    private final ConnectionManager<E> connectionManager = new ConnectionManager<>();
    // Parsed from the "httpSource.buffer.size" property; not read elsewhere in this class.
    private final int bufferSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.mantisrx.runtime.source.http.impl.HttpSourceImpl$5, reason: invalid class name */
    /* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl$5.class */
    public class AnonymousClass5 implements Func1<ServerContext<HttpClientResponse<E>>, Observable<T>> {
        AnonymousClass5() {
        }

        /**
         * Turns an established server response into the stream of post-processed items for that server.
         *
         * <p>Before the content stream is built, the subscription is reported as established, the server
         * is recorded as connected and the connection gauge is refreshed. Each content item is retained
         * through Netty's {@code ReferenceCountUtil} and passed to the post-processor; the result then
         * flows through a {@code DropOperator}. When the downstream subscriber is unsubscribed a
         * SUBSCRIPTION_ENDED event is emitted and counted.
         *
         * @param serverContext the server together with its HTTP response
         * @return the post-processed content stream of that server
         */
        public Observable<T> call(final ServerContext<HttpClientResponse<E>> serverContext) {
            final HttpSourceImpl<R, E, T> source = HttpSourceImpl.this;
            final HttpClientResponse<E> response = serverContext.getValue();
            final RxClient.ServerInfo serverInfo = serverContext.getServer();

            // Bookkeeping happens eagerly, before anybody subscribes to the content stream.
            HttpSourceEvent.EventType.SUBSCRIPTION_ESTABLISHED.newEvent(source.observer, serverInfo);
            source.subscriptionEstablishedCounter.increment();
            source.connectionManager.serverConnected(serverInfo, serverContext.getValue());
            source.connectionGauge.set(source.getConnectedServers().size());

            // Retain each item before post-processing (presumably so reference-counted payloads
            // stay valid for the downstream consumer — confirm who releases them).
            final Func1<E, T> retainAndConvert = item -> {
                ReferenceCountUtil.retain(item);
                return source.postProcessor.call(serverContext, item);
            };

            // Registers an unsubscribe hook on the downstream subscriber and passes it through untouched.
            final Observable.Operator<T, T> reportSubscriptionEnd = subscriber -> {
                subscriber.add(Subscriptions.create(() -> {
                    HttpSourceEvent.EventType.SUBSCRIPTION_ENDED.newEvent(source.observer, serverContext.getServer());
                    source.subscriptionEndedCounter.increment();
                }));
                return subscriber;
            };

            return source.streamResponseContent(serverInfo, response)
                    .map(retainAndConvert)
                    .lift(new DropOperator(source.incomingDataMetrics))
                    .lift(reportSubscriptionEnd);
        }
    }

    /* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl$Builder.class */
    public static class Builder<R, E, T> {
        /** Provider that never announces a server to add or to remove. */
        public static final HttpServerProvider EMPTY_HTTP_SERVER_PROVIDER = new HttpServerProvider() {
            @Override // io.mantisrx.runtime.source.http.HttpServerProvider
            public Observable<RxClient.ServerInfo> getServersToAdd() {
                return Observable.empty();
            }

            @Override // io.mantisrx.runtime.source.http.HttpServerProvider
            public Observable<RxClient.ServerInfo> getServersToRemove() {
                return Observable.empty();
            }
        };

        private final HttpRequestFactory<R> requestFactory;
        private final HttpClientFactory<R, E> httpClientFactory;
        private final Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor;
        private HttpServerProvider serverProvider;
        private Observer<HttpSourceEvent> observer;
        private ClientResumePolicy<R, E> clientResumePolicy;

        /**
         * Creates a builder whose resume policy returns {@code null} from both callbacks.
         *
         * @param httpClientFactory  factory for per-server clients
         * @param httpRequestFactory factory for the requests sent to each server
         * @param func2              post-processor applied to every content item
         */
        public Builder(HttpClientFactory<R, E> httpClientFactory, HttpRequestFactory<R> httpRequestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> func2) {
            this(httpClientFactory, httpRequestFactory, func2, Builder.<R, E>nullReturningResumePolicy());
        }

        /**
         * Creates a builder with an explicit resume policy. The server provider starts out as
         * {@link #EMPTY_HTTP_SERVER_PROVIDER} and the activity observer as a fresh {@code PublishSubject}.
         *
         * @param httpClientFactory  factory for per-server clients
         * @param httpRequestFactory factory for the requests sent to each server
         * @param func2              post-processor applied to every content item
         * @param clientResumePolicy policy consulted when a response stream errors or completes
         */
        public Builder(HttpClientFactory<R, E> httpClientFactory, HttpRequestFactory<R> httpRequestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> func2, ClientResumePolicy<R, E> clientResumePolicy) {
            this.httpClientFactory = httpClientFactory;
            this.requestFactory = httpRequestFactory;
            this.postProcessor = func2;
            this.clientResumePolicy = clientResumePolicy;
            this.serverProvider = EMPTY_HTTP_SERVER_PROVIDER;
            this.observer = PublishSubject.create();
        }

        /** Builds the default policy: both callbacks return {@code null}. */
        private static <R, E> ClientResumePolicy<R, E> nullReturningResumePolicy() {
            return new ClientResumePolicy<R, E>() {
                @Override // io.mantisrx.runtime.source.http.ClientResumePolicy
                public Observable<HttpClientResponse<E>> onError(ServerClientContext<R, E> serverClientContext, int i, Throwable th) {
                    return null;
                }

                @Override // io.mantisrx.runtime.source.http.ClientResumePolicy
                public Observable<HttpClientResponse<E>> onCompleted(ServerClientContext<R, E> serverClientContext, int i) {
                    return null;
                }
            };
        }

        /** Replaces the server provider; returns this builder for chaining. */
        public Builder<R, E, T> withServerProvider(HttpServerProvider httpServerProvider) {
            this.serverProvider = httpServerProvider;
            return this;
        }

        /** Replaces the observer that receives {@link HttpSourceEvent}s; returns this builder for chaining. */
        public Builder<R, E, T> withActivityObserver(Observer<HttpSourceEvent> observer) {
            this.observer = observer;
            return this;
        }

        /** Replaces the resume policy; returns this builder for chaining. */
        public Builder<R, E, T> resumeWith(ClientResumePolicy<R, E> clientResumePolicy) {
            this.clientResumePolicy = clientResumePolicy;
            return this;
        }

        /** Creates the source from the currently configured collaborators. */
        public HttpSourceImpl<R, E, T> build() {
            return new HttpSourceImpl<>(
                    this.requestFactory,
                    this.serverProvider,
                    this.httpClientFactory,
                    this.observer,
                    this.postProcessor,
                    this.clientResumePolicy);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl$ConnectionManager.class */
    /**
     * Bookkeeping of the servers this source knows about, split into three sets: currently
     * connected, connection attempted but not yet established, and disconnected awaiting retry.
     *
     * <p>Each collection is individually a concurrent collection, but the multi-step transitions
     * below are not performed atomically as a whole.
     *
     * @param <E> content type of the HTTP responses kept for connected servers
     */
    public static class ConnectionManager<E> {
        private final ConcurrentMap<RxClient.ServerInfo, HttpClientResponse<E>> connectedServers = new ConcurrentHashMap<>();
        private final Set<RxClient.ServerInfo> retryServers = new CopyOnWriteArraySet<>();
        private final Set<RxClient.ServerInfo> connectionAttempted = new CopyOnWriteArraySet<>();

        private ConnectionManager() {
        }

        /**
         * Records a server as connected and clears it from the retry and attempted sets.
         *
         * @param serverInfo         the server that connected
         * @param httpClientResponse the response associated with the connection
         */
        public void serverConnected(RxClient.ServerInfo serverInfo, HttpClientResponse<E> httpClientResponse) {
            this.connectedServers.put(serverInfo, httpClientResponse);
            this.retryServers.remove(serverInfo);
            this.connectionAttempted.remove(serverInfo);
            HttpSourceImpl.logger.info("CM: Server connected: " + serverInfo + " count " + this.connectedServers.size());
        }

        /** Marks that a connection to the given server is being attempted. */
        public void serverConnectionAttempted(RxClient.ServerInfo serverInfo) {
            this.connectionAttempted.add(serverInfo);
        }

        /** @return {@code true} if the server is currently in the connected map */
        public boolean alreadyConnected(RxClient.ServerInfo serverInfo) {
            return this.connectedServers.containsKey(serverInfo);
        }

        /** @return {@code true} if a connection attempt to the server is currently recorded */
        public boolean connectionAlreadyAttempted(RxClient.ServerInfo serverInfo) {
            return this.connectionAttempted.contains(serverInfo);
        }

        /** Moves a server out of the connected/attempted sets and into the retry set. */
        public void serverDisconnected(RxClient.ServerInfo serverInfo) {
            this.connectedServers.remove(serverInfo);
            this.connectionAttempted.remove(serverInfo);
            this.retryServers.add(serverInfo);
            HttpSourceImpl.logger.info("CM: Server disconnected: " + serverInfo + " count " + this.connectedServers.size());
        }

        /** Forgets a server entirely: it is removed from all three sets. */
        public void serverRemoved(RxClient.ServerInfo serverInfo) {
            this.connectedServers.remove(serverInfo);
            this.connectionAttempted.remove(serverInfo);
            this.retryServers.remove(serverInfo);
            HttpSourceImpl.logger.info("CM: Server removed: " + serverInfo + " count " + this.connectedServers.size());
        }

        /** @return an unmodifiable view of the connected servers */
        public Set<RxClient.ServerInfo> getConnectedServers() {
            return Collections.unmodifiableSet(this.connectedServers.keySet());
        }

        /** @return an unmodifiable view of the servers awaiting retry */
        public Set<RxClient.ServerInfo> getRetryServers() {
            return Collections.unmodifiableSet(this.retryServers);
        }

        /** @return an unmodifiable view of the servers with a recorded connection attempt */
        public Set<RxClient.ServerInfo> getConnectionAttemptedServers() {
            return Collections.unmodifiableSet(this.connectionAttempted);
        }

        /** Clears all three sets. */
        public void reset() {
            this.connectedServers.clear();
            this.connectionAttempted.clear();
            this.retryServers.clear();
            HttpSourceImpl.logger.info("CM: reset");
        }
    }

    /* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl$HttpSourceEvent.class */
    /** Immutable notification pairing a server with the lifecycle transition that happened to it. */
    public static class HttpSourceEvent {
        private final RxClient.ServerInfo server;
        private final EventType eventType;

        /** Lifecycle transitions reported to the activity observer. */
        public enum EventType {
            SERVER_FOUND,
            CONNECTION_ATTEMPTED,
            CONNECTION_ESTABLISHED,
            CONNECTION_UNSUBSCRIBED,
            SOURCE_COMPLETED,
            SUBSCRIPTION_ESTABLISHED,
            SUBSCRIPTION_FAILED,
            SUBSCRIPTION_CANCELED,
            SUBSCRIPTION_ENDED;

            /**
             * Creates an event of this type for the given server and pushes it to the observer.
             *
             * @param observer   receiver of the event (via {@code onNext})
             * @param serverInfo server the event is about
             * @return the event that was published
             */
            public HttpSourceEvent newEvent(Observer<HttpSourceEvent> observer, RxClient.ServerInfo serverInfo) {
                final HttpSourceEvent event = new HttpSourceEvent(serverInfo, this);
                observer.onNext(event);
                return event;
            }
        }

        private HttpSourceEvent(RxClient.ServerInfo serverInfo, EventType eventType) {
            this.server = serverInfo;
            this.eventType = eventType;
        }

        /** @return the server this event refers to */
        public RxClient.ServerInfo getServer() {
            return this.server;
        }

        /** @return the kind of transition this event reports */
        public EventType getEventType() {
            return this.eventType;
        }

        /** Two events are equal when they have the same exact class, event type and server. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final HttpSourceEvent other = (HttpSourceEvent) obj;
            if (other.eventType != this.eventType) {
                return false;
            }
            if (this.server == null) {
                return other.server == null;
            }
            return this.server.equals(other.server);
        }

        /** Hash of the server (weighted by 31) plus the hash of the event type; null fields count as 0. */
        @Override
        public int hashCode() {
            final int serverHash = this.server == null ? 0 : this.server.hashCode();
            final int typeHash = this.eventType == null ? 0 : this.eventType.hashCode();
            return (31 * serverHash) + typeHash;
        }
    }

    /* loaded from: input_file:io/mantisrx/runtime/source/http/impl/HttpSourceImpl$UnsubscriptionLoggingOperator.class */
    private static class UnsubscriptionLoggingOperator<R> implements Observable.Operator<R, R> {
        private final String streamDesciption;

        public UnsubscriptionLoggingOperator(String str) {
            this.streamDesciption = str;
        }

        public static <R> UnsubscriptionLoggingOperator<R> create(String str) {
            return new UnsubscriptionLoggingOperator<>(str);
        }

        public Subscriber<? super R> call(Subscriber<? super R> subscriber) {
            subscriber.add(Subscriptions.create(new Action0() { // from class: io.mantisrx.runtime.source.http.impl.HttpSourceImpl.UnsubscriptionLoggingOperator.1
                public void call() {
                    HttpSourceImpl.logger.debug("{} unsubscribed", UnsubscriptionLoggingOperator.this.streamDesciption);
                }
            }));
            return subscriber;
        }
    }

    /**
     * Wires the collaborators, registers the two metric groups, reads the buffer-size property and
     * starts forwarding the provider's removed servers into the internal {@code serversToRemove} subject.
     *
     * <p>Only {@code onNext} of the removal stream is forwarded; its completion and errors are not
     * relayed to the subject.
     *
     * @param httpRequestFactory factory for the requests sent to each server
     * @param httpServerProvider source of servers to add and to remove
     * @param httpClientFactory  factory for per-server clients
     * @param observer           receiver of {@link HttpSourceEvent}s
     * @param func2              post-processor applied to every content item
     * @param clientResumePolicy policy consulted when a response stream errors or completes
     */
    HttpSourceImpl(HttpRequestFactory<R> httpRequestFactory, HttpServerProvider httpServerProvider, HttpClientFactory<R, E> httpClientFactory, Observer<HttpSourceEvent> observer, Func2<ServerContext<HttpClientResponse<E>>, E, T> func2, ClientResumePolicy<R, E> clientResumePolicy) {
        this.requestFactory = httpRequestFactory;
        this.serverProvider = httpServerProvider;
        this.clientFactory = httpClientFactory;
        this.observer = observer;
        this.postProcessor = func2;
        this.resumePolicy = clientResumePolicy;

        // Connection-level metrics, registered under this class's canonical name.
        final Metrics connectionMetrics = MetricsRegistry.getInstance().registerAndGet(
                new Metrics.Builder()
                        .name(HttpSourceImpl.class.getCanonicalName())
                        .addGauge("connectionGauge")
                        .addGauge("retryListGauge")
                        .addGauge("connectionAttemptedGauge")
                        .addCounter("connectionEstablishedCounter")
                        .addCounter("connectionUnsubscribedCounter")
                        .addCounter("sourceCompletedCounter")
                        .addCounter("subscriptionEndedCounter")
                        .addCounter("subscriptionEstablishedCounter")
                        .addCounter("subscriptionFailedCounter")
                        .addCounter("serverFoundCounter")
                        .addCounter("subscriptionCancelledCounter")
                        .build());
        this.connectionGauge = connectionMetrics.getGauge("connectionGauge");
        this.retryListGauge = connectionMetrics.getGauge("retryListGauge");
        this.connectionAttemptedGauge = connectionMetrics.getGauge("connectionAttemptedGauge");
        this.connectionEstablishedCounter = connectionMetrics.getCounter("connectionEstablishedCounter");
        this.connectionUnsubscribedCounter = connectionMetrics.getCounter("connectionUnsubscribedCounter");
        this.sourceCompletedCounter = connectionMetrics.getCounter("sourceCompletedCounter");
        this.subscriptionEndedCounter = connectionMetrics.getCounter("subscriptionEndedCounter");
        this.subscriptionEstablishedCounter = connectionMetrics.getCounter("subscriptionEstablishedCounter");
        this.subscriptionFailedCounter = connectionMetrics.getCounter("subscriptionFailedCounter");
        this.serverFoundCounter = connectionMetrics.getCounter("serverFoundCounter");
        this.subscriptionCancelledCounter = connectionMetrics.getCounter("subscriptionCancelledCounter");

        // Metrics group consumed by the DropOperator on each content stream.
        this.incomingDataMetrics = new Metrics.Builder()
                .name(MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP + "_HttpSourceImpl")
                .addCounter("onNext")
                .addCounter("onError")
                .addCounter("onComplete")
                .addGauge("subscribe")
                .addCounter("dropped")
                .addGauge("requested")
                .addGauge("bufferedGauge")
                .build();
        MetricsRegistry.getInstance().registerAndGet(this.incomingDataMetrics);
        this.dropped = this.incomingDataMetrics.getCounter("dropped");

        final String configuredBufferSize =
                ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("httpSource.buffer.size", DEFAULT_BUFFER_SIZE);
        this.bufferSize = Integer.parseInt(configuredBufferSize);

        this.serversToRemove = PublishSubject.create();
        final Action1<RxClient.ServerInfo> forwardRemoval = removed -> this.serversToRemove.onNext(removed);
        httpServerProvider.getServersToRemove().subscribe(forwardRemoval);
    }

    /**
     * Starts a {@link Builder} using its three-argument constructor, i.e. with the builder's
     * default resume policy.
     *
     * @param httpClientFactory  factory for per-server clients
     * @param httpRequestFactory factory for the requests sent to each server
     * @param func2              post-processor applied to every content item
     * @return a new builder
     */
    public static <R, E, T> Builder<R, E, T> builder(HttpClientFactory<R, E> httpClientFactory, HttpRequestFactory<R> httpRequestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> func2) {
        return new Builder<>(httpClientFactory, httpRequestFactory, func2);
    }

    /**
     * Starts a {@link Builder} with an explicit resume policy.
     *
     * @param httpClientFactory  factory for per-server clients
     * @param httpRequestFactory factory for the requests sent to each server
     * @param func2              post-processor applied to every content item
     * @param clientResumePolicy policy consulted when a response stream errors or completes
     * @return a new builder
     */
    public static <R, E, T> Builder<R, E, T> builder(HttpClientFactory<R, E> httpClientFactory, HttpRequestFactory<R> httpRequestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> func2, ClientResumePolicy<R, E> clientResumePolicy) {
        return new Builder<>(httpClientFactory, httpRequestFactory, func2, clientResumePolicy);
    }

    /**
     * Post-processor that wraps every content item in a {@link ServerContext} carrying the server
     * the item came from.
     *
     * <p>The explicit synthetic bridge overload that the decompiler emitted is intentionally
     * absent: the compiler generates the bridge itself, and the hand-written copy (with its
     * nonsensical casts) clashed with it.
     *
     * @param <E> content item type
     * @return a function mapping (response context, item) to a server context holding the item
     */
    public static <E> Func2<ServerContext<HttpClientResponse<E>>, E, ServerContext<E>> contextWrapper() {
        return (serverContext, content) -> new ServerContext<>(serverContext.getServer(), content);
    }

    /**
     * Post-processor that ignores the server context and returns each content item unchanged.
     *
     * <p>The explicit synthetic bridge overload that the decompiler emitted is intentionally
     * absent: the compiler generates the bridge itself, and the hand-written copy (with its
     * nonsensical casts) clashed with it.
     *
     * @param <E> content item type
     * @return a function mapping (response context, item) to the item itself
     */
    public static <E> Func2<ServerContext<HttpClientResponse<E>>, E, E> identityConverter() {
        return (serverContext, content) -> content;
    }

    /**
     * Builds the source: one inner observable of post-processed items per newly announced server.
     *
     * <p>Servers that are already connected, or for which a connection attempt is already recorded,
     * are skipped. An upstream error is logged and forwarded to the activity observer. When the
     * stream terminates (normally or with an error) the observer's {@code onCompleted} is invoked
     * and the connection bookkeeping is reset; the bookkeeping is also reset when the downstream
     * subscriber is unsubscribed.
     *
     * <p>NOTE(review): because the termination hook runs after errors too, the activity observer
     * receives {@code onCompleted} after {@code onError}; this pre-existing behaviour is kept.
     *
     * @param context not used by this implementation
     * @param index   not used by this implementation
     * @return a stream of per-server item streams
     */
    public Observable<Observable<T>> call(Context context, Index index) {
        return this.serverProvider.getServersToAdd()
                .filter(serverInfo -> !this.connectionManager.alreadyConnected(serverInfo)
                        && !this.connectionManager.connectionAlreadyAttempted(serverInfo))
                .flatMap(serverInfo -> streamServers(Observable.just(serverInfo)))
                .doOnError(error -> {
                    // Pass the throwable to the logger directly. Routing the message through
                    // String.format would interpret any '%' in it as a format specifier (and throw),
                    // and would swallow the stack trace.
                    logger.error("The source encountered an error " + error.getMessage(), error);
                    this.observer.onError(error);
                })
                .doAfterTerminate(() -> {
                    this.observer.onCompleted();
                    this.connectionManager.reset();
                })
                .lift(new Observable.Operator<Observable<T>, Observable<T>>() {
                    public Subscriber<? super Observable<T>> call(Subscriber<? super Observable<T>> subscriber) {
                        // Forget all tracked servers once the downstream subscriber goes away.
                        subscriber.add(Subscriptions.create(() -> HttpSourceImpl.this.connectionManager.reset()));
                        return subscriber;
                    }
                });
    }

    /**
     * Turns a stream of servers into a stream of per-server item streams.
     *
     * <p>For each server: a SERVER_FOUND event is emitted and counted, a client and a
     * ServerClientContext are created, the response stream is opened via
     * {@code streamResponseUntilServerIsRemoved}, and every response is finally converted into the
     * server's content stream by {@code AnonymousClass5}.
     *
     * @param observable the servers to connect to
     * @return one inner observable of post-processed items per received response
     */
    private Observable<Observable<T>> streamServers(Observable<RxClient.ServerInfo> observable) {
        return observable.map(serverInfo -> {
            // Announce the discovery before any connection work starts.
            HttpSourceEvent.EventType.SERVER_FOUND.newEvent(this.observer, serverInfo);
            this.serverFoundCounter.increment();
            return new ServerClientContext(serverInfo, this.clientFactory.createClient(serverInfo), this.requestFactory, this.observer);
        }).flatMap(serverClientContext -> {
            return streamResponseUntilServerIsRemoved(serverClientContext).map(new Func1<HttpClientResponse<E>, ServerContext<HttpClientResponse<E>>>() { // from class: io.mantisrx.runtime.source.http.impl.HttpSourceImpl.7
                // Each response that arrives counts as an established connection.
                public ServerContext<HttpClientResponse<E>> call(HttpClientResponse<E> httpClientResponse) {
                    HttpSourceEvent.EventType.CONNECTION_ESTABLISHED.newEvent(HttpSourceImpl.this.observer, serverClientContext.getServer());
                    HttpSourceImpl.this.connectionEstablishedCounter.increment();
                    return new ServerContext<>(serverClientContext.getServer(), httpClientResponse);
                }
            }).lift(new Observable.Operator<ServerContext<HttpClientResponse<E>>, ServerContext<HttpClientResponse<E>>>() { // from class: io.mantisrx.runtime.source.http.impl.HttpSourceImpl.6
                // Pass-through operator: only registers an unsubscribe hook on the downstream subscriber.
                public Subscriber<? super ServerContext<HttpClientResponse<E>>> call(Subscriber<? super ServerContext<HttpClientResponse<E>>> subscriber) {
                    subscriber.add(Subscriptions.create(new Action0() { // from class: io.mantisrx.runtime.source.http.impl.HttpSourceImpl.6.1
                        public void call() {
                            HttpSourceImpl.this.connectionUnsubscribedCounter.increment();
                            HttpSourceEvent.EventType.CONNECTION_UNSUBSCRIBED.newEvent(HttpSourceImpl.this.observer, serverClientContext.getServer());
                        }
                    }));
                    return subscriber;
                }
            });
        }).map(new AnonymousClass5());
    }

    /**
     * Rejects any response whose HTTP status code is not exactly 200.
     *
     * @param httpClientResponse the response to validate
     * @throws RuntimeException if the status code differs from 200; the message carries the
     *                          code and the reason phrase
     */
    private void checkResponseIsSuccessful(HttpClientResponse<E> httpClientResponse) {
        final int statusCode = httpClientResponse.getStatus().code();
        if (statusCode == 200) {
            return;
        }
        final String reason = httpClientResponse.getStatus().reasonPhrase();
        throw new RuntimeException(String.format("Expected 200 but got status %d and reason: %s", statusCode, reason));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Streams the content of one server's response until the content ends, fails, or the server
     * shows up on the removal stream.
     *
     * <p>On error a SUBSCRIPTION_FAILED event is emitted, the server is moved to the retry list and
     * the error is replaced by an empty stream. On completion (which also follows a swallowed
     * error) a SOURCE_COMPLETED event is emitted and the server is moved to the retry list.
     * In both handlers the gauges are refreshed only after the connection manager has been
     * updated, so they reflect the new state rather than the previous one.
     *
     * @param serverInfo         the server the response belongs to
     * @param httpClientResponse the response whose content is streamed
     * @return the content items; never terminates with an error
     */
    public Observable<E> streamResponseContent(RxClient.ServerInfo serverInfo, HttpClientResponse<E> httpClientResponse) {
        return httpClientResponse.getContent()
                .takeUntil(this.serversToRemove.filter(removed -> removed != null && removed.equals(serverInfo)))
                .doOnError(th -> {
                    HttpSourceEvent.EventType.SUBSCRIPTION_FAILED.newEvent(this.observer, serverInfo);
                    this.subscriptionFailedCounter.increment();
                    logger.info("server disconnected onError1: " + serverInfo);
                    this.connectionManager.serverDisconnected(serverInfo);
                    // Sample the gauges after the state change; sampling the retry list first
                    // published a value that did not yet include this server.
                    this.retryListGauge.set(getRetryServers().size());
                    this.connectionGauge.set(getConnectedServers().size());
                })
                .onErrorResumeNext(Observable.empty())
                .doOnCompleted(() -> {
                    HttpSourceEvent.EventType.SOURCE_COMPLETED.newEvent(this.observer, serverInfo);
                    this.sourceCompletedCounter.increment();
                    logger.info("server disconnected onComplete1: " + serverInfo);
                    this.connectionManager.serverDisconnected(serverInfo);
                    this.retryListGauge.set(getRetryServers().size());
                    this.connectionGauge.set(getConnectedServers().size());
                });
    }

    /**
     * Opens the response stream for one server and keeps it alive, subject to the resume policy,
     * until the server appears on the removal stream.
     *
     * <p>Each connection attempt is recorded in the connection manager and then published on the
     * attempted gauge. Errors and completions of the response stream are routed through the resume
     * policy. When the server is removed, a SUBSCRIPTION_CANCELED event is emitted, the server is
     * dropped from the connection manager and the stream ends. Responses with a status other than
     * 200 are turned into errors; any error reaching the end of the chain is logged, reported as
     * SUBSCRIPTION_FAILED, moves the server to the retry list and is replaced by an empty stream.
     *
     * <p>NOTE(review): the two resume operators are instantiated as raw types, as in the
     * decompiled original; they are presumably generic — confirm and add type arguments.
     *
     * @param serverClientContext the server together with its client and request factory
     * @return the responses received from that server; never terminates with an error
     */
    private Observable<HttpClientResponse<E>> streamResponseUntilServerIsRemoved(final ServerClientContext<R, E> serverClientContext) {
        return serverClientContext.newResponse(serverInfo -> {
            // Record the attempt first, then sample: the gauge used to be set before the server
            // was added and therefore always lagged one attempt behind.
            this.connectionManager.serverConnectionAttempted(serverInfo);
            this.connectionAttemptedGauge.set(getConnectionAttemptedServers().size());
        }).lift(new OperatorResumeOnError(new ResumeOnErrorPolicy<HttpClientResponse<E>>() {
            @Override // io.mantisrx.runtime.source.http.impl.ResumeOnErrorPolicy
            public Observable<HttpClientResponse<E>> call(Integer num, Throwable th) {
                return HttpSourceImpl.this.resumePolicy.onError(serverClientContext, num.intValue(), th);
            }
        })).lift(new OperatorResumeOnCompleted(new ResumeOnCompletedPolicy<HttpClientResponse<E>>() {
            @Override // io.mantisrx.runtime.source.http.impl.ResumeOnCompletedPolicy
            public Observable<HttpClientResponse<E>> call(Integer num) {
                return HttpSourceImpl.this.resumePolicy.onCompleted(serverClientContext, num.intValue());
            }
        })).takeUntil(this.serversToRemove.filter(removed -> {
            boolean isThisServer = removed != null && removed.equals(serverClientContext.getServer());
            if (isThisServer) {
                this.subscriptionCancelledCounter.increment();
                HttpSourceEvent.EventType.SUBSCRIPTION_CANCELED.newEvent(this.observer, removed);
            }
            return Boolean.valueOf(isThisServer);
        }).doOnNext(removed -> {
            logger.info("server removed: " + removed);
            this.connectionManager.serverRemoved(removed);
            this.connectionGauge.set(getConnectedServers().size());
        })).doOnNext(httpClientResponse -> {
            // Throws for non-200 responses, which sends them down the error path below.
            checkResponseIsSuccessful(httpClientResponse);
        }).doOnError(th -> {
            logger.error(String.format("Connecting to server %s failed: %s", serverClientContext.getServer(), th.getMessage()), th);
            HttpSourceEvent.EventType.SUBSCRIPTION_FAILED.newEvent(this.observer, serverClientContext.getServer());
            this.subscriptionFailedCounter.increment();
            logger.info("server disconnected onError2: " + serverClientContext.getServer());
            this.connectionManager.serverDisconnected(serverClientContext.getServer());
            this.retryListGauge.set(getRetryServers().size());
            this.connectionGauge.set(this.connectionManager.getConnectedServers().size());
        }).onErrorResumeNext(Observable.empty());
    }

    /** Returns the connection manager's unmodifiable view of the currently connected servers. */
    Set<RxClient.ServerInfo> getConnectedServers() {
        return this.connectionManager.getConnectedServers();
    }

    /** Returns the connection manager's unmodifiable view of the servers on the retry list. */
    Set<RxClient.ServerInfo> getRetryServers() {
        return this.connectionManager.getRetryServers();
    }

    /** Returns the connection manager's unmodifiable view of the servers with a recorded connection attempt. */
    Set<RxClient.ServerInfo> getConnectionAttemptedServers() {
        return this.connectionManager.getConnectionAttemptedServers();
    }

    // Runs once when the class is initialized, before any instance is created.
    // NOTE(review): presumably configures the Netty thread count used by the HTTP clients —
    // confirm against NettyUtils.
    static {
        NettyUtils.setNettyThreads();
    }
}
