package org.smartboot.mqtt.broker;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.BrokerConfigure;
import org.smartboot.mqtt.broker.eventbus.ConnectIdleTimeMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.KeepAliveMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer;
import org.smartboot.mqtt.broker.plugin.Plugin;
import org.smartboot.mqtt.broker.provider.Providers;
import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationProviderImpl;
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.enums.MqttMetricEnum;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.to.MetricItemTO;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
import org.smartboot.socket.extension.plugins.AbstractPlugin;
import org.smartboot.socket.transport.AioQuickServer;
import org.smartboot.socket.transport.AioSession;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/smartboot/mqtt/broker/BrokerContextImpl.class */
public class BrokerContextImpl implements BrokerContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerContextImpl.class);

    /** Pool whose workers drain {@link #pushTopicQueue}; created by {@code initPushThread()}. */
    private ExecutorService pushThreadPool;

    /** Pool that delivers retained messages to new subscribers; created by {@code initPushThread()}. */
    private ExecutorService retainPushThreadPool;

    /** Bounded queue of topics waiting to be pushed to their subscribers. */
    private BlockingQueue<BrokerTopic> pushTopicQueue;

    /** Network server; created and started by {@code init()}. */
    private AioQuickServer server;

    /** Buffer pool handed to {@link #server}; released by {@code destroy()}. */
    private BufferPagePool pagePool;

    /** JSON rendering of the YAML configuration; set by {@code loadYamlConfig()} and cleared at the end of {@code init()}. */
    private String configJson;

    /** Channel group the server is started on; created by {@code init()}. */
    private AsynchronousChannelGroup asynchronousChannelGroup;

    /** Sessions keyed by client id. */
    private final ConcurrentMap<String, MqttSession> grantSessions = new ConcurrentHashMap<>();

    /** Topics keyed by topic name. */
    private final ConcurrentMap<String, BrokerTopic> topicMap = new ConcurrentHashMap<>();

    /** Effective configuration; starts as defaults and is replaced by {@code updateBrokerConfigure()}. */
    private BrokerConfigure brokerConfigure = new BrokerConfigure();

    /** Scheduler exposed through {@code getKeepAliveThreadPool()}. */
    private final ScheduledExecutorService KEEP_ALIVE_EXECUTOR = Executors.newSingleThreadScheduledExecutor();

    /** Executor that is only shut down by {@code destroy()} within this class. */
    private final ExecutorService messageBusExecutorService = Executors.newCachedThreadPool();

    /** Message bus that received PUBLISH messages are fed into. */
    private final MessageBus messageBusSubscriber = new MessageBusSubscriber();

    /** Event bus for broker events. */
    private final EventBus eventBus = new EventBusImpl(ServerEventType.types());

    /** Plugins discovered through {@link ServiceLoader}, in discovery order. */
    private final List<Plugin> plugins = new ArrayList<>();

    private final Providers providers = new Providers();
    private final BrokerRuntime runtime = new BrokerRuntime();
    private final MqttBrokerMessageProcessor processor = new MqttBrokerMessageProcessor(this);

    /** Sentinel placed on {@link #pushTopicQueue} to tell the push workers to exit. */
    private final BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic("");

    /** One metric item per {@link MqttMetricEnum}; populated once by {@code initMetric()}. */
    private final Map<MqttMetricEnum, MetricItemTO> metricMap = new HashMap<>();

    /** Sentinel offered to a topic's subscriber queue to mark the end of one push round. */
    private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0);

    /* renamed from: org.smartboot.mqtt.broker.BrokerContextImpl$6, reason: invalid class name */
    /* loaded from: input_file:org/smartboot/mqtt/broker/BrokerContextImpl$6.class */
    /**
     * Lookup table used by the {@code switch} statements in {@code initMetric()}.
     *
     * <p>The array is indexed by {@link MqttQoS#ordinal()} and holds the case label
     * (1, 2 or 3) for the three QoS constants referenced below; every other slot stays 0
     * and therefore falls through to {@code default}.
     */
    static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$smartboot$mqtt$common$enums$MqttQoS = new int[MqttQoS.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing
            // at link time (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Boots the broker: loads configuration, wires providers, buses, push workers, metrics
     * and plugins, then opens the channel group and starts the network server.
     *
     * <p>If anything fails after the plugins are installed, {@link #destroy()} is invoked and
     * the original exception is rethrown. A failure inside {@code destroy()} is attached as a
     * suppressed exception so it cannot mask the real cause of the startup failure.
     *
     * @throws IOException if the configuration cannot be loaded or the server cannot start
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public void init() throws IOException {
        updateBrokerConfigure();
        initProvider();
        subscribeEventBus();
        subscribeMessageBus();
        initPushThread();
        initMetric();
        loadAndInstallPlugins();
        try {
            this.asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                private int threadSeq;

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    // Pre-increment: the first thread is named "smart-mqtt-broker-1".
                    return new Thread(runnable, "smart-mqtt-broker-" + (++this.threadSeq));
                }
            });
            this.pagePool = new BufferPagePool(10485760, this.brokerConfigure.getThreadNum(), true);
            this.server = new AioQuickServer(this.brokerConfigure.getHost(), this.brokerConfigure.getPort(), new MqttProtocol(this.brokerConfigure.getMaxPacketSize()), this.processor);
            this.server.setBannerEnabled(false)
                    .setReadBufferSize(this.brokerConfigure.getBufferSize())
                    .setWriteBuffer(this.brokerConfigure.getBufferSize(), Math.min(this.brokerConfigure.getMaxInflight(), 16))
                    .setBufferPagePool(this.pagePool)
                    .setThreadNum(Math.max(2, this.brokerConfigure.getThreadNum()));
            this.server.start(this.asynchronousChannelGroup);
            printStartupBanner();
            initRuntime();
            this.eventBus.publish(ServerEventType.BROKER_STARTED, this);
            // The raw configuration is only needed during startup (plugins read it via parseConfig).
            this.configJson = null;
        } catch (Exception e) {
            try {
                destroy();
            } catch (RuntimeException cleanupFailure) {
                // Keep the startup failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /** Prints the ASCII banner, project links and the listening address to standard output. */
    private void printStartupBanner() {
        System.out.println("\n                               _                         _    _       _                  _                  \n                              ( )_                      ( )_ ( )_    ( )                ( )                 \n  ___   ___ ___     _ _  _ __ | ,_)     ___ ___     _ _ | ,_)| ,_)   | |_    _ __   _   | |/')    __   _ __ \n/',__)/' _ ` _ `\\ /'_` )( '__)| |     /' _ ` _ `\\ /'_` )| |  | |     | '_`\\ ( '__)/'_`\\ | , <   /'__`\\( '__)\n\\__, \\| ( ) ( ) |( (_| || |   | |_    | ( ) ( ) |( (_) || |_ | |_    | |_) )| |  ( (_) )| |\\`\\ (  ___/| |   \n(____/(_) (_) (_)`\\__,_)(_)   `\\__)   (_) (_) (_)`\\__, |`\\__)`\\__)   (_,__/'(_)  `\\___/'(_) (_)`\\____)(_)   \n                                                     | |                                                    \n                                                     (_)                                                    \r\n :: smart-mqtt broker::\t(v0.16)");
        System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt");
        System.out.println("Github: https://github.com/smartboot/smart-mqtt");
        // The emoji is written as an escaped surrogate pair so it survives any source encoding.
        if (StringUtils.isBlank(this.brokerConfigure.getHost())) {
            System.out.println("\uD83C\uDF89start smart-mqtt success! [port:" + this.brokerConfigure.getPort() + "]");
        } else {
            System.out.println("\uD83C\uDF89start smart-mqtt success! [host:" + this.brokerConfigure.getHost() + " port:" + this.brokerConfigure.getPort() + "]");
        }
    }

    /** Fills the runtime descriptor with the broker name, start time, process id and address. */
    private void initRuntime() {
        if (StringUtils.isBlank(this.brokerConfigure.getName())) {
            this.runtime.setName("smart-mqtt@" + (StringUtils.isBlank(this.brokerConfigure.getHost()) ? "0.0.0.0" : this.brokerConfigure.getHost()));
        } else {
            this.runtime.setName(this.brokerConfigure.getName());
        }
        this.runtime.setStartTime(System.currentTimeMillis());
        // The runtime MX bean name is split on '@' and the leading part is stored as the pid.
        this.runtime.setPid(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
        this.runtime.setIpAddress(this.brokerConfigure.getHost() + ":" + this.brokerConfigure.getPort());
    }

    /**
     * Creates one {@link MetricItemTO} per {@link MqttMetricEnum} and registers the listeners
     * that keep them up to date: a socket plugin for byte counters, event-bus subscribers for
     * connection, subscription and packet counters, and a message-bus consumer for received
     * PUBLISH messages per QoS.
     */
    private void initMetric() {
        for (MqttMetricEnum metricEnum : MqttMetricEnum.values()) {
            this.metricMap.put(metricEnum, new MetricItemTO(metricEnum));
        }
        this.processor.addPlugin(new AbstractPlugin<MqttMessage>() {
            public void afterRead(AioSession aioSession, int readSize) {
                if (readSize > 0) {
                    BrokerContextImpl.this.metricMap.get(MqttMetricEnum.BYTES_RECEIVED).getMetric().add(readSize);
                }
            }

            public void afterWrite(AioSession aioSession, int writeSize) {
                if (writeSize > 0) {
                    BrokerContextImpl.this.metricMap.get(MqttMetricEnum.BYTES_SENT).getMetric().add(writeSize);
                }
            }
        });
        this.eventBus.subscribe(ServerEventType.CONNECT, (eventType, eventObject) -> {
            incrementMetric(MqttMetricEnum.CLIENT_CONNECT);
        });
        this.eventBus.subscribe(ServerEventType.DISCONNECT, (eventType, abstractSession) -> {
            incrementMetric(MqttMetricEnum.CLIENT_DISCONNECT);
        });
        this.eventBus.subscribe(ServerEventType.SUBSCRIBE_ACCEPT, (eventType, eventObject) -> {
            incrementMetric(MqttMetricEnum.CLIENT_SUBSCRIBE);
        });
        this.eventBus.subscribe(ServerEventType.UNSUBSCRIBE_ACCEPT, (eventType, eventObject) -> {
            incrementMetric(MqttMetricEnum.CLIENT_UNSUBSCRIBE);
        });
        this.eventBus.subscribe(EventType.RECEIVE_MESSAGE, (eventType, eventObject) -> {
            incrementMetric(MqttMetricEnum.PACKETS_RECEIVED);
            if (eventObject.getObject() instanceof MqttConnectMessage) {
                incrementMetric(MqttMetricEnum.PACKETS_CONNECT_RECEIVED);
            }
        });
        this.eventBus.subscribe(EventType.WRITE_MESSAGE, (eventType, eventObject) -> {
            incrementMetric(MqttMetricEnum.PACKETS_SENT);
            if (eventObject.getObject() instanceof MqttConnAckMessage) {
                incrementMetric(MqttMetricEnum.PACKETS_CONNACK_SENT);
                return;
            }
            if (eventObject.getObject() instanceof MqttPublishMessage) {
                incrementByQos(((MqttMessage) eventObject.getObject()).getFixedHeader().getQosLevel(),
                        MqttMetricEnum.MESSAGE_QOS0_SENT, MqttMetricEnum.MESSAGE_QOS1_SENT, MqttMetricEnum.MESSAGE_QOS2_SENT);
            }
        });
        this.messageBusSubscriber.consumer((brokerContext, mqttPublishMessage) -> {
            incrementByQos(mqttPublishMessage.getFixedHeader().getQosLevel(),
                    MqttMetricEnum.MESSAGE_QOS0_RECEIVED, MqttMetricEnum.MESSAGE_QOS1_RECEIVED, MqttMetricEnum.MESSAGE_QOS2_RECEIVED);
        });
    }

    /** Adds one to the counter registered for {@code metricEnum}. */
    private void incrementMetric(MqttMetricEnum metricEnum) {
        this.metricMap.get(metricEnum).getMetric().increment();
    }

    /**
     * Increments the counter that corresponds to {@code qos}; any QoS value other than the
     * three delivery levels is ignored.
     *
     * @param qos  QoS level of the PUBLISH message
     * @param qos0 metric to bump for {@code AT_MOST_ONCE}
     * @param qos1 metric to bump for {@code AT_LEAST_ONCE}
     * @param qos2 metric to bump for {@code EXACTLY_ONCE}
     */
    private void incrementByQos(MqttQoS qos, MqttMetricEnum qos0, MqttMetricEnum qos1, MqttMetricEnum qos2) {
        switch (qos) {
            case AT_MOST_ONCE:
                incrementMetric(qos0);
                break;
            case AT_LEAST_ONCE:
                incrementMetric(qos1);
                break;
            case EXACTLY_ONCE:
                incrementMetric(qos2);
                break;
            default:
                break;
        }
    }

    /** Registers the configuration-backed implementation as the connect authentication provider. */
    private void initProvider() {
        ConfiguredConnectAuthenticationProviderImpl authenticationProvider = new ConfiguredConnectAuthenticationProviderImpl(this);
        providers.setConnectAuthenticationProvider(authenticationProvider);
    }

    /**
     * Creates the topic push queue and the two push pools, then starts one long-running push
     * worker per configured push thread.
     *
     * <p>A non-positive topic limit is replaced by 10. Limits up to 4096 use an array-backed
     * queue, larger limits a linked queue of the same capacity.
     */
    private void initPushThread() {
        if (this.brokerConfigure.getTopicLimit() <= 0) {
            this.brokerConfigure.setTopicLimit(10);
        }
        int topicLimit = this.brokerConfigure.getTopicLimit();
        this.pushTopicQueue = topicLimit <= 4096 ? new ArrayBlockingQueue<>(topicLimit) : new LinkedBlockingQueue<>(topicLimit);
        int pushThreadNum = getBrokerConfigure().getPushThreadNum();
        this.retainPushThreadPool = Executors.newFixedThreadPool(pushThreadNum);
        this.pushThreadPool = Executors.newFixedThreadPool(pushThreadNum, new ThreadFactory() {
            private int index = 0;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                // Post-increment: the first worker is named "broker-push-0".
                return new Thread(runnable, "broker-push-" + (this.index++));
            }
        });
        for (int i = 0; i < pushThreadNum; i++) {
            this.pushThreadPool.execute(new AsyncTask() {
                public void execute() {
                    BrokerContextImpl.this.runPushLoop();
                }
            });
        }
    }

    /**
     * Body of a push worker: takes topics from the push queue and pushes each one until the
     * shutdown sentinel is seen or the thread is interrupted.
     */
    private void runPushLoop() {
        while (true) {
            BrokerTopic brokerTopic;
            try {
                brokerTopic = this.pushTopicQueue.take();
                int backlog = this.pushTopicQueue.size();
                if (backlog > 1024) {
                    LOGGER.warn("push topic queue backlog:{}", backlog);
                }
                if (this.SHUTDOWN_TOPIC == brokerTopic) {
                    // Put the sentinel back so the remaining workers also see it and exit.
                    this.pushTopicQueue.put(this.SHUTDOWN_TOPIC);
                    return;
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.error("pushTopicQueue exception", e);
                return;
            }
            try {
                pushTopic(brokerTopic);
            } catch (Exception e) {
                LOGGER.error("brokerTopic:{} push message exception", brokerTopic.getTopic(), e);
            }
        }
    }

    /**
     * Runs one push round for {@code brokerTopic}: every subscriber queued before the
     * {@link #BREAK} marker gets one {@code batchPublish} call, then the topic's semaphore is
     * released and the topic is re-scheduled if its version changed during the round.
     */
    private void pushTopic(BrokerTopic brokerTopic) {
        ConcurrentLinkedQueue<TopicSubscriber> queue = brokerTopic.getQueue();
        queue.offer(this.BREAK);
        int versionAtStart = brokerTopic.getVersion().get();
        TopicSubscriber subscriber;
        // The null check guards against spinning on a NullPointerException should the
        // marker ever be missing from the queue.
        while ((subscriber = queue.poll()) != null && subscriber != this.BREAK) {
            try {
                subscriber.batchPublish(this);
            } catch (Exception e) {
                LOGGER.error("batch publish exception", e);
            }
        }
        brokerTopic.getSemaphore().release();
        if (versionAtStart != brokerTopic.getVersion().get() && !queue.isEmpty()) {
            notifyPush(brokerTopic);
        }
    }

    /**
     * Registers the built-in message-bus consumers: one that hands every message to the
     * persistence provider, and a {@link RetainPersistenceConsumer} that is only applied to
     * messages whose fixed header has the retain flag set.
     */
    private void subscribeMessageBus() {
        messageBusSubscriber.consumer((context, publishMessage) -> providers.getPersistenceProvider().doSave(publishMessage));
        messageBusSubscriber.consumer(new RetainPersistenceConsumer(), publishMessage -> publishMessage.getFixedHeader().isRetain());
    }

    /**
     * Registers the broker's own event-bus subscribers.
     *
     * <ul>
     *   <li>{@code RECEIVE_PUBLISH_MESSAGE}: feeds the message into the message bus and always
     *       publishes {@code MESSAGE_BUS_CONSUMED} for the topic afterwards.</li>
     *   <li>{@code SESSION_CREATE} / {@code CONNECT}: idle-time and keep-alive monitors.</li>
     *   <li>{@code MESSAGE_BUS_CONSUMED}: bumps the topic version and schedules a push.</li>
     *   <li>{@code SUBSCRIBE_TOPIC}: delivers retained messages to the new subscriber before
     *       adding it to the topic's push queue.</li>
     *   <li>{@code SUBSCRIBE_REFRESH_TOPIC}: adds the subscriber to the topic's push queue.</li>
     * </ul>
     */
    private void subscribeEventBus() {
        this.eventBus.subscribe(ServerEventType.RECEIVE_PUBLISH_MESSAGE, (eventType, eventObject) -> {
            MqttPublishMessage publishMessage = (MqttPublishMessage) eventObject.getObject();
            BrokerTopic topic = getOrCreateTopic(publishMessage.getVariableHeader().getTopicName());
            try {
                this.messageBusSubscriber.consume(this, publishMessage);
            } finally {
                // Published even when a consumer throws, so the topic is still pushed.
                this.eventBus.publish(ServerEventType.MESSAGE_BUS_CONSUMED, topic);
            }
        });
        this.eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this));
        this.eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this));
        this.eventBus.subscribe(ServerEventType.MESSAGE_BUS_CONSUMED, (eventType, brokerTopic) -> {
            brokerTopic.getVersion().incrementAndGet();
            notifyPush(brokerTopic);
        });
        this.eventBus.subscribe(ServerEventType.SUBSCRIBE_TOPIC, new EventBusSubscriber<TopicSubscriber>() {
            public void subscribe(EventType<TopicSubscriber> eventType, final TopicSubscriber topicSubscriber) {
                BrokerContextImpl.this.retainPushThreadPool.execute(new AsyncTask() {
                    public void execute() {
                        PersistenceMessage retained = BrokerContextImpl.this.providers.getRetainMessageProvider().get(topicSubscriber.getTopic().getTopic(), topicSubscriber.getRetainConsumerOffset());
                        if (retained == null || retained.getCreateTime() > topicSubscriber.getLatestSubscribeTime()) {
                            // No retained message left to replay (none stored, or it was created
                            // after the subscription): switch to regular pushing.
                            BrokerTopic topic = topicSubscriber.getTopic();
                            topic.getQueue().offer(topicSubscriber);
                            BrokerContextImpl.this.notifyPush(topic);
                            return;
                        }
                        MqttSession mqttSession = topicSubscriber.getMqttSession();
                        MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(retained.getPayload()).qos(topicSubscriber.getMqttQoS()).topicName(retained.getTopic());
                        if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
                            publishBuilder.publishProperties(new PublishProperties());
                        }
                        InflightQueue inflightQueue = mqttSession.getInflightQueue();
                        inflightQueue.offer(publishBuilder, offset -> {
                            BrokerContextImpl.LOGGER.info("publish retain to client:{} success  ", mqttSession.getClientId());
                            topicSubscriber.setRetainConsumerOffset(offset.longValue() + 1);
                            // "this" is the enclosing AsyncTask: re-run it to fetch the next retained message.
                            BrokerContextImpl.this.retainPushThreadPool.execute(this);
                        }, retained.getOffset());
                        mqttSession.flush();
                    }
                });
            }
        });
        this.eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, topicSubscriber) -> {
            LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", topicSubscriber.getTopicFilterToken().getTopicFilter(), topicSubscriber.getTopic().getTopic());
            topicSubscriber.getTopic().getQueue().offer(topicSubscriber);
        });
    }

    /**
     * Schedules {@code brokerTopic} for pushing unless it is already scheduled.
     *
     * <p>The topic's semaphore acts as the "already scheduled" flag: it is acquired here and
     * released by the push worker once the round is finished. If the calling thread is
     * interrupted while waiting for queue space, the permit is given back (the topic was never
     * enqueued, so no worker would release it) and the interrupt flag is restored.
     *
     * @param brokerTopic topic whose subscribers should be pushed
     * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted
     */
    void notifyPush(BrokerTopic brokerTopic) {
        if (!brokerTopic.getSemaphore().tryAcquire()) {
            return;
        }
        try {
            this.pushTopicQueue.put(brokerTopic);
        } catch (InterruptedException e) {
            brokerTopic.getSemaphore().release();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the effective broker configuration.
     *
     * <p>The {@code $.broker} section of the YAML file is the base (the default configuration
     * is kept when that section is absent). Values from environment variables listed in
     * {@code BrokerConfigure.SystemEnvironments} are applied on top, and JVM system properties
     * override both. A blank host finally falls back to {@code 0.0.0.0}.
     *
     * @throws IOException           if the YAML configuration cannot be read
     * @throws NumberFormatException if a numeric override is not a valid integer
     */
    private void updateBrokerConfigure() throws IOException {
        loadYamlConfig();
        BrokerConfigure parsed = parseConfig("$.broker", BrokerConfigure.class);
        if (parsed != null) {
            this.brokerConfigure = parsed;
        }
        Properties properties = new Properties();
        BrokerConfigure.SystemEnvironments.forEach((envName, propertyName) -> {
            String envValue = System.getenv(envName);
            if (envValue != null) {
                properties.setProperty(propertyName, envValue);
            }
        });
        // System properties are copied last so they win over environment variables.
        System.getProperties().stringPropertyNames().forEach(propertyName -> {
            String propertyValue = System.getProperty(propertyName);
            if (propertyValue != null) {
                properties.setProperty(propertyName, propertyValue);
            }
        });
        if (properties.containsKey(BrokerConfigure.SystemProperty.HOST)) {
            this.brokerConfigure.setHost(properties.getProperty(BrokerConfigure.SystemProperty.HOST));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.PORT)) {
            this.brokerConfigure.setPort(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.PORT)));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.CONNECT_IDLE_TIMEOUT)) {
            this.brokerConfigure.setNoConnectIdleTimeout(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.CONNECT_IDLE_TIMEOUT)));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.MAX_INFLIGHT)) {
            this.brokerConfigure.setMaxInflight(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.MAX_INFLIGHT)));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.USERNAME)) {
            this.brokerConfigure.setUsername(properties.getProperty(BrokerConfigure.SystemProperty.USERNAME));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.PASSWORD)) {
            this.brokerConfigure.setPassword(properties.getProperty(BrokerConfigure.SystemProperty.PASSWORD));
        }
        if (properties.containsKey(BrokerConfigure.SystemProperty.THREAD_NUM)) {
            this.brokerConfigure.setThreadNum(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.THREAD_NUM)));
        }
        if (StringUtils.isBlank(this.brokerConfigure.getHost())) {
            this.brokerConfigure.setHost("0.0.0.0");
        }
    }

    /**
     * Discovers {@link Plugin} implementations through {@link ServiceLoader}, records them in
     * discovery order, and installs them in ascending {@code order()}.
     */
    private void loadAndInstallPlugins() {
        for (Plugin discovered : ServiceLoader.load(Plugin.class, Providers.class.getClassLoader())) {
            LOGGER.info("load plugin: " + discovered.pluginName());
            plugins.add(discovered);
        }
        // Sort a copy: the plugins list itself keeps the discovery order.
        List<Plugin> installOrder = new ArrayList<>(plugins);
        installOrder.sort(Comparator.comparingInt(Plugin::order));
        for (Plugin toInstall : installOrder) {
            LOGGER.info("install plugin: " + toInstall.pluginName());
            toInstall.install(this);
        }
    }

    /**
     * Returns the configuration object currently held by this context.
     *
     * @return the broker configuration
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public BrokerConfigure getBrokerConfigure() {
        return brokerConfigure;
    }

    /**
     * Registers {@code mqttSession} under its client id; an existing session with the same
     * client id is left in place.
     *
     * @param mqttSession session to register
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public void addSession(MqttSession mqttSession) {
        String clientId = mqttSession.getClientId();
        grantSessions.putIfAbsent(clientId, mqttSession);
    }

    /**
     * Returns the topic registered under {@code str}, creating and registering it first when
     * it does not exist yet.
     *
     * @param str topic name; must not contain wildcards when a new topic has to be created
     * @return the existing or newly created topic
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public BrokerTopic getOrCreateTopic(String str) {
        return topicMap.computeIfAbsent(str, this::createTopic);
    }

    /** Validates the name, builds the topic, announces it on the event bus and counts it. */
    private BrokerTopic createTopic(String topicName) {
        boolean wildcardFree = !MqttUtil.containsTopicWildcards(topicName);
        ValidateUtils.isTrue(wildcardFree, "invalid topicName: " + topicName);
        BrokerTopic created = new BrokerTopic(topicName);
        eventBus.publish(ServerEventType.TOPIC_CREATE, created);
        metric(MqttMetricEnum.TOPIC_COUNT).getMetric().increment();
        return created;
    }

    /**
     * Returns the live view of all registered topics (the value view of the topic map, not a
     * copy).
     *
     * @return the registered topics
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public Collection<BrokerTopic> getTopics() {
        return topicMap.values();
    }

    /**
     * Returns the message bus of this broker.
     *
     * @return the message bus
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public MessageBus getMessageBus() {
        return messageBusSubscriber;
    }

    /**
     * Returns the event bus of this broker.
     *
     * @return the event bus
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public EventBus getEventBus() {
        return eventBus;
    }

    /**
     * Removes the session registered under the given client id.
     *
     * @param str client id; a blank value is ignored with a warning
     * @return the removed session, or {@code null} if the id was blank or unknown
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public MqttSession removeSession(String str) {
        if (StringUtils.isBlank(str)) {
            LOGGER.warn("clientId is blank, ignore remove grantSession");
            return null;
        }
        return grantSessions.remove(str);
    }

    /**
     * Looks up the session registered under the given client id.
     *
     * @param str client id
     * @return the session, or {@code null} if none is registered
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public MqttSession getSession(String str) {
        return grantSessions.get(str);
    }

    /**
     * Returns a read-only view of all registered sessions.
     *
     * @return unmodifiable collection backed by the session map
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public Collection<MqttSession> getSessions() {
        Collection<MqttSession> liveSessions = grantSessions.values();
        return Collections.unmodifiableCollection(liveSessions);
    }

    /**
     * Returns the single-threaded scheduler owned by this context.
     *
     * @return the keep-alive scheduler
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public ScheduledExecutorService getKeepAliveThreadPool() {
        return KEEP_ALIVE_EXECUTOR;
    }

    /**
     * Returns the provider registry of this broker.
     *
     * @return the providers
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public Providers getProviders() {
        return providers;
    }

    /**
     * Returns the runtime descriptor that {@code init()} fills in (name, start time, pid,
     * address).
     *
     * @return the broker runtime information
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public BrokerRuntime getRuntime() {
        return runtime;
    }

    /**
     * Extracts the configuration section addressed by a JSONPath expression and converts it
     * to the requested type.
     *
     * @param str JSONPath expression evaluated against the loaded configuration
     * @param cls target type
     * @param <T> target type
     * @return the converted section, or {@code null} when the path does not resolve to a JSON object
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public <T> T parseConfig(String str, Class<T> cls) {
        JSONPath path = JSONPath.of(str);
        Object node = path.extract(JSONReader.of(this.configJson));
        if (!(node instanceof JSONObject)) {
            return null;
        }
        JSONObject section = (JSONObject) node;
        return (T) section.to(cls, new JSONReader.Feature[0]);
    }

    /**
     * Loads the YAML configuration and stores it as a JSON string for later
     * {@code parseConfig} calls.
     *
     * <p>When the {@code BrokerConfigure.SystemProperty.BrokerConfig} system property is blank
     * the bundled {@code smart-mqtt.yaml} classpath resource is used; otherwise the property
     * is treated as a file path. The stream is always closed, including when parsing fails.
     *
     * @throws IOException if the file cannot be opened or the bundled resource is missing
     */
    public void loadYamlConfig() throws IOException {
        String configPath = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig);
        boolean useInternal = StringUtils.isBlank(configPath);
        try (InputStream yamlStream = useInternal
                ? BrokerContext.class.getClassLoader().getResourceAsStream("smart-mqtt.yaml")
                : Files.newInputStream(Paths.get(configPath))) {
            if (yamlStream == null) {
                // getResourceAsStream signals a missing resource with null rather than an exception.
                throw new IOException("internal config resource not found on classpath: smart-mqtt.yaml");
            }
            LOGGER.info(useInternal ? "load internal yaml config." : "load external yaml config.");
            // NOTE(review): Yaml.load can instantiate arbitrary types; acceptable only while the
            // configuration file is operator-controlled.
            this.configJson = JSONObject.toJSONString(new Yaml().load(yamlStream), new JSONWriter.Feature[0]);
        }
    }

    /**
     * Returns the message processor that is attached to the network server.
     *
     * @return the broker message processor
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public MqttBrokerMessageProcessor getMessageProcessor() {
        return processor;
    }

    /**
     * Looks up the metric item registered for the given metric.
     *
     * @param mqttMetricEnum metric identifier
     * @return the metric item, or {@code null} if none is registered for it
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public MetricItemTO metric(MqttMetricEnum mqttMetricEnum) {
        return metricMap.get(mqttMetricEnum);
    }

    /**
     * Shuts the broker down and releases its resources.
     *
     * <p>Safe to call after a partially failed {@code init()}: resources that were never
     * created are skipped. Push workers are stopped through the shutdown sentinel, or by
     * interruption when the sentinel cannot be enqueued. Every plugin gets its
     * {@code uninstall()} call even if an earlier one fails.
     */
    @Override // org.smartboot.mqtt.broker.BrokerContext
    public void destroy() {
        LOGGER.info("destroy broker...");
        this.eventBus.publish(ServerEventType.BROKER_DESTROY, this);
        this.messageBusExecutorService.shutdown();
        if (this.pushThreadPool != null) {
            if (this.pushTopicQueue != null && this.pushTopicQueue.offer(this.SHUTDOWN_TOPIC)) {
                this.pushThreadPool.shutdown();
            } else {
                // The sentinel did not fit into the bounded queue: interrupt the workers instead,
                // otherwise they would stay blocked in take() forever.
                this.pushThreadPool.shutdownNow();
            }
        }
        if (this.server != null) {
            this.server.shutdown();
        }
        if (this.asynchronousChannelGroup != null) {
            this.asynchronousChannelGroup.shutdown();
        }
        if (this.pagePool != null) {
            this.pagePool.release();
        }
        for (Plugin plugin : this.plugins) {
            try {
                plugin.uninstall();
            } catch (RuntimeException e) {
                LOGGER.error("uninstall plugin exception", e);
            }
        }
        this.plugins.clear();
        // Stopped last so that work triggered while closing the server can still be submitted.
        if (this.retainPushThreadPool != null) {
            this.retainPushThreadPool.shutdown();
        }
        // Pending keep-alive checks are pointless once the server is down; cancel them so the
        // scheduler thread can terminate.
        this.KEEP_ALIVE_EXECUTOR.shutdownNow();
    }
}
