package io.mantisrx.server.worker.client;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivx.mantis.operators.DropOperator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/mantisrx/server/worker/client/MetricsClientImpl.class */
public class MetricsClientImpl<T> implements MetricsClient<T> {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(MetricsClientImpl.class);
    /** Id of the job whose worker metrics are consumed; used for endpoint lookup, the metrics group name and log output. */
    final String jobId;
    /** Factory called once per connecting endpoint to build the {@code WorkerConnection} for it. */
    final WorkerConnectionFunc<T> workerConnectionFunc;
    /** Source of the endpoint-change stream for {@link #jobId}. */
    final JobWorkerMetricsLocator jobWorkerMetricsLocator;
    /** Incremented/decremented by the connection-status callback handed to each worker connection. */
    private final Gauge workersGauge;
    /** Set to the latest known worker count every time a connection callback fires. */
    private final Gauge expectedWorkersGauge;
    /** Incremented/decremented by the data-receiving callback handed to each worker connection. */
    private final Gauge workerConnReceivingDataGauge;
    /** Optional (may be null) sink that receives a {@code WorkerConnectionsStatus} after each gauge update. */
    private final Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver;
    /** Passed through unchanged to the connection factory; presumably a timeout in seconds, per the name. */
    private final long dataRecvTimeoutSecs;
    /** Flipped to true by {@code closeAllConnections()}; once set, no new connections are started. */
    private final AtomicBoolean nowClosed = new AtomicBoolean(false);
    /** Registry of the currently tracked worker connections, keyed by "host-port". */
    private final MetricsClientImpl<T>.WorkerConnections workerConnections = new WorkerConnections();
    /** Name of the gauge backing {@link #workersGauge} (identifier spelling "Guage" is historical). */
    private final String workersGuageName = "MetricsConnections";
    /** Name of the gauge backing {@link #expectedWorkersGauge}. */
    private final String expectedWorkersGaugeName = "ExpectedMetricsConnections";
    /** Name of the gauge backing {@link #workerConnReceivingDataGauge}. */
    private final String workerConnReceivingDataGaugeName = "metricsRecvngData";
    /** Most recent worker count received from the stream supplied to the constructor. */
    private final AtomicInteger numWorkers = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/server/worker/client/MetricsClientImpl$WorkerConnections.class */
    /**
     * Registry of tracked worker connections, keyed by connection name.
     *
     * <p>The backing {@link HashMap} is not thread-safe, so every read or write of it
     * happens while holding the map's monitor. Once {@link #closeOut} has run, the
     * registry is closed and {@link #put} silently ignores further registrations.
     */
    public class WorkerConnections {
        private final Map<String, WorkerConnection<T>> workerConnections = new HashMap<>();
        private boolean isClosed = false;

        WorkerConnections() {
        }

        /**
         * Registers a connection under the given name, unless the registry is already closed.
         *
         * @param str              connection name used as the map key
         * @param workerConnection connection to track
         */
        public void put(String str, WorkerConnection<T> workerConnection) {
            synchronized (this.workerConnections) {
                if (this.isClosed) {
                    return;
                }
                this.workerConnections.put(str, workerConnection);
            }
        }

        /**
         * Stops tracking the connection registered under the given name.
         *
         * @param str connection name
         * @return the connection that was registered, or {@code null} if there was none
         */
        public WorkerConnection<T> remove(String str) {
            synchronized (this.workerConnections) {
                return this.workerConnections.remove(str);
            }
        }

