package com.pushtechnology.diffusion.client.internal.routing;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.callbacks.Stream;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.impl.InternalTopics;
import com.pushtechnology.diffusion.client.internal.services.MutableServiceRegistry;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.session.SessionErrorImpl;
import com.pushtechnology.diffusion.client.session.SessionAttributes;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.collections.ImmutableSet;
import com.pushtechnology.diffusion.collections.InternSet;
import com.pushtechnology.diffusion.collections.WeakInternSet;
import com.pushtechnology.diffusion.command.sender.ServiceLocator;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.impl.TopicTypeToDataType;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import com.pushtechnology.diffusion.util.concurrent.threads.CommonThreadPools;
import com.pushtechnology.diffusion.util.concurrent.threads.ExecutionPool;
import com.pushtechnology.repackaged.picocontainer.Disposable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

/**
 * Routes topic notifications (subscription, value, delta, unsubscription) to
 * the stream proxies registered against this instance.
 *
 * <p>A {@link TopicCacheEntry} is kept per topic id. Stream proxies are
 * registered either against a {@link TopicSelector} or as fallback proxies;
 * the fallback set is handed to each cache entry call so the entry can decide
 * how to use it.
 *
 * <p>Registry mutations requested through the {@link StreamRegistry} methods,
 * {@link #notifyUnsubscriptionOfAllTopics()} and {@link #dispose()} are not
 * run on the calling thread: they are submitted to the inbound thread pool via
 * {@link #runIfNotClosed(Runnable)}. Members annotated
 * {@link InboundThreadOnly} are presumably only ever touched from that pool's
 * thread - confirm against the {@link ExecutionPool} contract.
 */
@ThreadSafe
public class TopicRoutingImpl implements TopicRouting, Disposable {
    private static final Logger LOG = I18nLogger.getLogger(TopicRoutingImpl.class);

    /**
     * Initial capacity of the id-to-entry map, overridable with the
     * {@code diffusion.topic_routing.initial_capacity} system property.
     *
     * <p>NOTE(review): the default is read from
     * {@code SessionAttributes.MAXIMUM_MESSAGE_SIZE_MIN}, which has nothing to
     * do with cache sizing. It looks like a numerically equal constant was
     * substituted for a literal - confirm before changing it.
     */
    protected static final int INITIAL_CAPACITY =
        Integer.getInteger("diffusion.topic_routing.initial_capacity", SessionAttributes.MAXIMUM_MESSAGE_SIZE_MIN);

    private final InternedTopicSpecifications internedSpecifications;
    private final DataTypes dataTypes;
    private final TopicTypeToDataType topicTypeToDataType;
    private final ExecutionPool inboundThreadPool;

    /** Cache entries keyed by topic id. */
    @InboundThreadOnly
    private final Map<Integer, TopicCacheEntry> byId = new HashMap<>(INITIAL_CAPACITY, 0.75f);

    /** Stream proxies registered per selector; entries are removed once their set becomes empty. */
    @InboundThreadOnly
    private final Map<TopicSelector, ImmutableSet<StreamProxy>> streamProxies = new HashMap<>();

    /** Interns proxy sets so that equal sets can share one instance. */
    private final InternSet<ImmutableSet<StreamProxy>> internedStreamProxies = new WeakInternSet<>();

    /** Fallback proxies, passed to every cache entry call. */
    @InboundThreadOnly
    private ImmutableSet<StreamProxy> fallbackProxies = ImmutableSet.empty();

    /** Reusable scratch set; every user must leave it empty when done. */
    @InboundThreadOnly
    private final Set<StreamProxy> temporaryHashSet = new HashSet<>();

    private volatile boolean cacheDisabled = Boolean.getBoolean("diffusion.disabletopicvaluecache");
    private volatile boolean closed = false;

    /**
     * Creates a router.
     *
     * @param dataTypes data type registry, passed to cache entries and used
     *        to look up the time series event value type by name
     * @param topicTypeToDataType maps a topic type to its data type
     * @param commonThreadPools source of the default inbound thread pool
     * @param internedTopicSpecifications interns topic specifications received on subscription
     */
    public TopicRoutingImpl(DataTypes dataTypes, TopicTypeToDataType topicTypeToDataType, CommonThreadPools commonThreadPools, InternedTopicSpecifications internedTopicSpecifications) {
        this.dataTypes = dataTypes;
        this.topicTypeToDataType = topicTypeToDataType;
        this.internedSpecifications = internedTopicSpecifications;
        this.inboundThreadPool = commonThreadPools.getDefaultInboundThreadPool();
    }

    /** No-op in this implementation. */
    @Override
    public void wireSesssionServices(MutableServiceRegistry mutableServiceRegistry, ServiceLocator serviceLocator) {
    }

