package org.smartboot.mqtt.broker.processor;

import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.MqttSession;
import org.smartboot.mqtt.broker.eventbus.EventObject;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.provider.impl.session.SessionState;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import org.smartboot.mqtt.common.enums.MqttProtocolEnum;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.message.variable.MqttConnAckVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectAckProperties;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;

/* loaded from: input_file:org/smartboot/mqtt/broker/processor/ConnectProcessor.class */
/**
 * Processes an incoming MQTT CONNECT message for a broker session.
 *
 * <p>The processor assigns the client id and protocol version to the session, validates the
 * CONNECT header and payload, publishes a {@link ServerEventType#CONNECT} event, registers the
 * session with the broker (replacing any session already registered under the same client id),
 * stores the will message if one was supplied, sizes the in-flight queue and finally answers
 * with a CONNACK.
 */
public class ConnectProcessor implements MqttProcessor<MqttConnectMessage> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectProcessor.class);

    /** Longest client identifier accepted from an MQTT 3.1 client. */
    private static final int MQTT_3_1_MAX_CLIENT_ID_LENGTH = 23;

    /**
     * Handles one CONNECT message.
     *
     * @param brokerContext      broker-wide context (event bus, configuration, session registry)
     * @param mqttSession        the session the CONNECT arrived on
     * @param mqttConnectMessage the decoded CONNECT message
     */
    @Override
    public void process(BrokerContext brokerContext, MqttSession mqttSession, MqttConnectMessage mqttConnectMessage) {
        // A missing or zero-length client id gets a broker-generated one. The null check keeps
        // this in line with checkMessage, which already treats the id as possibly null/blank.
        String clientIdentifier = mqttConnectMessage.getPayload().clientIdentifier();
        if (clientIdentifier == null || clientIdentifier.isEmpty()) {
            clientIdentifier = MqttUtil.createClientId();
        }
        mqttSession.setClientId(clientIdentifier);
        mqttSession.setMqttVersion(mqttConnectMessage.getVersion());

        // Validation looks at the identifier as sent by the client, not the generated one.
        checkMessage(mqttSession, mqttConnectMessage);

        brokerContext.getEventBus().publish(ServerEventType.CONNECT, EventObject.newEventObject(mqttSession, mqttConnectMessage));
        mqttSession.setAuthorized(true);
        refreshSession(brokerContext, mqttSession, mqttConnectMessage);
        storeWillMessage(mqttSession, mqttConnectMessage);
        mqttSession.setProperties((ConnectProperties) mqttConnectMessage.getVariableHeader().getProperties());

        boolean mqtt5 = mqttSession.getMqttVersion() == MqttVersion.MQTT_5;

        // For MQTT 5 the in-flight window is the smaller of the client's receive maximum and the
        // broker limit; other versions use the broker limit alone.
        // NOTE(review): if an unset receive maximum is reported as 0 this yields a zero-sized
        // window — confirm the default supplied by ConnectProperties.
        int maxInflight = brokerContext.getBrokerConfigure().getMaxInflight();
        if (mqtt5) {
            maxInflight = Math.min(mqttConnectMessage.getVariableHeader().getProperties().getReceiveMaximum(), maxInflight);
        }
        mqttSession.setInflightQueue(new InflightQueue(mqttSession, maxInflight));

        // CONNACK properties are only attached for MQTT 5.
        ConnectAckProperties connectAckProperties = null;
        if (mqtt5) {
            connectAckProperties = new ConnectAckProperties();
            connectAckProperties.setReceiveMaximum(maxInflight);
        }

        // NOTE(review): the session-present flag is derived solely from the clean-session flag,
        // not from whether stored session state was actually found — confirm this is intended.
        boolean sessionPresent = !mqttConnectMessage.getVariableHeader().isCleanSession();
        mqttSession.write(connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent, connectAckProperties));
        LOGGER.debug("CONNECT message processed CId={}", mqttSession.getClientId());
    }

    /**
     * Validates protocol name, protocol level, the reserved flag and the client identifier of a
     * CONNECT message. Each failed check runs a callback that answers with a failure CONNACK
     * and/or disconnects the session.
     *
     * <p>NOTE(review): presumably {@code ValidateUtils} also aborts processing (throws) after
     * running the callback — confirm against {@code ValidateUtils}.
     */
    private void checkMessage(MqttSession mqttSession, MqttConnectMessage mqttConnectMessage) {
        MqttConnectVariableHeader variableHeader = mqttConnectMessage.getVariableHeader();

        MqttProtocolEnum protocol = MqttProtocolEnum.getByName(variableHeader.protocolName());
        ValidateUtils.notNull(protocol, "invalid protocol", () -> {
            LOGGER.error("invalid protocol:{}", variableHeader.protocolName());
            // Only MQTT 5 sessions get a failure CONNACK here (connFailAck also disconnects);
            // the disconnect below covers every other version.
            if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
                connFailAck(MqttConnectReturnCode.UNSUPPORTED_PROTOCOL_VERSION, mqttSession);
            }
            mqttSession.disconnect();
        });

        MqttConnectPayload payload = mqttConnectMessage.getPayload();
        String clientIdentifier = payload.clientIdentifier();

        MqttVersion version = MqttVersion.getByProtocolWithVersion(protocol, variableHeader.getProtocolLevel());
        ValidateUtils.notNull(version, "invalid version",
                () -> connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttSession));

        // The reserved flag must be zero; otherwise the session is dropped without a CONNACK.
        ValidateUtils.isTrue(variableHeader.getReserved() == 0, "", mqttSession::disconnect);

        // MQTT 3.1 clients may not send an identifier longer than 23 characters.
        boolean clientIdTooLong = StringUtils.isNotBlank(clientIdentifier)
                && version == MqttVersion.MQTT_3_1
                && clientIdentifier.length() > MQTT_3_1_MAX_CLIENT_ID_LENGTH;
        ValidateUtils.isTrue(!clientIdTooLong, "", () -> {
            connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttSession);
            LOGGER.error("The MQTT 3.1 client ID exceeds 23 characters. Username={}", payload.userName());
        });

        // A blank identifier is only acceptable together with a clean session.
        ValidateUtils.isTrue(variableHeader.isCleanSession() || !StringUtils.isBlank(clientIdentifier), "", () -> {
            connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttSession);
            LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
        });
    }

    /**
     * Registers {@code mqttSession} with the broker, first disconnecting any session already
     * registered under the same client id. When the new connection is not a clean session, the
     * subscriptions found in the stored session state are re-applied to the new session.
     */
    private void refreshSession(BrokerContext brokerContext, MqttSession mqttSession, MqttConnectMessage mqttConnectMessage) {
        mqttSession.setCleanSession(mqttConnectMessage.getVariableHeader().isCleanSession());

        MqttSession existing = brokerContext.getSession(mqttSession.getClientId());
        if (existing != null) {
            if (mqttSession.isCleanSession()) {
                // Mark the old session clean before disconnecting it.
                existing.setCleanSession(true);
                LOGGER.info("disconnect session:{}", existing);
                existing.disconnect();
            } else {
                // The state lookup happens after the old session has been disconnected.
                existing.disconnect();
                SessionState sessionState = brokerContext.getProviders().getSessionStateProvider().get(mqttSession.getClientId());
                if (sessionState != null) {
                    Map<String, MqttQoS> subscribers = sessionState.getSubscribers();
                    subscribers.forEach(mqttSession::subscribe);
                }
            }
        }

        brokerContext.addSession(mqttSession);
        LOGGER.debug("add session for client:{}", mqttSession);
    }

    /**
     * Builds a PUBLISH message from the will data of the CONNECT message and stores it on the
     * session. Does nothing when the will flag is not set.
     */
    private void storeWillMessage(MqttSession mqttSession, MqttConnectMessage mqttConnectMessage) {
        if (!mqttConnectMessage.getVariableHeader().isWillFlag()) {
            return;
        }
        WillMessage willMessage = mqttConnectMessage.getPayload().getWillMessage();
        // NOTE(review): the retain flag is taken from the CONNECT fixed header; MQTT defines
        // Will Retain as a connect flag in the variable header — confirm this is intended.
        MqttMessageBuilders.PublishBuilder willBuilder = MqttMessageBuilders.publish()
                .topicName(willMessage.getWillTopic())
                .qos(MqttQoS.valueOf(mqttConnectMessage.getVariableHeader().willQos()))
                .payload(willMessage.getWillMessage())
                .retained(mqttConnectMessage.getFixedHeader().isRetain());
        if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
            // MQTT 5 will messages get an empty property set.
            willBuilder.publishProperties(new PublishProperties());
        }
        mqttSession.setWillMessage(willBuilder.build());
    }

    /**
     * Writes a failure CONNACK carrying {@code mqttConnectReturnCode} and disconnects the
     * session.
     *
     * @param mqttConnectReturnCode the failure code; must not be {@code CONNECTION_ACCEPTED}
     * @param mqttSession           the session to answer and disconnect
     */
    private void connFailAck(MqttConnectReturnCode mqttConnectReturnCode, MqttSession mqttSession) {
        ValidateUtils.isTrue(mqttConnectReturnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED, "");
        // An (empty) property block is only attached for MQTT 5.
        ConnectAckProperties connectAckProperties =
                mqttSession.getMqttVersion() == MqttVersion.MQTT_5 ? new ConnectAckProperties() : null;
        mqttSession.write(connAck(mqttConnectReturnCode, false, connectAckProperties));
        mqttSession.disconnect();
    }

    /**
     * Creates a CONNACK message.
     *
     * @param mqttConnectReturnCode return code placed in the variable header
     * @param sessionPresent        value of the session-present flag
     * @param connectAckProperties  CONNACK properties, or {@code null} for none
     */
    private MqttConnAckMessage connAck(MqttConnectReturnCode mqttConnectReturnCode, boolean sessionPresent, ConnectAckProperties connectAckProperties) {
        return new MqttConnAckMessage(new MqttConnAckVariableHeader(mqttConnectReturnCode, sessionPresent, connectAckProperties));
    }
}