        /**
         * Closes the registry to new registrations and applies {@code action1} to every
         * connection that was registered at that moment.
         *
         * @param action1 callback invoked once per registered connection
         */
        public void closeOut(Action1<WorkerConnection<T>> action1) {
            final Map<String, WorkerConnection<T>> snapshot;
            synchronized (this.workerConnections) {
                this.isClosed = true;
                // Copy under the lock: iterating the live map while remove() runs on
                // another thread could throw ConcurrentModificationException.
                snapshot = new HashMap<>(this.workerConnections);
            }
            // The callback runs outside the lock so a slow close() cannot block put()/remove().
            for (WorkerConnection<T> workerConnection : snapshot.values()) {
                MetricsClientImpl.logger.info("Closing " + workerConnection.getName());
                action1.call(workerConnection);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a metrics client for one job, registers its gauges and starts tracking
     * the expected worker count.
     *
     * @param str                     id of the job; also used as suffix of the metrics group name
     * @param workerConnectionFunc    factory that builds a connection for each connecting endpoint
     * @param jobWorkerMetricsLocator source of endpoint changes for the job
     * @param observable              stream of worker counts; each value replaces the stored count
     *                                until this client has been closed
     * @param observer                optional (may be {@code null}) receiver of connection status updates
     * @param j                       value handed to the connection factory as the data-receive timeout
     */
    public MetricsClientImpl(String str, WorkerConnectionFunc<T> workerConnectionFunc, JobWorkerMetricsLocator jobWorkerMetricsLocator, Observable<Integer> observable, Observer<WorkerConnectionsStatus> observer, long j) {
        // Assign every final field before subscribing, so a synchronously emitting
        // source never runs callbacks against a partially initialised instance.
        this.jobId = str;
        this.workerConnectionFunc = workerConnectionFunc;
        this.jobWorkerMetricsLocator = jobWorkerMetricsLocator;
        this.workerConnectionsStatusObserver = observer;
        this.dataRecvTimeoutSecs = j;

        Metrics registerAndGet = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder()
                .name(MetricsClientImpl.class.getCanonicalName() + "-" + str)
                .addGauge(this.workersGuageName)
                .addGauge(this.expectedWorkersGaugeName)
                .addGauge(this.workerConnReceivingDataGaugeName)
                .build());
        this.workersGauge = registerAndGet.getGauge(this.workersGuageName);
        this.expectedWorkersGauge = registerAndGet.getGauge(this.expectedWorkersGaugeName);
        this.workerConnReceivingDataGauge = registerAndGet.getGauge(this.workerConnReceivingDataGaugeName);

        observable.doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer num) {
                MetricsClientImpl.this.numWorkers.set(num.intValue());
            }
        }).takeWhile(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer num) {
                return Boolean.valueOf(!MetricsClientImpl.this.nowClosed.get());
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                // Nothing to do: the last received count stays in effect.
            }

            @Override
            public void onError(Throwable e) {
                // A no-arg subscribe() would rethrow this as OnErrorNotImplementedException.
                MetricsClientImpl.logger.warn("Error in worker count stream for job " + MetricsClientImpl.this.jobId + " - " + e.getMessage(), e);
            }