    /**
     * Creates and caches the entry for a newly subscribed topic, then notifies
     * its initial streams.
     *
     * @param i topic id
     * @param str topic path
     * @param topicSpecification specification of the topic
     * @throws IllegalStateException if an entry was already cached for
     *         {@code i}; note the new entry has replaced it by then
     */
    @Override
    @InboundThreadOnly
    public final void notifySubscription(int i, String str, TopicSpecification topicSpecification) {
        final TopicSpecification specification = this.internedSpecifications.intern(topicSpecification);
        final ImmutableSet<StreamProxy> streams = streamsForTopicPath(str);
        final TopicCacheEntry entry = newCacheEntry(str, specification, streams);

        final TopicCacheEntry previous = putCached(i, entry);
        if (previous != null) {
            throw new IllegalStateException("Existing entry " + previous + " found for id " + i + " when adding " + entry);
        }
        entry.notifyInitialStreamsOfSubscription(this.fallbackProxies);
    }

    /** Picks the cache entry implementation for the topic type and caching mode. */
    private TopicCacheEntry newCacheEntry(String topicPath, TopicSpecification specification, ImmutableSet<StreamProxy> streams) {
        final TopicType type = specification.getType();
        if (this.cacheDisabled || type == TopicType.UNKNOWN_TOPIC_TYPE) {
            return new NoConversionTopicCacheEntry(topicPath, specification, streams);
        }
        if (type == TopicType.TIME_SERIES) {
            // The event value type of a time series topic is named by a specification property.
            return new TimeSeriesTopicCacheEntry(topicPath, specification, this.dataTypes.getByName(specification.getProperties().get(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE)), streams);
        }
        return new DataTypeTopicCacheEntry(topicPath, specification, this.topicTypeToDataType.get(type), streams);
    }

    /**
     * Schedules notification of every cached entry's streams with
     * {@code SUBSCRIPTION_REFRESH}, followed by clearing the cache.
     */
    @Override
    public final void notifyUnsubscriptionOfAllTopics() {
        runIfNotClosed(() -> {
            for (TopicCacheEntry entry : this.byId.values()) {
                notifyStreamsOfUnsubscription(entry, Topics.UnsubscribeReason.SUBSCRIPTION_REFRESH);
            }
            this.byId.clear();
        });
    }

    /**
     * Delivers a value to the cached entry for a topic id. Ignored once
     * closed. If no entry is cached, an error is reported to the session's
     * error handler and the session is closed.
     *
     * @param internalSession session the value arrived on
     * @param i topic id
     * @param iBytes encoded value
     */
    @Override
    @InboundThreadOnly
    public final void notifyValue(InternalSession internalSession, int i, IBytes iBytes) {
        if (this.closed) {
            return;
        }
        final TopicCacheEntry entry = getCached(i);
        if (entry == null) {
            reportDataLoss(internalSession, i);
            return;
        }
        entry.handleValue(this.dataTypes, iBytes, this.fallbackProxies);
    }

    /**
     * Looks up the cache entry for a topic id.
     *
     * @param i topic id
     * @return the entry, or {@code null} if none is cached
     */
    @InboundThreadOnly
    protected final TopicCacheEntry getCached(int i) {
        return this.byId.get(i);
    }

    /**
     * Caches an entry under a topic id.
     *
     * @param i topic id
     * @param topicCacheEntry entry to cache
     * @return the entry previously cached for {@code i}, or {@code null}
     */
    @InboundThreadOnly
    protected TopicCacheEntry putCached(int i, TopicCacheEntry topicCacheEntry) {
        return this.byId.put(i, topicCacheEntry);
    }

    /**
     * Removes the cache entry for a topic id.
     *
     * @param i topic id
     * @return the removed entry, or {@code null} if none was cached
     */
    @InboundThreadOnly
    protected TopicCacheEntry removeCached(int i) {
        return this.byId.remove(i);
    }

    /**
     * Delivers a delta to the cached entry for a topic id. Ignored once
     * closed. If no entry is cached, an error is reported to the session's
     * error handler and the session is closed.
     *
     * @param internalSession session the delta arrived on
     * @param i topic id
     * @param iBytes encoded delta
     */
    @Override
    @InboundThreadOnly
    public final void notifyDelta(InternalSession internalSession, int i, IBytes iBytes) {
        if (this.closed) {
            return;
        }
        final TopicCacheEntry entry = getCached(i);
        if (entry == null) {
            reportDataLoss(internalSession, i);
            return;
        }
        handleDelta(iBytes, entry);
    }

    /** Reports an update for an uncached topic id to the session's error handler, then closes the session. */
    private void reportDataLoss(InternalSession session, int topicId) {
        session.getErrorHandler().notifyError(new SessionErrorImpl("Data loss on topic with ID " + topicId + " - possibly due to reconnection : session closing", null));
        session.close();
    }

