package org.smartboot.mqtt.client;

import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.AckMessage;
import org.smartboot.mqtt.common.DefaultMqttWriter;
import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPingReqMessage;
import org.smartboot.mqtt.common.message.MqttPingRespMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.MqttSubAckMessage;
import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.MqttTopicSubscription;
import org.smartboot.mqtt.common.message.MqttUnsubAckMessage;
import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
import org.smartboot.socket.extension.processor.AbstractMessageProcessor;
import org.smartboot.socket.transport.AioQuickClient;
import org.smartboot.socket.util.Attachment;
import org.smartboot.socket.util.QuickTimerTask;

/* loaded from: input_file:org/smartboot/mqtt/client/MqttClient.class */
public class MqttClient extends AbstractSession {
    private final Logger LOGGER;
    private final MqttClientConfigure clientConfigure;
    private final AbstractMessageProcessor<MqttMessage> messageProcessor;
    private final ConcurrentLinkedQueue<Runnable> registeredTasks;
    private final Map<String, Subscribe> subscribes;
    private final List<TopicToken> wildcardsToken;
    private AioQuickClient client;
    private AsynchronousChannelGroup asynchronousChannelGroup;
    private boolean connected;
    private Consumer<MqttConnAckMessage> connectConsumer;
    private Consumer<MqttConnAckMessage> reconnectConsumer;
    private boolean pingTimeout;

    public MqttClient(String str, int i, String str2) {
        this(str, i, str2, MqttVersion.MQTT_3_1_1);
    }