            @Override
            public void onNext(Integer num) {
                // The count was already recorded in doOnNext above.
            }
        });
    }

    /**
     * Builds the registry key for a worker connection.
     *
     * @param str host part of the key
     * @param i   port part of the key
     * @return the host and port joined by a single dash
     */
    private String toWorkerConnName(String str, int i) {
        StringBuilder name = new StringBuilder();
        name.append(str);
        name.append("-");
        name.append(i);
        return name.toString();
    }

    /**
     * Reports whether this client has recorded an error.
     *
     * @return always {@code false}; this implementation does not track errors
     */
    @Override // io.mantisrx.server.worker.client.MetricsClient
    public boolean hasError() {
        return false;
    }

    /**
     * Returns the recorded error description.
     *
     * @return always {@code null}; this implementation does not track errors
     */
    @Override // io.mantisrx.server.worker.client.MetricsClient
    public String getError() {
        return null;
    }

    /**
     * Returns the stream of per-worker metric streams for the job.
     *
     * <p>The underlying pipeline is built lazily, once per subscription, and the
     * subscription work is moved onto the RxJava IO scheduler.
     *
     * @return an observable emitting one inner observable per endpoint change
     */
    @Override // io.mantisrx.server.worker.client.MetricsClient
    public Observable<Observable<T>> getResults() {
        return Observable.create(new Observable.OnSubscribe<Observable<T>>() {
            @Override
            public void call(Subscriber<? super Observable<T>> subscriber) {
                // Typed parameter (instead of a raw Subscriber) keeps this call checked.
                MetricsClientImpl.this.internalGetResults().subscribe(subscriber);
            }
        }).subscribeOn(Schedulers.io());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the shared pipeline that turns endpoint changes of the job into worker
     * metric streams.
     *
     * <p>Each endpoint change maps to an empty stream once the client is closed, to the
     * result of {@code handleEndpointClose} for {@code complete} changes, and to the
     * result of {@code handleEndpointConnect} otherwise. When the downstream subscriber
     * unsubscribes, all worker connections are closed.
     *
     * @return the shared observable of per-worker observables
     */
    public Observable<Observable<T>> internalGetResults() {
        Func1<EndpointChange, Observable<T>> toWorkerStream = new Func1<EndpointChange, Observable<T>>() {
            @Override
            public Observable<T> call(EndpointChange endpointChange) {
                if (MetricsClientImpl.this.nowClosed.get()) {
                    return Observable.empty();
                }
                if (endpointChange.getType() == EndpointChange.Type.complete) {
                    return MetricsClientImpl.this.handleEndpointClose(endpointChange);
                }
                return MetricsClientImpl.this.handleEndpointConnect(endpointChange);
            }
        };

        Observable.Operator<Observable<T>, Observable<T>> closeOnUnsubscribe = new Observable.Operator<Observable<T>, Observable<T>>() {
            @Override
            public Subscriber<? super Observable<T>> call(Subscriber<? super Observable<T>> subscriber) {
                Action0 closeEverything = new Action0() {
                    @Override
                    public void call() {
                        try {
                            MetricsClientImpl.logger.warn("Closing metrics connections to workers of job " + MetricsClientImpl.this.jobId);
                            MetricsClientImpl.this.closeAllConnections();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
                // Runs when the downstream subscriber unsubscribes.
                subscriber.add(Subscriptions.create(closeEverything));
                return subscriber;
            }
        };

        return this.jobWorkerMetricsLocator.locateWorkerMetricsForJob(this.jobId)
                .map(toWorkerStream)
                .lift(closeOnUnsubscribe)
                .share()
                .lift(new DropOperator("client_metrics_share"));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a metrics connection for a newly reported worker endpoint.
     *
     * <p>The connection is created through the connection factory, registered under its
     * "host-port" name and then started. If the client is closed at any point along the
     * way, the connection is closed again and an empty stream is returned.
     *
     * @param endpointChange the endpoint change describing the worker to connect to
     * @return the worker's metric stream, or an empty stream when the endpoint is not a
     *         {@code WorkerEndpoint} or the client has been closed
     */
    public Observable<T> handleEndpointConnect(EndpointChange endpointChange) {
        logger.info("Opening connection to metrics sink at " + endpointChange.toString());
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost(endpointChange.getEndpoint().getHost());
        if (!(endpointChange.getEndpoint() instanceof WorkerEndpoint)) {
            logger.error("endpoint received on Endpoint connect is not a WorkerEndpoint {}, no metrics port to connect to", endpointChange.getEndpoint());
            return Observable.empty();
        }
        // The metric port is read through WorkerEndpoint, matching the instanceof guard above.
        int metricPort = ((WorkerEndpoint) endpointChange.getEndpoint()).getMetricPort();
        String connectionName = toWorkerConnName(unwrappedHost, metricPort);
        WorkerConnection<T> connection = this.workerConnectionFunc.call(unwrappedHost, Integer.valueOf(metricPort), new Action1<Boolean>() {
            @Override
            public void call(Boolean bool) {
                MetricsClientImpl.this.updateWorkerConx(bool);
            }
        }, new Action1<Boolean>() {
            @Override
            public void call(Boolean bool) {
                MetricsClientImpl.this.updateWorkerDataReceivingStatus(bool);
            }
        }, this.dataRecvTimeoutSecs);
        if (this.nowClosed.get()) {
            try {
                connection.close();
            } catch (Exception e) {
                logger.warn("Error closing worker metrics connection " + connection.getName() + " - " + e.getMessage(), e);
            }
            return Observable.empty();
        }
        this.workerConnections.put(connectionName, connection);
        if (this.nowClosed.get()) {
            // The client was closed while we were registering. Always unregister and
            // bail out, even if close() fails, so a closed client never starts a stream.
            this.workerConnections.remove(connectionName);
            try {
                connection.close();
            } catch (Exception e2) {
                logger.warn("Error closing worker metrics connection - " + e2.getMessage(), e2);
            }
            return Observable.empty();
        }
        @SuppressWarnings("unchecked") // the cast is a no-op when call() is already typed Observable<T>
        Observable<T> workerMetrics = (Observable<T>) connection.call();
        return workerMetrics;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Callback for a worker connection starting or stopping to receive data.
     *
     * <p>Adjusts the receiving-data gauge, refreshes the expected-workers gauge and, when a
     * status observer was supplied, publishes a fresh {@code WorkerConnectionsStatus} to it.
     *
     * @param bool {@code true} to increment the receiving-data gauge, {@code false} to decrement it
     */
    public void updateWorkerDataReceivingStatus(Boolean bool) {
        final Gauge receivingGauge = this.workerConnReceivingDataGauge;
        if (!bool.booleanValue()) {
            receivingGauge.decrement();
        } else {
            receivingGauge.increment();
        }
        this.expectedWorkersGauge.set(this.numWorkers.get());

        final Observer<WorkerConnectionsStatus> statusObserver = this.workerConnectionsStatusObserver;
        if (statusObserver == null) {
            return;
        }
        // Emissions are serialized on the observer itself.
        synchronized (statusObserver) {
            statusObserver.onNext(new WorkerConnectionsStatus(receivingGauge.value(), this.workersGauge.value(), this.numWorkers.get()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Callback for a worker connection being established or lost.
     *
     * <p>Adjusts the connections gauge, refreshes the expected-workers gauge and, when a
     * status observer was supplied, publishes a fresh {@code WorkerConnectionsStatus} to it.
     *
     * @param bool {@code true} to increment the connections gauge, {@code false} to decrement it
     */
    public void updateWorkerConx(Boolean bool) {
        final Gauge connectionsGauge = this.workersGauge;
        if (!bool.booleanValue()) {
            connectionsGauge.decrement();
        } else {
            connectionsGauge.increment();
        }
        this.expectedWorkersGauge.set(this.numWorkers.get());

        final Observer<WorkerConnectionsStatus> statusObserver = this.workerConnectionsStatusObserver;
        if (statusObserver == null) {
            return;
        }
        // Emissions are serialized on the observer itself.
        synchronized (statusObserver) {
            statusObserver.onNext(new WorkerConnectionsStatus(this.workerConnReceivingDataGauge.value(), connectionsGauge.value(), this.numWorkers.get()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a {@code complete} endpoint change by unregistering and closing the
     * matching worker connection, if one is tracked.
     *
     * @param endpointChange the endpoint change describing the worker that went away
     * @return always an empty stream; a closed endpoint contributes no data
     */
    public Observable<T> handleEndpointClose(EndpointChange endpointChange) {
        logger.info("Closed connection to metrics sink at " + endpointChange.toString());
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost(endpointChange.getEndpoint().getHost());
        if (!(endpointChange.getEndpoint() instanceof WorkerEndpoint)) {
            logger.warn("endpoint received on Endpoint close is not a WorkerEndpoint {}, worker endpoint required for metrics port", endpointChange.getEndpoint());
            return Observable.empty();
        }
        // The metric port is read through WorkerEndpoint, matching the instanceof guard above.
        int metricPort = ((WorkerEndpoint) endpointChange.getEndpoint()).getMetricPort();
        WorkerConnection<T> removed = this.workerConnections.remove(toWorkerConnName(unwrappedHost, metricPort));
        if (removed != null) {
            try {
                removed.close();
            } catch (Exception e) {
                // Best effort: a failing close must not break the endpoint-change stream.
                logger.error("Unexpected exception on closing worker metrics connection: " + e.getMessage(), e);
            }
        }
        return Observable.empty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the client as closed and closes every tracked worker connection.
     *
     * <p>A failure to close one connection is logged and does not stop the remaining
     * connections from being closed.
     *
     * @throws Exception declared for callers; failures of individual closes are caught and logged here
     */
    public void closeAllConnections() throws Exception {
        this.nowClosed.set(true);
        Action1<WorkerConnection<T>> closeAndLogFailure = new Action1<WorkerConnection<T>>() {
            @Override
            public void call(WorkerConnection<T> workerConnection) {
                try {
                    workerConnection.close();
                } catch (Exception e) {
                    MetricsClientImpl.logger.warn("Error closing worker metrics connection " + workerConnection.getName() + " - " + e.getMessage(), e);
                }
            }
        };
        this.workerConnections.closeOut(closeAndLogFailure);
    }
}