    /** Forwards a delta to an entry together with the current fallback proxies. */
    private void handleDelta(IBytes delta, TopicCacheEntry entry) {
        entry.handleDelta(this.dataTypes, delta, this.fallbackProxies);
    }

    /**
     * Removes the cache entry for a topic id and, if one existed, notifies its
     * streams of the unsubscription.
     *
     * @param i topic id
     * @param unsubscribeReason reason passed on to the streams
     */
    @Override
    @InboundThreadOnly
    public final void notifyUnsubscription(int i, Topics.UnsubscribeReason unsubscribeReason) {
        final TopicCacheEntry removed = removeCached(i);
        if (removed != null) {
            notifyStreamsOfUnsubscription(removed, unsubscribeReason);
        }
    }

    /** Forwards an unsubscription to an entry together with the current fallback proxies. */
    private void notifyStreamsOfUnsubscription(TopicCacheEntry entry, Topics.UnsubscribeReason reason) {
        entry.notifyStreamsOfUnsubscription(reason, this.fallbackProxies);
    }

    /**
     * Schedules registration of a value stream for topics selected by a selector.
     *
     * @param topicSelector selector the stream is registered against
     * @param cls value class handed to the proxy
     * @param valueStream stream to register
     */
    @Override
    public final <V> void addStream(TopicSelector topicSelector, Class<? extends V> cls, Topics.ValueStream<V> valueStream) {
        addStreamProxy(topicSelector, () -> new ValueStreamProxy(cls, valueStream));
    }

    /**
     * Schedules registration of a topic stream for topics selected by a selector.
     *
     * @param topicSelector selector the stream is registered against
     * @param topicStream stream to register
     */
    @Override
    public final void addStream(TopicSelector topicSelector, InternalTopics.TopicStream topicStream) {
        addStreamProxy(topicSelector, () -> new TopicStreamProxy(topicStream));
    }

    /**
     * Schedules registration of a proxy against a selector. The proxy is
     * created inside the scheduled task. If adding it leaves the selector's
     * set unchanged (same instance returned by {@code with}), nothing further
     * happens; otherwise every cached entry whose path the selector selects is
     * given the new proxy.
     */
    private void addStreamProxy(TopicSelector topicSelector, Supplier<StreamProxy> proxyFactory) {
        runIfNotClosed(() -> {
            final ImmutableSet<StreamProxy> existing = this.streamProxies.get(topicSelector);
            final StreamProxy proxy = proxyFactory.get();

            final ImmutableSet<StreamProxy> updated;
            if (existing == null) {
                updated = ImmutableSet.of(proxy);
            } else {
                updated = existing.with(proxy);
                if (updated == existing) {
                    // Identity check: same instance means the set did not change.
                    return;
                }
            }

            this.streamProxies.put(topicSelector, this.internedStreamProxies.intern(updated));
            for (TopicCacheEntry entry : this.byId.values()) {
                if (topicSelector.selects(entry.getTopicPath())) {
                    entry.addStream(this.dataTypes, proxy, this.fallbackProxies, this.internedStreamProxies);
                }
            }
        });
    }

    /**
     * Schedules registration of a fallback value stream.
     *
     * @param cls value class handed to the proxy
     * @param valueStream stream to register
     */
    @Override
    public final <V> void addFallbackStream(Class<? extends V> cls, Topics.ValueStream<V> valueStream) {
        addFallbackStreamProxy(() -> new ValueStreamProxy(cls, valueStream));
    }

    /**
     * Schedules registration of a fallback proxy. If adding it changes the
     * fallback set, every cached entry is told about the new proxy.
     */
    private void addFallbackStreamProxy(Supplier<StreamProxy> proxyFactory) {
        runIfNotClosed(() -> {
            final ImmutableSet<StreamProxy> existing = this.fallbackProxies;
            final StreamProxy proxy = proxyFactory.get();
            final ImmutableSet<StreamProxy> updated = existing.with(proxy);
            if (updated == existing) {
                // Identity check: same instance means the set did not change.
                return;
            }
            this.fallbackProxies = updated;
            for (TopicCacheEntry entry : this.byId.values()) {
                entry.notifyFallbackSubscription(this.dataTypes, proxy);
            }
        });
    }

    /**
     * Schedules registration of a time series event stream for topics selected
     * by a selector.
     *
     * @param topicSelector selector the stream is registered against
     * @param cls event value class handed to the proxy
     * @param valueStream stream to register
     */
    @Override
    public final <V> void addTimeSeriesStream(TopicSelector topicSelector, Class<? extends V> cls, Topics.ValueStream<TimeSeries.Event<V>> valueStream) {
        addStreamProxy(topicSelector, () -> new TimeSeriesEventStreamProxy(cls, valueStream));
    }

