package io.mantisrx.publish;

import com.netflix.mantis.discovery.proto.JobDiscoveryInfo;
import com.netflix.mantis.discovery.proto.MantisWorker;
import com.netflix.mantis.discovery.proto.StageWorkers;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/publish/DefaultSubscriptionTracker.class */
/**
 * Subscription tracker that resolves the subscriptions for a stream by querying a random subset
 * of a job's ingest-stage workers over HTTP and returning the answer reported by most of them.
 *
 * <p>Failed fetches (exceptions or non-200 responses) are logged, counted in the
 * {@code fetchSubscriptionsFailedCount} / {@code fetchSubscriptionsNon200Count} counters and
 * otherwise ignored, so a single bad worker does not fail the whole refresh.
 */
public class DefaultSubscriptionTracker extends AbstractSubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSubscriptionTracker.class);
    private static final String SUBSCRIPTIONS_URL_FORMAT = "http://%s:%d?jobId=%s";
    private static final int HTTP_OK = 200;
    // NOTE(review): presumably milliseconds, per the spectator-ipc request builder — confirm.
    private static final int FETCH_CONNECT_TIMEOUT = 1000;
    private static final int FETCH_READ_TIMEOUT = 1000;

    private final MrePublishConfiguration mrePublishConfiguration;
    private final String subscriptionsFetchQueryParamString;
    private final Counter fetchSubscriptionsFailedCount;
    private final Counter fetchSubscriptionsNon200Count;
    private final HttpClient httpClient;
    private final MantisJobDiscovery jobDiscovery;
    private final Random random;

    /**
     * Creates a tracker and registers its two failure counters with {@code registry}.
     *
     * @param mrePublishConfiguration source of the extra fetch query parameters and of the maximum
     *                                number of workers to query per refresh
     * @param registry                registry the failure counters are registered with
     * @param mantisJobDiscovery      used to look up the current workers of a job
     * @param streamManager           passed through to the superclass
     * @param httpClient              client used to fetch subscriptions from workers
     */
    public DefaultSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery mantisJobDiscovery, StreamManager streamManager, HttpClient httpClient) {
        super(mrePublishConfiguration, registry, mantisJobDiscovery, streamManager);
        this.random = new Random();
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.jobDiscovery = mantisJobDiscovery;
        this.subscriptionsFetchQueryParamString = mrePublishConfiguration.subscriptionFetchQueryParams();
        this.httpClient = httpClient;
        this.fetchSubscriptionsFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "fetchSubscriptionsFailedCount");
        this.fetchSubscriptionsNon200Count = SpectatorUtils.buildAndRegisterCounter(registry, "fetchSubscriptionsNon200Count");
    }

    /**
     * Fetches the subscription envelope from a single worker.
     *
     * @param streamName stream the refresh is for; only used in the failure log message
     * @param jobId      job id sent as the {@code jobId} query parameter
     * @param worker     worker whose host and port are queried
     * @return the parsed envelope on a 200 response; empty on any other status or on any exception
     */
    private Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String streamName, String jobId, MantisWorker worker) {
        try {
            // Extra configured query parameters are appended after the job id.
            String query = this.subscriptionsFetchQueryParamString.isEmpty()
                    ? jobId
                    : jobId + "&" + this.subscriptionsFetchQueryParamString;
            String url = String.format(SUBSCRIPTIONS_URL_FORMAT, worker.getHost(), worker.getPort(), query);
            LOG.trace("Subscription fetch URL: {}", url);

            HttpResponse response = this.httpClient.get(URI.create(url))
                    .withConnectTimeout(FETCH_CONNECT_TIMEOUT)
                    .withReadTimeout(FETCH_READ_TIMEOUT)
                    .send();
            if (response.status() != HTTP_OK) {
                LOG.info("got {} {} response from Mantis worker {}", response.status(), response.entityAsString(), worker);
                this.fetchSubscriptionsNon200Count.increment();
                return Optional.empty();
            }

            MantisServerSubscriptionEnvelope envelope = DefaultObjectMapper.getInstance()
                    .readValue(response.entityAsString(), MantisServerSubscriptionEnvelope.class);
            LOG.debug("got subs {} from Mantis worker {}", envelope, worker);
            return Optional.ofNullable(envelope);
        } catch (Exception e) {
            // Best effort: a failing worker is counted and skipped rather than failing the refresh.
            LOG.info("caught exception fetching subs for stream {} from {}", streamName, worker, e);
            this.fetchSubscriptionsFailedCount.increment();
            return Optional.empty();
        }
    }

    /**
     * Picks {@code subsetSize} workers uniformly at random.
     *
     * <p>The input list is never modified: it is obtained from job discovery and is not owned by
     * this class, so shuffling it in place would reorder the caller's list (or throw if that list
     * is unmodifiable). A private copy is partially shuffled instead.
     *
     * @param workers    candidate workers
     * @param subsetSize number of workers wanted
     * @return {@code workers} itself when {@code subsetSize} is negative or not smaller than the
     *         list size; otherwise a list of {@code subsetSize} randomly chosen workers
     */
    private List<MantisWorker> randomSubset(List<MantisWorker> workers, int subsetSize) {
        int size = workers.size();
        if (subsetSize < 0 || subsetSize >= size) {
            return workers;
        }
        List<MantisWorker> shuffled = new ArrayList<>(workers);
        // Partial Fisher-Yates: position idx is filled from the not-yet-chosen tail [idx, size),
        // which makes every subset equally likely.
        for (int idx = 0; idx < subsetSize; idx++) {
            int swapIdx = idx + this.random.nextInt(size - idx);
            MantisWorker chosen = shuffled.get(swapIdx);
            shuffled.set(swapIdx, shuffled.get(idx));
            shuffled.set(idx, chosen);
        }
        return shuffled.subList(0, subsetSize);
    }

    /**
     * Queries a random subset of {@code workers} and returns the envelope reported most often.
     *
     * <p>Stops early as soon as one envelope has been reported by a strict majority of the subset,
     * because no other envelope can then overtake it and the remaining HTTP calls are unnecessary.
     *
     * @param streamName stream the refresh is for
     * @param jobId      job id sent to each worker
     * @param workers    all candidate workers; must not be {@code null}
     * @return the most frequently reported envelope, or empty if no worker returned one
     */
    private Optional<MantisServerSubscriptionEnvelope> subsetSubscriptionsResolver(String streamName, String jobId, List<MantisWorker> workers) {
        int requested = Math.min(this.mrePublishConfiguration.maxNumWorkersToFetchSubscriptionsFrom(), workers.size());
        List<MantisWorker> subset = randomSubset(workers, requested);
        int majority = subset.size() / 2 + 1;

        // Votes per distinct envelope; grouping relies on the envelope's equals/hashCode.
        Map<MantisServerSubscriptionEnvelope, Integer> votes = new HashMap<>();
        for (MantisWorker worker : subset) {
            Optional<MantisServerSubscriptionEnvelope> fetched = fetchSubscriptions(streamName, jobId, worker);
            if (!fetched.isPresent()) {
                continue;
            }
            int count = votes.merge(fetched.get(), 1, Integer::sum);
            if (count >= majority) {
                return fetched;
            }
        }
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    /**
     * Resolves the current subscriptions by looking up the job's ingest-stage workers through job
     * discovery and polling a random subset of them.
     *
     * @param str  stream name the subscriptions are fetched for
     * @param str2 identifier passed to job discovery to look up the job's current workers
     * @return the resolved envelope, or empty when discovery info or ingest-stage workers are
     *         unavailable, or when no polled worker returned an envelope
     */
    @Override
    public Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String str, String str2) {
        Optional<JobDiscoveryInfo> discovered = this.jobDiscovery.getCurrentJobWorkers(str2);
        if (!discovered.isPresent()) {
            LOG.info("Subscription refresh failed, job discovery info not found for {}", str2);
            return Optional.empty();
        }

        JobDiscoveryInfo discoveryInfo = discovered.get();
        StageWorkers ingestStage = discoveryInfo.getIngestStageWorkers();
        // A missing stage and a missing worker list are both reported as "workers null".
        List<MantisWorker> workers = ingestStage == null ? null : ingestStage.getWorkers();
        if (workers == null) {
            LOG.info("Subscription refresh failed, workers null for {}", str2);
            return Optional.empty();
        }
        return subsetSubscriptionsResolver(str, discoveryInfo.getJobId(), workers);
    }
}
