package org.smartboot.mqtt.broker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.provider.impl.session.SessionState;
import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.MqttWriter;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.timer.TimerTask;
import org.smartboot.socket.transport.AioSession;

/* loaded from: input_file:org/smartboot/mqtt/broker/MqttSession.class */
/**
 * Broker-side state for a single client connection.
 *
 * <p>The session tracks the topic filters the client has subscribed to, the
 * per-topic subscribers derived from those filters, the optional will message,
 * and the flags populated while the connection is being established
 * (user name, authorization, clean-session, connect properties).
 *
 * <p>Only {@link #disconnect()} is synchronized; the remaining mutators rely on
 * the caller for any required serialization.
 */
public class MqttSession extends AbstractSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSession.class);

    /** Subscriptions of this session, keyed by the topic filter string. */
    private final Map<String, TopicFilterSubscriber> subscribers = new ConcurrentHashMap<>();

    /**
     * Retain consumer offset recorded per topic by {@link #unsubscribe(String)};
     * consulted by {@link #subscribeSuccess} when the topic is subscribed again.
     */
    private final Map<BrokerTopic, Long> retainOffsetCache = new HashMap<>();

    private final BrokerContext mqttContext;
    private String username;
    private boolean authorized;
    private MqttPublishMessage willMessage;
    private boolean cleanSession;
    private ConnectProperties properties;

    /**
     * Timer that disconnects the session if it is still unauthorized once the
     * configured no-connect idle timeout elapses. Package-private and
     * non-final, so other classes in this package may access it.
     */
    TimerTask idleConnectTimer;

    /**
     * Creates the session, schedules the idle-connect watchdog and publishes
     * {@link ServerEventType#SESSION_CREATE} on the broker event bus.
     *
     * @param brokerContext broker context supplying the event bus, timer, configuration and providers
     * @param aioSession    underlying transport session
     * @param mqttWriter    writer stored in the inherited {@code mqttWriter} field
     */
    public MqttSession(BrokerContext brokerContext, AioSession aioSession, MqttWriter mqttWriter) {
        super(brokerContext.getEventBus(), brokerContext.getTimer());
        this.mqttContext = brokerContext;
        this.session = aioSession;
        this.mqttWriter = mqttWriter;
        long idleTimeoutMillis = brokerContext.getBrokerConfigure().getNoConnectIdleTimeout();
        this.idleConnectTimer = brokerContext.getTimer().schedule(new AsyncTask() {
            public void execute() {
                // Nothing to do once the client has been authorized.
                if (MqttSession.this.isAuthorized()) {
                    return;
                }
                MqttSession.LOGGER.info("长时间未收到客户端：{} 的Connect消息，连接断开！", MqttSession.this.getClientId());
                MqttSession.this.disconnect();
            }
        }, idleTimeoutMillis, TimeUnit.MILLISECONDS);
        brokerContext.getEventBus().publish(ServerEventType.SESSION_CREATE, this);
    }

    /** @return the connect properties previously set, or {@code null} if none were set */
    public ConnectProperties getProperties() {
        return this.properties;
    }

    /** @param connectProperties connect properties to associate with this session */
    public void setProperties(ConnectProperties connectProperties) {
        this.properties = connectProperties;
    }

    /** @return the clean-session flag */
    public boolean isCleanSession() {
        return this.cleanSession;
    }

    /** @param z new value of the clean-session flag */
    public void setCleanSession(boolean z) {
        this.cleanSession = z;
    }

    /**
     * Tears the session down. A call on an already disconnected session is a no-op.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>for an authorized session, removes the stored session state when
     *       clean-session is set, otherwise stores the current topic filters
     *       with their QoS;</li>
     *   <li>hands the will message, if one is set, to the message bus;</li>
     *   <li>unsubscribes every topic filter;</li>
     *   <li>removes the session registered under this client id from the broker
     *       context, disconnecting it too if it is a different instance;</li>
     *   <li>marks the session disconnected, closes the transport session and
     *       publishes {@link EventType#DISCONNECT}, the latter even if closing throws.</li>
     * </ol>
     *
     * <p>NOTE(review): the will message is published whenever it is non-null;
     * presumably callers clear it beforehand on a graceful disconnect. Confirm
     * against the DISCONNECT handling code.
     */
    public synchronized void disconnect() {
        if (isDisconnect()) {
            return;
        }
        if (isAuthorized()) {
            if (this.cleanSession) {
                this.mqttContext.getProviders().getSessionStateProvider().remove(this.clientId);
            } else {
                SessionState sessionState = new SessionState();
                this.subscribers.values().forEach(subscriber ->
                        sessionState.getSubscribers().put(subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getMqttQoS()));
                this.mqttContext.getProviders().getSessionStateProvider().store(this.clientId, sessionState);
            }
        }
        if (this.willMessage != null) {
            this.mqttContext.getMessageBus().consume(this, this.willMessage);
        }
        // unsubscribe() removes entries from the map being iterated; this is safe
        // because ConcurrentHashMap views are weakly consistent.
        this.subscribers.keySet().forEach(this::unsubscribe);
        MqttSession removedSession = this.mqttContext.removeSession(getClientId());
        if (removedSession != null && removedSession != this) {
            // A different session instance was registered under the same client id.
            LOGGER.error("remove old session success:{}", removedSession);
            removedSession.disconnect();
        }
        LOGGER.debug("remove mqttSession success:{}", removedSession);
        this.disconnect = true;
        try {
            this.session.close(false);
        } finally {
            this.mqttContext.getEventBus().publish(EventType.DISCONNECT, this);
        }
    }

    /** @param str client id to assign to this session */
    public void setClientId(String str) {
        this.clientId = str;
    }

    /** @return the user name previously set, or {@code null} if none was set */
    public String getUsername() {
        return this.username;
    }

    /** @param str user name to associate with this session */
    public void setUsername(String str) {
        this.username = str;
    }

    /**
     * Subscribes this session to a topic filter after consulting the subscribe provider.
     *
     * @param str     topic filter
     * @param mqttQoS requested QoS
     * @return {@code mqttQoS} when the subscription is accepted, or
     *         {@link MqttQoS#FAILURE} when the subscribe provider rejects it
     */
    public MqttQoS subscribe(String str, MqttQoS mqttQoS) {
        if (!this.mqttContext.getProviders().getSubscribeProvider().subscribeTopic(str, this)) {
            return MqttQoS.FAILURE;
        }
        subscribe0(str, mqttQoS);
        return mqttQoS;
    }

    /**
     * Registers the topic filter, or only updates the QoS when the filter is
     * already subscribed.
     */
    private void subscribe0(String str, MqttQoS mqttQoS) {
        TopicFilterSubscriber existing = this.subscribers.get(str);
        if (existing != null) {
            // Repeated subscription: refresh the QoS on the filter and on every
            // topic subscriber derived from it.
            existing.setMqttQoS(mqttQoS);
            existing.getTopicSubscribers().values().forEach(topicSubscriber -> topicSubscriber.setMqttQoS(mqttQoS));
            return;
        }
        TopicToken topicToken = new TopicToken(str);
        if (!topicToken.isWildcards()) {
            // A non-wildcard filter names one topic; make sure it exists before matching.
            this.mqttContext.getOrCreateTopic(str);
        }
        TopicFilterSubscriber created = new TopicFilterSubscriber(topicToken, mqttQoS);
        ValidateUtils.isTrue(this.subscribers.put(str, created) == null, "duplicate topic filter");
        this.mqttContext.getTopicSubscribeTree().subscribeTopic(this, created);
        this.mqttContext.getPublishTopicTree().match(topicToken, brokerTopic -> subscribeSuccess(mqttQoS, topicToken, brokerTopic));
    }

    /**
     * Binds this session to a topic matched by one of its topic filters.
     *
     * <p>Nothing happens when the subscribe provider does not accept the topic
     * for this session. Otherwise:
     * <ul>
     *   <li>if the session has no subscriber on the topic yet, one is created
     *       and registered both on the topic and under the filter;</li>
     *   <li>if a subscriber exists and was bound through a wildcard filter, it
     *       is re-bound to {@code topicToken} when that filter is not a
     *       wildcard or has a longer filter string, and
     *       {@link ServerEventType#SUBSCRIBE_REFRESH_TOPIC} is published.</li>
     * </ul>
     *
     * @param mqttQoS     QoS to apply to the subscriber
     * @param topicToken  token of the topic filter that matched; the filter is
     *                    expected to be present in this session's subscriptions
     * @param brokerTopic topic that was matched
     */
    public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic brokerTopic) {
        if (!this.mqttContext.getProviders().getSubscribeProvider().matchTopic(brokerTopic, this)) {
            return;
        }
        TopicSubscriber current = brokerTopic.getConsumeOffsets().get(this);
        if (current == null) {
            long latestOffset = this.mqttContext.getProviders().getPersistenceProvider().getLatestOffset(brokerTopic.getTopic());
            Long cachedRetainOffset = this.retainOffsetCache.get(brokerTopic);
            long oldestRetainOffset = this.mqttContext.getProviders().getRetainMessageProvider().getOldestOffset(brokerTopic.getTopic());
            // Use the cached retain offset only when it is not older than the
            // oldest offset the retain provider reports.
            long retainOffset = (cachedRetainOffset == null || cachedRetainOffset < oldestRetainOffset)
                    ? oldestRetainOffset
                    : cachedRetainOffset;
            TopicSubscriber created = new TopicSubscriber(brokerTopic, this, mqttQoS, latestOffset + 1, retainOffset);
            created.setTopicFilterToken(topicToken);
            brokerTopic.getConsumeOffsets().put(this, created);
            this.subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(brokerTopic, created);
            return;
        }
        TopicToken currentToken = current.getTopicFilterToken();
        if (!currentToken.isWildcards()) {
            // The existing binding came from a non-wildcard filter and is kept.
            return;
        }
        boolean rebind = !topicToken.isWildcards()
                || topicToken.getTopicFilter().length() > currentToken.getTopicFilter().length();
        if (rebind) {
            TopicSubscriber moved = this.subscribers.get(currentToken.getTopicFilter()).getTopicSubscribers().remove(brokerTopic);
            moved.setMqttQoS(mqttQoS);
            moved.setTopicFilterToken(topicToken);
            this.subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(brokerTopic, moved);
            this.mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, moved);
        }
    }

    /**
     * Re-runs topic matching for every wildcard topic filter of this session,
     * invoking {@link #subscribeSuccess} for each topic the publish topic tree matches.
     */
    public void resubscribe() {
        this.subscribers.values().stream()
                .filter(filterSubscriber -> filterSubscriber.getTopicFilterToken().isWildcards())
                .forEach(filterSubscriber -> this.mqttContext.getPublishTopicTree().match(
                        filterSubscriber.getTopicFilterToken(),
                        brokerTopic -> subscribeSuccess(filterSubscriber.getMqttQoS(), filterSubscriber.getTopicFilterToken(), brokerTopic)));
    }

    /**
     * Removes a topic filter subscription; unknown filters are ignored.
     *
     * <p>For every topic bound through the filter, the session's subscriber is
     * removed from the topic and its retain consumer offset is cached. The
     * subscriber is disabled when the removed instance is the expected one;
     * otherwise an error is logged.
     *
     * @param str topic filter to unsubscribe
     */
    public void unsubscribe(String str) {
        TopicFilterSubscriber removedFilter = this.subscribers.remove(str);
        if (removedFilter == null) {
            return;
        }
        removedFilter.getTopicSubscribers().values().forEach(topicSubscriber -> {
            TopicSubscriber registered = topicSubscriber.getTopic().getConsumeOffsets().remove(this);
            this.retainOffsetCache.put(topicSubscriber.getTopic(), Long.valueOf(topicSubscriber.getRetainConsumerOffset()));
            if (topicSubscriber != registered) {
                LOGGER.error("remove subscriber:{} error!", registered);
            } else {
                registered.disable();
                LOGGER.debug("remove subscriber:{} success!", topicSubscriber.getTopic().getTopic());
            }
        });
        this.mqttContext.getTopicSubscribeTree().unsubscribe(this, removedFilter);
    }

    /** @return the authorized flag */
    public boolean isAuthorized() {
        return this.authorized;
    }

    /** @param z new value of the authorized flag */
    public void setAuthorized(boolean z) {
        this.authorized = z;
    }

    /**
     * @param mqttPublishMessage will message handed to the message bus by
     *                           {@link #disconnect()}; {@code null} clears it
     */
    public void setWillMessage(MqttPublishMessage mqttPublishMessage) {
        this.willMessage = mqttPublishMessage;
    }

    /** @return the broker context this session was created with */
    public BrokerContext getMqttContext() {
        return this.mqttContext;
    }

    /**
     * @return the live, mutable subscription map keyed by topic filter; this is
     *         the internal map, not a copy
     */
    public Map<String, TopicFilterSubscriber> getSubscribers() {
        return this.subscribers;
    }
}