    /**
     * Schedules removal of every proxy whose stream equals the given stream,
     * from both the fallback set and the per-selector sets. If anything was
     * removed, {@code stream.onClose()} is called; an exception it throws is
     * logged rather than propagated.
     *
     * @param stream stream to remove
     */
    @Override
    public final void removeStream(Stream stream) {
        final Predicate<StreamProxy> wrapsStream = proxy -> proxy.getStream().equals(stream);
        runIfNotClosed(() -> {
            boolean removed = removeFallbackStreamProxy(wrapsStream);
            if (removeStreamProxy(wrapsStream)) {
                removed = true;
                for (TopicCacheEntry entry : this.byId.values()) {
                    entry.removeStream(this.dataTypes, wrapsStream, this.fallbackProxies, this.internedStreamProxies);
                }
            }
            if (removed) {
                try {
                    stream.onClose();
                } catch (Exception e) {
                    LOG.error("TOPICS_STREAM_EXCEPTION", stream, e);
                }
            }
        });
    }

    /**
     * Removes the first matching proxy from the fallback set.
     *
     * @return {@code true} if the fallback set changed
     */
    @InboundThreadOnly
    private boolean removeFallbackStreamProxy(Predicate<StreamProxy> predicate) {
        final ImmutableSet<StreamProxy> existing = this.fallbackProxies;
        final ImmutableSet<StreamProxy> updated = this.fallbackProxies.withoutFirst(predicate);
        if (updated == existing) {
            return false;
        }
        this.fallbackProxies = updated;
        return true;
    }

    /**
     * Removes the first matching proxy from each selector's set, dropping a
     * selector whose set becomes empty.
     *
     * @return {@code true} if any selector's set changed or was dropped
     */
    @InboundThreadOnly
    private boolean removeStreamProxy(Predicate<StreamProxy> predicate) {
        boolean changed = false;
        // An explicit iterator is needed because entries are removed while iterating.
        final Iterator<Map.Entry<TopicSelector, ImmutableSet<StreamProxy>>> entries = this.streamProxies.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry = entries.next();
            final ImmutableSet<StreamProxy> remaining = entry.getValue().withoutFirst(predicate);
            if (remaining.isEmpty()) {
                entries.remove();
                changed = true;
            } else if (remaining != entry.getValue()) {
                entry.setValue(this.internedStreamProxies.intern(remaining));
                changed = true;
            }
        }
        return changed;
    }

    /**
     * Collects the proxies of every selector that selects the given path.
     *
     * @return the interned union of matching proxy sets; the empty set when
     *         no selectors are registered
     */
    @InboundThreadOnly
    private ImmutableSet<StreamProxy> streamsForTopicPath(String topicPath) {
        final Map<TopicSelector, ImmutableSet<StreamProxy>> bySelector = this.streamProxies;
        if (bySelector.isEmpty()) {
            return ImmutableSet.empty();
        }
        final Set<StreamProxy> scratch = this.temporaryHashSet;
        final ImmutableSet<StreamProxy> matched;
        try {
            for (Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry : bySelector.entrySet()) {
                if (entry.getKey().selects(topicPath)) {
                    scratch.addAll(entry.getValue());
                }
            }
            matched = ImmutableSet.from(scratch);
        } finally {
            // The scratch set is shared; never leave stale proxies behind, even on failure.
            scratch.clear();
        }
        return this.internedStreamProxies.intern(matched);
    }

    /** Makes entries created by later subscriptions use the no-conversion implementation. */
    @Override
    public final void disableValueCaching() {
        this.cacheDisabled = true;
    }

    /**
     * Schedules shutdown: marks this instance closed, clears the cache,
     * signals {@code SESSION_CLOSED} to every registered proxy (each distinct
     * proxy once), and drops all registrations.
     */
    @Override
    public final void dispose() {
        runIfNotClosed(() -> {
            this.closed = true;
            this.byId.clear();
            final Set<StreamProxy> all = this.temporaryHashSet;
            try {
                all.addAll(this.fallbackProxies);
                for (ImmutableSet<StreamProxy> proxies : this.streamProxies.values()) {
                    all.addAll(proxies);
                }
                for (StreamProxy proxy : all) {
                    proxy.onError(ErrorReason.SESSION_CLOSED);
                }
            } finally {
                // Release registrations even if a proxy's onError throws.
                all.clear();
                this.streamProxies.clear();
                this.fallbackProxies = ImmutableSet.empty();
            }
        });
    }

    /**
     * Submits a task to the inbound thread pool unless this instance is
     * closed. The closed flag is checked again when the task runs, because
     * the instance may be disposed between submission and execution.
     */
    private void runIfNotClosed(Runnable runnable) {
        if (this.closed) {
            return;
        }
        this.inboundThreadPool.execute(this, () -> {
            if (!this.closed) {
                runnable.run();
            }
        });
    }
}
