package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/publish/AbstractSubscriptionTracker.class */
/**
 * Base {@link SubscriptionTracker} that, on each call to {@link #refreshSubscriptions()},
 * fetches the current subscriptions per stream via {@link #fetchSubscriptions(String, String)}
 * and pushes the difference against the previously recorded subscriptions into the
 * {@link StreamManager}.
 *
 * <p>Subclasses supply the actual fetch mechanism by implementing
 * {@link #fetchSubscriptions(String, String)}.
 */
public abstract class AbstractSubscriptionTracker implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);

    /** Stream key that is always refreshed, even when it is not among the registered streams. */
    private static final String DEFAULT_STREAM_KEY = "__default__";

    /** Stand-in for "no previous subscriptions" the first time a stream is refreshed. */
    private static final MantisServerSubscriptionEnvelope DEFAULT_EMPTY_SUB_ENVELOPE =
            new MantisServerSubscriptionEnvelope(Collections.emptyList());

    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final StreamManager streamManager;
    private final Counter refreshSubscriptionSuccessCount;
    private final Counter refreshSubscriptionFailedCount;
    private final Counter staleSubscriptionRemovedCount;

    // Last recorded subscriptions, keyed by stream name.
    // NOTE(review): volatile only covers the reference, which is never reassigned; the HashMap
    // itself is not thread-safe. Confirm refreshSubscriptions() and getCurrentSubs() are never
    // invoked concurrently, or switch to a concurrent map.
    private volatile Map<String, StreamSubscriptions> previousSubscriptions = new HashMap<>();

    /**
     * Snapshot of the subscription envelope recorded for one stream, stamped with the
     * wall-clock time at which the snapshot object was created.
     */
    public static class StreamSubscriptions {
        private final String streamName;
        private final MantisServerSubscriptionEnvelope subsEnvelope;
        private final transient long createTimeMs = System.currentTimeMillis();

        /**
         * @param streamName   name of the stream the envelope belongs to
         * @param subsEnvelope subscriptions recorded for that stream
         */
        public StreamSubscriptions(String streamName, MantisServerSubscriptionEnvelope subsEnvelope) {
            this.streamName = streamName;
            this.subsEnvelope = subsEnvelope;
        }

        /** @return the stream name given at construction */
        public String getStreamName() {
            return this.streamName;
        }

        /** @return the subscription envelope given at construction */
        public MantisServerSubscriptionEnvelope getSubsEnvelope() {
            return this.subsEnvelope;
        }

        /** @return {@link System#currentTimeMillis()} captured when this object was created */
        public long getCreateTimeMs() {
            return this.createTimeMs;
        }
    }

    /**
     * Stores the collaborators and registers the three refresh/stale counters with the registry.
     *
     * @param mrePublishConfiguration configuration consulted on every refresh
     * @param registry                registry the counters are registered with
     * @param streamManager           receiver of subscription additions and removals
     */
    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.streamManager = streamManager;
        this.refreshSubscriptionSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionSuccessCount");
        this.refreshSubscriptionFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionFailedCount");
        this.staleSubscriptionRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionRemovedCount");
    }

    /**
     * Applies the difference between two subscription sets to the stream manager: entries only
     * in {@code previous} are removed, entries only in {@code current} are added. Entries for
     * which {@link SubscriptionFactory#getSubscription} yields nothing are logged and skipped.
     * Neither input set is modified.
     *
     * @param previous subscriptions recorded before this refresh
     * @param current  subscriptions just fetched
     */
    void propagateSubscriptionChanges(Set<MantisServerSubscription> previous, Set<MantisServerSubscription> current) {
        Set<MantisServerSubscription> removed = new HashSet<>(previous);
        removed.removeAll(current);
        for (MantisServerSubscription serverSub : removed) {
            Optional<Subscription> subscription =
                    SubscriptionFactory.getSubscription(serverSub.getSubscriptionId(), serverSub.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.removeStreamSubscription(subscription.get());
            } else {
                LOG.warn("unexpected to find invalid subscription to remove {}", serverSub);
            }
        }

        Set<MantisServerSubscription> added = new HashSet<>(current);
        added.removeAll(previous);
        for (MantisServerSubscription serverSub : added) {
            Optional<Subscription> subscription =
                    SubscriptionFactory.getSubscription(serverSub.getSubscriptionId(), serverSub.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.addStreamSubscription(subscription.get());
            } else {
                LOG.info("will not add invalid subscription {}", serverSub);
            }
        }
    }

    /**
     * Drops the recorded subscriptions of a stream once they are older than the configured
     * expiry interval, and propagates their removal to the stream manager.
     *
     * @param streamName stream whose recorded subscriptions should be checked
     */
    private void cleanupStaleSubscriptions(String streamName) {
        StreamSubscriptions recorded = this.previousSubscriptions.get(streamName);
        if (recorded == null) {
            return;
        }
        // 1000L forces long arithmetic: an int multiplication would silently overflow for
        // large expiry intervals and make fresh data look stale.
        long expiryMs = this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * 1000L;
        long ageMs = System.currentTimeMillis() - recorded.getCreateTimeMs();
        if (ageMs <= expiryMs) {
            return;
        }
        LOG.info("removing stale subscriptions data for stream {} ({} created {})",
                streamName, recorded.getSubsEnvelope(), recorded.getCreateTimeMs());
        this.staleSubscriptionRemovedCount.increment();
        // Use the entry already looked up rather than dereferencing the result of remove().
        this.previousSubscriptions.remove(streamName);
        propagateSubscriptionChanges(recorded.getSubsEnvelope().getSubscriptions(), Collections.emptySet());
    }

    /**
     * Fetches the current subscriptions for one stream.
     *
     * @param streamName stream name (a key of the stream-to-job-cluster mapping)
     * @param jobCluster value the configuration maps that stream name to
     * @return the fetched envelope; an empty result is counted as a failed refresh by
     *         {@link #refreshSubscriptions()}
     */
    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String streamName, String jobCluster);

    /**
     * Refreshes subscriptions for every configured stream that is registered with the stream
     * manager, plus the {@code "__default__"} stream. Does nothing when the MRE client is
     * disabled or no streams are registered.
     */
    @Override // io.mantisrx.publish.SubscriptionTracker
    public void refreshSubscriptions() {
        if (!this.mrePublishConfiguration.isMREClientEnabled() || this.streamManager.getRegisteredStreams().isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> mapping : this.mrePublishConfiguration.streamNameToJobClusterMapping().entrySet()) {
            String streamName = mapping.getKey();
            if (this.streamManager.getRegisteredStreams().contains(streamName) || DEFAULT_STREAM_KEY.equals(streamName)) {
                refreshStream(streamName, mapping.getValue());
            } else {
                LOG.debug("will not fetch subscriptions for un-registered stream {}", streamName);
            }
        }
    }

    /**
     * Refreshes a single stream: on a successful fetch the diff is propagated and the new
     * envelope recorded; on an empty fetch stale data is cleaned up. Any exception is logged
     * and counted as a failure so that one stream cannot abort the refresh of the others.
     */
    private void refreshStream(String streamName, String jobCluster) {
        try {
            Optional<MantisServerSubscriptionEnvelope> fetched = fetchSubscriptions(streamName, jobCluster);
            if (!fetched.isPresent()) {
                cleanupStaleSubscriptions(streamName);
                this.refreshSubscriptionFailedCount.increment();
                return;
            }
            MantisServerSubscriptionEnvelope currentEnvelope = fetched.get();
            StreamSubscriptions recorded = this.previousSubscriptions
                    .getOrDefault(streamName, new StreamSubscriptions(streamName, DEFAULT_EMPTY_SUB_ENVELOPE));
            propagateSubscriptionChanges(recorded.getSubsEnvelope().getSubscriptions(), currentEnvelope.getSubscriptions());
            LOG.debug("{} subscriptions updated to {}", streamName, currentEnvelope);
            this.previousSubscriptions.put(streamName, new StreamSubscriptions(streamName, currentEnvelope));
            this.refreshSubscriptionSuccessCount.increment();
        } catch (Exception e) {
            // Trailing throwable is passed as the last argument, after the two placeholder values.
            LOG.info("refresh subscriptions failed for {} {}", streamName, jobCluster, e);
            this.refreshSubscriptionFailedCount.increment();
        }
    }

    /**
     * Returns the subscription envelope most recently recorded for a stream.
     *
     * @param streamName stream to look up
     * @return the recorded envelope, or empty when nothing is recorded for that stream
     */
    public Optional<MantisServerSubscriptionEnvelope> getCurrentSubs(String streamName) {
        return Optional.ofNullable(this.previousSubscriptions.get(streamName))
                .map(StreamSubscriptions::getSubsEnvelope);
    }
}