    public MqttClient(String str, int i, String str2, MqttVersion mqttVersion) {
        super(new ClientQosPublisher(), new EventBusImpl(EventType.types()));
        this.LOGGER = LoggerFactory.getLogger(getClass());
        this.clientConfigure = new MqttClientConfigure();
        this.messageProcessor = new MqttClientProcessor(this);
        this.registeredTasks = new ConcurrentLinkedQueue<>();
        this.subscribes = new ConcurrentHashMap();
        this.wildcardsToken = new LinkedList();
        this.connected = false;
        this.pingTimeout = false;
        this.clientConfigure.setHost(str);
        this.clientConfigure.setPort(i);
        this.clientConfigure.setMqttVersion(mqttVersion);
        this.clientId = str2;
        getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, eventObject) -> {
            if (eventObject.getObject() instanceof MqttPingRespMessage) {
                this.pingTimeout = false;
            }
        });
        getEventBus().subscribe(EventType.WRITE_MESSAGE, (eventType2, eventObject2) -> {
            if (eventObject2.getObject() instanceof MqttPingReqMessage) {
                this.pingTimeout = true;
            }
        });
    }

    public void connect() {
        try {
            this.asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(2, new ThreadFactory() { // from class: org.smartboot.mqtt.client.MqttClient.1
                private int i = 0;

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    StringBuilder append = new StringBuilder().append("mqtt-client-").append(MqttClient.this.hashCode()).append("-");
                    int i = this.i;
                    this.i = i + 1;
                    return new Thread(runnable, append.append(i).toString());
                }
            });
        } catch (IOException e) {
            this.LOGGER.error(e.getMessage(), e);
        }
        connect(this.asynchronousChannelGroup);
    }

    public void connect(AsynchronousChannelGroup asynchronousChannelGroup) {
        connect(asynchronousChannelGroup, mqttConnAckMessage -> {
        });
    }

    public void connect(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool) {
        connect(asynchronousChannelGroup, bufferPagePool, mqttConnAckMessage -> {
        });
    }

    public void connect(AsynchronousChannelGroup asynchronousChannelGroup, Consumer<MqttConnAckMessage> consumer) {
        connect(asynchronousChannelGroup, null, consumer);
    }

    public void connect(final AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, final Consumer<MqttConnAckMessage> consumer) {
        this.connectConsumer = mqttConnAckMessage -> {
            if (!this.clientConfigure.isAutomaticReconnect()) {
                gcConfigure();
            }
            if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
                this.connected = true;
                while (true) {
                    Runnable poll = this.registeredTasks.poll();
                    if (poll == null) {
                        break;
                    } else {
                        poll.run();
                    }
                }
                this.subscribes.forEach((str, subscribe) -> {
                    subscribe(str, subscribe.getQoS(), subscribe.getConsumer());
                });
            }
            if (!this.clientConfigure.isCleanSession()) {
                this.responseConsumers.values().forEach(ackMessage -> {
                    write(ackMessage.getOriginalMessage());
                });
            }
            consumer.accept(mqttConnAckMessage);
            this.connected = true;
        };
        if (this.clientConfigure.getKeepAliveInterval() > 0) {
            QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { // from class: org.smartboot.mqtt.client.MqttClient.2
                @Override // java.lang.Runnable
                public void run() {
                    if (MqttClient.this.pingTimeout) {
                        MqttClient.this.pingTimeout = false;
                        MqttClient.this.client.shutdown();
                    }
                    if (MqttClient.this.session.isInvalid()) {
                        if (MqttClient.this.clientConfigure.isAutomaticReconnect()) {
                            MqttClient.this.LOGGER.warn("mqtt client is disconnect, try to reconnect...");
                            MqttClient.this.connect(asynchronousChannelGroup, MqttClient.this.reconnectConsumer == null ? consumer : MqttClient.this.reconnectConsumer);
                            return;
                        }
                        return;
                    }
                    long currentTimeMillis = (System.currentTimeMillis() - MqttClient.this.getLatestSendMessageTime()) - (MqttClient.this.clientConfigure.getKeepAliveInterval() * 1000);
                    if (currentTimeMillis <= -10) {
                        QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, -currentTimeMillis, TimeUnit.MILLISECONDS);
                    } else {
                        MqttClient.this.write(new MqttPingReqMessage());
                        QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, MqttClient.this.clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
                    }
                }
            }, this.clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
        }
        this.client = new AioQuickClient(this.clientConfigure.getHost(), this.clientConfigure.getPort(), new MqttProtocol(this.clientConfigure.getMaxPacketSize()), this.messageProcessor);
        if (bufferPagePool != null) {
            try {
                this.client.setBufferPagePool(bufferPagePool);
            } catch (IOException e) {
                this.LOGGER.error(e.getMessage(), e);
                return;
            }
        }
        this.client.setReadBufferSize(this.clientConfigure.getBufferSize()).setWriteBuffer(this.clientConfigure.getBufferSize(), 8).connectTimeout(this.clientConfigure.getConnectionTimeout());
        this.session = this.client.start(asynchronousChannelGroup);
        this.session.setAttachment(new Attachment());
        setMqttVersion(this.clientConfigure.getMqttVersion());
        this.mqttWriter = new DefaultMqttWriter(this.session.writeBuffer());
        MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(this.clientConfigure.getMqttVersion(), StringUtils.isNotBlank(this.clientConfigure.getUserName()), this.clientConfigure.getPassword() != null, this.clientConfigure.getWillMessage(), this.clientConfigure.isCleanSession(), this.clientConfigure.getKeepAliveInterval());
        MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(this.clientId, this.clientConfigure.getWillMessage(), this.clientConfigure.getUserName(), this.clientConfigure.getPassword());
        if (this.clientConfigure.getMqttVersion() == MqttVersion.MQTT_5) {
            mqttConnectVariableHeader.setProperties(new ConnectProperties());
        }
        MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(mqttConnectVariableHeader, mqttConnectPayload);
        QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> {
            if (this.connected) {
                return;
            }
            disconnect();
        }, this.clientConfigure.getConnectAckTimeout(), TimeUnit.SECONDS);
        write(mqttConnectMessage);
    }

    private void gcConfigure() {
        this.clientConfigure.setWillMessage(null);
    }

    public MqttClient unsubscribe(String str) {
        return unsubscribe(new String[]{str});
    }

    public MqttClient unsubscribe(String[] strArr) {
        if (this.connected) {
            unsubscribe0(strArr);
        } else {
            this.registeredTasks.offer(() -> {
                unsubscribe0(strArr);
            });
        }
        return this;
    }

    public void unsubscribe0(String[] strArr) {
        HashSet hashSet = new HashSet(strArr.length);
        for (String str : strArr) {
            if (this.subscribes.containsKey(str)) {
                hashSet.add(str);
            }
        }
        if (hashSet.isEmpty()) {
            this.LOGGER.warn("empty unsubscribe topics detected!");
            return;
        }
        MqttMessageBuilders.UnsubscribeBuilder packetId = MqttMessageBuilders.unsubscribe().packetId(newPacketId());
        packetId.getClass();
        hashSet.forEach(packetId::addTopicFilter);
        MqttUnsubscribeMessage build = packetId.build();
        this.responseConsumers.put(Integer.valueOf(build.getVariableHeader().getPacketId()), new AckMessage(build, mqttPacketIdentifierMessage -> {
            ValidateUtils.isTrue(mqttPacketIdentifierMessage instanceof MqttUnsubAckMessage, "uncorrected message type.");
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                String str2 = (String) it.next();
                this.subscribes.remove(str2);
                this.wildcardsToken.removeIf(topicToken -> {
                    return StringUtils.equals(str2, topicToken.getTopicFilter());
                });
            }
        }));
        write(build);
    }

    public MqttClient subscribe(String str, MqttQoS mqttQoS, BiConsumer<MqttClient, MqttPublishMessage> biConsumer) {
        return subscribe(new String[]{str}, new MqttQoS[]{mqttQoS}, biConsumer);
    }

    public MqttClient subscribe(String str, MqttQoS mqttQoS, BiConsumer<MqttClient, MqttPublishMessage> biConsumer, BiConsumer<MqttClient, MqttQoS> biConsumer2) {
        return subscribe(new String[]{str}, new MqttQoS[]{mqttQoS}, biConsumer, biConsumer2);
    }

    public MqttClient subscribe(String[] strArr, MqttQoS[] mqttQoSArr, BiConsumer<MqttClient, MqttPublishMessage> biConsumer) {
        subscribe0(strArr, mqttQoSArr, biConsumer, (mqttClient, mqttQoS) -> {
        });
        return this;
    }

    public MqttClient subscribe(String[] strArr, MqttQoS[] mqttQoSArr, BiConsumer<MqttClient, MqttPublishMessage> biConsumer, BiConsumer<MqttClient, MqttQoS> biConsumer2) {
        if (this.connected) {
            subscribe0(strArr, mqttQoSArr, biConsumer, biConsumer2);
        } else {
            this.registeredTasks.offer(() -> {
                subscribe0(strArr, mqttQoSArr, biConsumer, biConsumer2);
            });
        }
        return this;
    }

    private void subscribe0(String[] strArr, MqttQoS[] mqttQoSArr, BiConsumer<MqttClient, MqttPublishMessage> biConsumer, BiConsumer<MqttClient, MqttQoS> biConsumer2) {
        MqttMessageBuilders.SubscribeBuilder packetId = MqttMessageBuilders.subscribe().packetId(newPacketId());
        for (int i = 0; i < strArr.length; i++) {
            packetId.addSubscription(mqttQoSArr[i], strArr[i]);
        }
        MqttSubscribeMessage build = packetId.build();
        if (this.clientConfigure.getMqttVersion() == MqttVersion.MQTT_5) {
            build.getVariableHeader().setProperties(new SubscribeProperties());
        }
        this.responseConsumers.put(Integer.valueOf(build.getVariableHeader().getPacketId()), new AckMessage(build, mqttPacketIdentifierMessage -> {
            List grantedQoSLevels = ((MqttSubAckMessage) mqttPacketIdentifierMessage).getPayload().grantedQoSLevels();
            ValidateUtils.isTrue(grantedQoSLevels.size() == mqttQoSArr.length, "invalid response");
            int i2 = 0;
            for (MqttTopicSubscription mqttTopicSubscription : build.getPayload().getTopicSubscriptions()) {
                int i3 = i2;
                i2++;
                MqttQoS valueOf = MqttQoS.valueOf(Math.min(mqttTopicSubscription.getQualityOfService().value(), ((Integer) grantedQoSLevels.get(i3)).intValue()));
                this.clientConfigure.getTopicListener().subscribe(mqttTopicSubscription.getTopicFilter(), mqttTopicSubscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : valueOf);
                if (mqttTopicSubscription.getQualityOfService() != MqttQoS.FAILURE) {
                    this.subscribes.put(mqttTopicSubscription.getTopicFilter(), new Subscribe(mqttTopicSubscription.getTopicFilter(), valueOf, biConsumer));
                    TopicToken topicToken = new TopicToken(mqttTopicSubscription.getTopicFilter());
                    if (topicToken.isWildcards()) {
                        this.wildcardsToken.add(topicToken);
                    }
                } else {
                    this.LOGGER.error("subscribe topic:{} fail", mqttTopicSubscription.getTopicFilter());
                }
                biConsumer2.accept(this, valueOf);
            }
        }));
        write(build);
    }

    public void notifyResponse(MqttConnAckMessage mqttConnAckMessage) {
        this.connectConsumer.accept(mqttConnAckMessage);
    }

    public MqttClient willMessage(WillMessage willMessage) {
        if (this.clientConfigure.getMqttVersion() != MqttVersion.MQTT_5 && willMessage != null && willMessage.getProperties() != null) {
            ValidateUtils.throwException("will properties only support on mqtt5");
        }
        this.clientConfigure.setWillMessage(willMessage);
        return this;
    }

    public void publish(String str, MqttQoS mqttQoS, byte[] bArr, boolean z) {
        publish(str, mqttQoS, bArr, z, num -> {
        });
    }

    public void publish(String str, MqttQoS mqttQoS, byte[] bArr, boolean z, Consumer<Integer> consumer) {
        MqttMessageBuilders.PublishBuilder retained = MqttMessageBuilders.publish().topicName(str).qos(mqttQoS).payload(bArr).retained(z);
        if (mqttQoS.value() > 0) {
            retained.packetId(newPacketId());
        }
        MqttPublishMessage build = retained.build();
        if (getMqttVersion() == MqttVersion.MQTT_5) {
            build.getVariableHeader().setProperties(new PublishProperties());
        }
        if (this.connected) {
            publish(build, consumer);
        } else {
            this.registeredTasks.offer(() -> {
                publish(build, consumer);
            });
        }
    }

    public MqttClientConfigure getClientConfigure() {
        return this.clientConfigure;
    }

    public Map<String, Subscribe> getSubscribes() {
        return this.subscribes;
    }

    public List<TopicToken> getWildcardsToken() {
        return this.wildcardsToken;
    }

    public void disconnect() {
        write(new MqttDisconnectMessage());
        this.clientConfigure.setAutomaticReconnect(false);
        this.disconnect = true;
        this.client.shutdown();
    }

    public void setReconnectConsumer(Consumer<MqttConnAckMessage> consumer) {
        this.reconnectConsumer = consumer;
    }
}
