package org.smartboot.mqtt.broker;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.BrokerConfigure;
import org.smartboot.mqtt.broker.eventbus.ConnectAuthenticationSubscriber;
import org.smartboot.mqtt.broker.eventbus.ConnectIdleTimeMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.KeepAliveMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer;
import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage;
import org.smartboot.mqtt.broker.plugin.Plugin;
import org.smartboot.mqtt.broker.plugin.provider.Providers;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.transport.AioQuickServer;

/* loaded from: input_file:org/smartboot/mqtt/broker/BrokerContextImpl.class */
public class BrokerContextImpl implements BrokerContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerContextImpl.class);

    /** Sessions registered via {@link #addSession(MqttSession)}, keyed by client id. */
    private final ConcurrentMap<String, MqttSession> grantSessions = new ConcurrentHashMap<>();

    /** Topics created via {@link #getOrCreateTopic(String)}, keyed by topic name. */
    private final ConcurrentMap<String, BrokerTopic> topicMap = new ConcurrentHashMap<>();

    /** Broker settings; populated from the layered property sources during {@link #init()}. */
    private final BrokerConfigure brokerConfigure = new BrokerConfigure();

    /** Single-threaded scheduler handed out by {@link #getKeepAliveThreadPool()}. */
    private final ScheduledExecutorService KEEP_ALIVE_EXECUTOR = Executors.newSingleThreadScheduledExecutor();

    /** Cached pool that this class only shuts down in {@link #destroy()}. */
    private final ExecutorService messageBusExecutorService = Executors.newCachedThreadPool();

    /** Message bus; also subscribed to the event bus for received publish messages. */
    private final MessageBus messageBusSubscriber = new MessageBusSubscriber(this);

    /** Event bus created with the server event types. */
    private final EventBus eventBus = new EventBusImpl(ServerEventType.types());

    /** Plugins discovered through {@link ServiceLoader}, in discovery order. */
    private final List<Plugin> plugins = new ArrayList<>();

    /** Provider registry exposed through {@link #getProviders()}. */
    private final Providers providers = new Providers();

    /** Fixed-size pool used to push messages to subscribers; created in {@link #init()}. */
    private ExecutorService pushThreadPool;

    /** Network server; assigned in {@link #init()}. */
    private AioQuickServer server;

    /** Buffer pool backing the server; assigned in {@link #init()}. */
    private BufferPagePool pagePool;

    /**
     * Starts the broker.
     *
     * <p>Steps, in order: load the configuration, create the push thread pool, register the
     * event-bus and message-bus subscribers, load and install plugins, then create and start
     * the network server, print the startup banner and publish
     * {@code ServerEventType.BROKER_STARTED}.
     *
     * @throws IOException if the configuration cannot be read or the server fails to start;
     *                     when the server start-up phase fails, {@link #destroy()} is invoked
     *                     before the exception is rethrown
     */
    @Override
    public void init() throws IOException {
        // Load the configuration first so the pool below is sized from fully loaded settings.
        updateBrokerConfigure();
        this.pushThreadPool = Executors.newFixedThreadPool(getBrokerConfigure().getPushThreadNum());
        subscribeEventBus();
        subscribeMessageBus();
        loadAndInstallPlugins();
        try {
            this.pagePool = new BufferPagePool(1048576, this.brokerConfigure.getThreadNum(), true);
            this.server = new AioQuickServer(this.brokerConfigure.getHost(), this.brokerConfigure.getPort(), new MqttProtocol(), new MqttBrokerMessageProcessor(this));
            this.server.setBannerEnabled(false).setReadBufferSize(4096).setBufferPagePool(this.pagePool).setThreadNum(this.brokerConfigure.getThreadNum());
            this.server.start();
            System.out.println("\n                               _                         _    _       _                  _                  \n                              ( )_                      ( )_ ( )_    ( )                ( )                 \n  ___   ___ ___     _ _  _ __ | ,_)     ___ ___     _ _ | ,_)| ,_)   | |_    _ __   _   | |/')    __   _ __ \n/',__)/' _ ` _ `\\ /'_` )( '__)| |     /' _ ` _ `\\ /'_` )| |  | |     | '_`\\ ( '__)/'_`\\ | , <   /'__`\\( '__)\n\\__, \\| ( ) ( ) |( (_| || |   | |_    | ( ) ( ) |( (_) || |_ | |_    | |_) )| |  ( (_) )| |\\`\\ (  ___/| |   \n(____/(_) (_) (_)`\\__,_)(_)   `\\__)   (_) (_) (_)`\\__, |`\\__)`\\__)   (_,__/'(_)  `\\___/'(_) (_)`\\____)(_)   \n                                                     | |                                                    \n                                                     (_)                                                    \r\n :: smart-mqtt broker::\t(v0.8)");
            System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt");
            System.out.println("Github: https://github.com/smartboot/smart-mqtt");
            // The literal previously held two U+FFFD replacement characters left behind by a bad
            // decode. Written as an escape (U+1F389) so the source encoding cannot break it again.
            // NOTE(review): presumably the party-popper emoji was intended; confirm.
            String successMark = "\uD83C\uDF89";
            if (StringUtils.isBlank(this.brokerConfigure.getHost())) {
                System.out.println(successMark + "start smart-mqtt success! [port:" + this.brokerConfigure.getPort() + "]");
            } else {
                System.out.println(successMark + "start smart-mqtt success! [host:" + this.brokerConfigure.getHost() + " port:" + this.brokerConfigure.getPort() + "]");
            }
            this.eventBus.publish(ServerEventType.BROKER_STARTED, this);
        } catch (Exception e) {
            // Clean up, but never let a failure during cleanup hide the real startup error.
            try {
                destroy();
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /**
     * Registers a {@link RetainPersistenceConsumer} on the message bus, paired with a
     * predicate that calls {@code isRetained()} on the message.
     */
    private void subscribeMessageBus() {
        RetainPersistenceConsumer retainConsumer = new RetainPersistenceConsumer(this);
        this.messageBusSubscriber.consumer(retainConsumer, message -> message.isRetained());
    }

    /**
     * Registers the broker's built-in subscribers on the event bus.
     *
     * <ul>
     *   <li>{@code RECEIVE_PUBLISH_MESSAGE}: forwarded to the message bus.</li>
     *   <li>{@code SESSION_CREATE}: {@link ConnectIdleTimeMonitorSubscriber}.</li>
     *   <li>{@code CONNECT}: {@link ConnectAuthenticationSubscriber} and
     *       {@link KeepAliveMonitorSubscriber}.</li>
     *   <li>{@code SUBSCRIBE_TOPIC}: replays retained messages to the new subscriber on the
     *       push thread pool (see the inline comments).</li>
     * </ul>
     */
    private void subscribeEventBus() {
        this.eventBus.subscribe(ServerEventType.RECEIVE_PUBLISH_MESSAGE, this.messageBusSubscriber);
        this.eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this));
        this.eventBus.subscribe(ServerEventType.CONNECT, new ConnectAuthenticationSubscriber(this));
        this.eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this));
        // Only the typed method is declared: the compiler generates the erased bridge itself,
        // and a hand-written subscribe(EventType, Object) clashes with the interface method.
        this.eventBus.subscribe(ServerEventType.SUBSCRIBE_TOPIC, new EventBusSubscriber<TopicSubscriber>() {
            @Override
            public void subscribe(EventType<TopicSubscriber> eventType, final TopicSubscriber topicSubscriber) {
                BrokerContextImpl.this.pushThreadPool.execute(new AsyncTask() {
                    public void execute() {
                        // Look up the retained message at the subscriber's current retain offset.
                        PersistenceMessage retained = BrokerContextImpl.this.providers.getRetainMessageProvider()
                                .get(topicSubscriber.getTopic().getTopic(), topicSubscriber.getRetainConsumerOffset());
                        // Nothing left to replay, or the message was created after the latest
                        // subscribe time: hand over to the session's regular batch publishing.
                        if (retained == null || retained.getCreateTime() > topicSubscriber.getLatestSubscribeTime()) {
                            topicSubscriber.getMqttSession().batchPublish(topicSubscriber, BrokerContextImpl.this.pushThreadPool);
                            return;
                        }
                        MqttSession session = topicSubscriber.getMqttSession();
                        MqttPublishMessage publishMessage = MqttUtil.createPublishMessage(session.newPacketId(), retained.getTopic(), topicSubscriber.getMqttQoS(), retained.getPayload());
                        InflightQueue inflightQueue = session.getInflightQueue();
                        int inflightIndex = inflightQueue.add(publishMessage, retained.getOffset());
                        session.publish(publishMessage, packetId -> {
                            LOGGER.info("publish retain to client:{} success ,message:{} ", session.getClientId(), publishMessage);
                            // Advance the subscriber's retain offset past the committed message.
                            inflightQueue.commit(inflightIndex, offset -> {
                                topicSubscriber.setRetainConsumerOffset(offset.longValue() + 1);
                            });
                            inflightQueue.clear();
                            // Re-submit this task to look up the next retained message.
                            BrokerContextImpl.this.pushThreadPool.execute(this);
                        });
                    }
                });
            }
        });
    }

    /**
     * Loads the broker configuration from layered sources and applies it to
     * {@code brokerConfigure}. Later layers override earlier ones:
     *
     * <ol>
     *   <li>{@code smart-mqtt.properties} from the context class loader;</li>
     *   <li>the file named by the {@code BrokerConfigure.SystemProperty.BrokerConfig} system
     *       property, when that property is not blank;</li>
     *   <li>environment variables listed in {@code BrokerConfigure.SystemEnvironments}
     *       (environment variable name mapped to property key);</li>
     *   <li>JVM system properties.</li>
     * </ol>
     *
     * @throws IOException if the classpath defaults are missing or a source cannot be read
     * @throws NumberFormatException if a numeric setting is not a valid integer
     */
    private void updateBrokerConfigure() throws IOException {
        Properties properties = new Properties();

        // Properties.load does not close its stream, so every source is closed explicitly.
        try (InputStream defaults = Thread.currentThread().getContextClassLoader().getResourceAsStream("smart-mqtt.properties")) {
            if (defaults == null) {
                // Fail with a clear message instead of a NullPointerException inside load().
                throw new IOException("default configuration smart-mqtt.properties not found on classpath");
            }
            properties.load(defaults);
        }

        String configPath = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig);
        if (StringUtils.isNotBlank(configPath)) {
            File configFile = new File(configPath);
            ValidateUtils.isTrue(configFile.isFile(), "文件不存在");
            try (InputStream external = new FileInputStream(configFile)) {
                properties.load(external);
            }
        }

        BrokerConfigure.SystemEnvironments.forEach((envName, propertyKey) -> {
            String envValue = System.getenv(envName);
            if (envValue != null) {
                properties.setProperty(propertyKey, envValue);
            }
        });

        System.getProperties().stringPropertyNames().forEach(key -> {
            properties.setProperty(key, System.getProperty(key));
        });

        // Hand every merged entry to the configuration object before setting the typed fields.
        properties.stringPropertyNames().forEach(key -> {
            this.brokerConfigure.setProperty(key, properties.getProperty(key));
        });
        this.brokerConfigure.setHost(properties.getProperty(BrokerConfigure.SystemProperty.HOST));
        this.brokerConfigure.setPort(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.PORT, BrokerConfigure.SystemPropertyDefaultValue.PORT)));
        this.brokerConfigure.setNoConnectIdleTimeout(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.CONNECT_IDLE_TIMEOUT, BrokerConfigure.SystemPropertyDefaultValue.CONNECT_TIMEOUT)));
        this.brokerConfigure.setMaxInflight(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.MAX_INFLIGHT, BrokerConfigure.SystemPropertyDefaultValue.MAX_INFLIGHT)));
        this.brokerConfigure.setUsername(properties.getProperty(BrokerConfigure.SystemProperty.USERNAME));
        this.brokerConfigure.setPassword(properties.getProperty(BrokerConfigure.SystemProperty.PASSWORD));
        // The worker thread count defaults to the number of available processors.
        this.brokerConfigure.setThreadNum(Integer.parseInt(properties.getProperty(BrokerConfigure.SystemProperty.THREAD_NUM, String.valueOf(Runtime.getRuntime().availableProcessors()))));
    }

    /**
     * Discovers {@link Plugin} implementations through {@link ServiceLoader}, using the class
     * loader of {@link Providers}, and then installs them into this context.
     *
     * <p>Discovery and installation are separate passes: every plugin is loaded and recorded
     * before the first one is installed.
     */
    private void loadAndInstallPlugins() {
        for (Plugin plugin : ServiceLoader.load(Plugin.class, Providers.class.getClassLoader())) {
            LOGGER.info("load plugin: {}", plugin.pluginName());
            this.plugins.add(plugin);
        }
        for (Plugin plugin : this.plugins) {
            LOGGER.info("install plugin: {}", plugin.pluginName());
            plugin.install(this);
        }
    }

    /** Returns the configuration object held by this context. */
    @Override
    public BrokerConfigure getBrokerConfigure() {
        return brokerConfigure;
    }

    /**
     * Registers the session under its client id. If a session is already registered for
     * that id, the existing entry is kept and this call has no effect.
     *
     * @param mqttSession session to register
     */
    @Override
    public void addSession(MqttSession mqttSession) {
        String clientId = mqttSession.getClientId();
        grantSessions.putIfAbsent(clientId, mqttSession);
    }

    /**
     * Returns the topic registered under the given name, creating it on first use.
     *
     * @param str topic name; validated against wildcards when the topic is created
     * @return the existing or newly created topic
     */
    @Override
    public BrokerTopic getOrCreateTopic(String str) {
        return topicMap.computeIfAbsent(str, this::createTopic);
    }

    /**
     * Validates the name, builds the topic and publishes {@code TOPIC_CREATE}. Runs inside the
     * map's {@code computeIfAbsent}, i.e. before the topic becomes visible in the map.
     */
    private BrokerTopic createTopic(String topicName) {
        boolean plainName = !MqttUtil.containsTopicWildcards(topicName);
        ValidateUtils.isTrue(plainName, "invalid topicName: " + topicName);
        BrokerTopic created = new BrokerTopic(topicName);
        eventBus.publish(ServerEventType.TOPIC_CREATE, created);
        return created;
    }

    /** Returns the values view of the topic map (a live view, not a copy). */
    @Override
    public Collection<BrokerTopic> getTopics() {
        return topicMap.values();
    }

    /** Returns the message bus owned by this context. */
    @Override
    public MessageBus getMessageBus() {
        return messageBusSubscriber;
    }

    /** Returns the event bus owned by this context. */
    @Override
    public EventBus getEventBus() {
        return eventBus;
    }

    /**
     * Removes the session registered under the given client id.
     *
     * @param str client id; a blank value is ignored with a warning
     * @return the removed session, or {@code null} if the id was blank or not registered
     */
    @Override
    public MqttSession removeSession(String str) {
        if (StringUtils.isBlank(str)) {
            LOGGER.warn("clientId is blank, ignore remove grantSession");
            return null;
        }
        return grantSessions.remove(str);
    }

    /**
     * Looks up the session registered under the given client id.
     *
     * @param str client id
     * @return the registered session, or {@code null} if there is none
     */
    @Override
    public MqttSession getSession(String str) {
        return grantSessions.get(str);
    }

    /** Returns the single-threaded scheduler owned by this context. */
    @Override
    public ScheduledExecutorService getKeepAliveThreadPool() {
        return KEEP_ALIVE_EXECUTOR;
    }

    /** Returns the provider registry owned by this context. */
    @Override
    public Providers getProviders() {
        return providers;
    }

    /**
     * Schedules a batch publish on the push thread pool for every subscriber of the topic
     * whose semaphore currently has at least one available permit.
     *
     * @param brokerTopic topic whose subscribers should be served
     */
    @Override
    public void batchPublish(BrokerTopic brokerTopic) {
        brokerTopic.getConsumeOffsets().values().stream()
                .filter(subscriber -> subscriber.getSemaphore().availablePermits() > 0)
                .forEach(subscriber -> pushThreadPool.execute(new AsyncTask() {
                    public void execute() {
                        subscriber.getMqttSession().batchPublish(subscriber, BrokerContextImpl.this.pushThreadPool);
                    }
                }));
    }

    /**
     * Shuts the broker down: publishes {@code ServerEventType.BROKER_DESTROY}, stops the
     * server, shuts down the executors owned by this context and releases the buffer pool.
     *
     * <p>Safe to call after a partially completed {@link #init()}: resources that were never
     * created are skipped instead of causing a {@link NullPointerException}.
     */
    @Override
    public void destroy() {
        LOGGER.info("destroy broker...");
        this.eventBus.publish(ServerEventType.BROKER_DESTROY, this);
        this.messageBusExecutorService.shutdown();
        // init() calls destroy() from its failure path, where the server may not exist yet.
        if (this.server != null) {
            this.server.shutdown();
        }
        // Stop the remaining executors only after the server, so their worker threads do not
        // outlive the broker. shutdown() lets tasks that are already queued finish.
        if (this.pushThreadPool != null) {
            this.pushThreadPool.shutdown();
        }
        this.KEEP_ALIVE_EXECUTOR.shutdown();
        if (this.pagePool != null) {
            this.pagePool.release();
        }
    }
}
