package tech.mystox.framework.mqtt.service.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import tech.mystox.framework.config.IaConf;
import tech.mystox.framework.core.IaContext;
import tech.mystox.framework.core.IaENV;
import tech.mystox.framework.mqtt.config.MqttConfigInstance;
import tech.mystox.framework.mqtt.config.MultiMqttMessageHandler;
import tech.mystox.framework.mqtt.config.MyMqttPahoMessageHandler;
import tech.mystox.framework.mqtt.service.ExecutorRunner;
import tech.mystox.framework.mqtt.service.IMqttSender;

/* loaded from: input_file:tech/mystox/framework/mqtt/service/impl/DefaultMqttHandler.class */
public class DefaultMqttHandler extends MqttHandler {
    private static final byte[] WILL_DATA = "offline".getBytes();
    private final IaENV iaENV;
    private MqttReceiver mqttReceiver;
    private IMqttSender iMqttSender;
    private final ExecutorRunner executorRunner;
    private final MqttPahoClientFactory mqttPahoClientFactory;
    private MultiMqttMessageHandler multiMqttMessageHandler;
    private MqttPahoMessageDrivenChannelAdapter channelConsumerDrivenChannelAdapter;
    private MqttPahoMessageDrivenChannelAdapter replyProducerDrivenChannelAdapter;

    public DefaultMqttHandler(IaContext iaContext) {
        super(iaContext.getIaENV());
        this.iaENV = iaContext.getIaENV();
        Properties mqMsgProperties = this.iaENV.getConf().getMqMsgProperties();
        this.mqttPahoClientFactory = mqttClientFactory();
        int intValue = ((Integer) mqMsgProperties.getOrDefault("mqtt.executor.corePoolSize", 10)).intValue();
        int intValue2 = ((Integer) mqMsgProperties.getOrDefault("mqtt.executor.maxPoolSize", 10000)).intValue();
        ThreadPoolTaskExecutor builder = builder(intValue, intValue2, 5000, 30000, "mqttExecutor-");
        ThreadPoolTaskExecutor builder2 = builder(intValue, intValue2, 2000, 10000, "mqttAck-");
        int intValue3 = ((Integer) mqMsgProperties.getOrDefault("mqtt.sender.count", 10)).intValue();
        this.mqttHandlerAck = new ChannelHandlerAck(replyProducer(builderTaskScheduler(intValue, intValue2, "mqtt-reply")));
        this.mqttSenderImpl = createSender(builder2, Integer.valueOf(intValue3));
        this.mqttHandlerImpl = new ChannelHandlerSub(channelConsumer(builderTaskScheduler(intValue, intValue2, "mqtt-consumer")));
        receiverInit(iaContext, builder);
        this.executorRunner = new ExecutorRunner(builder, builder2, Executors.newScheduledThreadPool(10), this.mqttSenderImpl);
    }

    public ExecutorRunner getExecutorRunner() {
        return this.executorRunner;
    }

    void receiverInit(IaContext iaContext, ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.mqttReceiver = new MqttReceiver(iaContext, this.iMqttSender, threadPoolTaskExecutor);
        MqttConfigInstance.getInstance().mqttInboundChannel().subscribe(message -> {
            this.mqttReceiver.messageReceiver(message);
        });
        MqttConfigInstance.getInstance().mqttReplyChannel().subscribe(message2 -> {
            this.mqttSenderImpl.messageReceiver(message2);
        });
    }

    private MessageProducer channelConsumer(TaskScheduler taskScheduler) {
        IaConf conf = this.iaENV.getConf();
        Properties mqMsgProperties = conf.getMqMsgProperties();
        String join = String.join("_", conf.getGroupCode(), conf.getServerName(), conf.getServerVersion(), "consumer", UUID.randomUUID().toString());
        String property = mqMsgProperties.getProperty("mqtt.consumer.defaultTopic", "topic");
        Integer num = (Integer) mqMsgProperties.getOrDefault("mqtt.completionTimeout", 3000);
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(join, this.mqttPahoClientFactory, StringUtils.split(property, ","));
        mqttPahoMessageDrivenChannelAdapter.setCompletionTimeout(num.intValue());
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(MqttConfigInstance.getInstance().mqttInboundChannel());
        mqttPahoMessageDrivenChannelAdapter.setTaskScheduler(taskScheduler);
        mqttPahoMessageDrivenChannelAdapter.start();
        return mqttPahoMessageDrivenChannelAdapter;
    }

    private TaskScheduler builderTaskScheduler(int i, int i2, String str) {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(5);
        threadPoolTaskScheduler.setThreadNamePrefix(str);
        threadPoolTaskScheduler.initialize();
        return threadPoolTaskScheduler;
    }

    public MessageProducer replyProducer(TaskScheduler taskScheduler) {
        IaConf conf = this.iaENV.getConf();
        String join = String.join("_", conf.getGroupCode(), conf.getServerName(), conf.getServerVersion(), "producer", UUID.randomUUID().toString());
        Integer num = (Integer) this.iaENV.getConf().getMqMsgProperties().getOrDefault("mqtt.completionTimeout", 3000);
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(join + "_reply", this.mqttPahoClientFactory, new String[]{"topic_ack"});
        mqttPahoMessageDrivenChannelAdapter.setCompletionTimeout(num.intValue());
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(MqttConfigInstance.getInstance().mqttReplyChannel());
        mqttPahoMessageDrivenChannelAdapter.setTaskScheduler(taskScheduler);
        mqttPahoMessageDrivenChannelAdapter.start();
        return mqttPahoMessageDrivenChannelAdapter;
    }

    ChannelSenderImpl createSender(ThreadPoolTaskExecutor threadPoolTaskExecutor, Integer num) {
        this.multiMqttMessageHandler = new MultiMqttMessageHandler(this::createMqttOutbound, num);
        final DirectChannel mqttOutboundChannel = MqttConfigInstance.getInstance().mqttOutboundChannel();
        mqttOutboundChannel.subscribe(this.multiMqttMessageHandler);
        this.multiMqttMessageHandler.start();
        this.iMqttSender = new IMqttSender() { // from class: tech.mystox.framework.mqtt.service.impl.DefaultMqttHandler.1
            @Override // tech.mystox.framework.mqtt.service.IMqttSender
            public void sendToMqtt(String str) {
            }

            @Override // tech.mystox.framework.mqtt.service.IMqttSender
            public void sendToMqtt(String str, String str2) throws Exception {
                HashMap hashMap = new HashMap();
                hashMap.put("mqtt_qos", 2);
                mqttOutboundChannel.send(DefaultMqttHandler.this.buildMessage(str, str2, hashMap));
            }

            @Override // tech.mystox.framework.mqtt.service.IMqttSender
            public void sendToMqtt(String str, int i, String str2) throws Exception {
                HashMap hashMap = new HashMap();
                hashMap.put("mqtt_qos", Integer.valueOf(i));
                mqttOutboundChannel.send(DefaultMqttHandler.this.buildMessage(str, str2, hashMap));
            }
        };
        return new ChannelSenderImpl(this.iaENV, this.iaENV.getConf(), this.iMqttSender, threadPoolTaskExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Message<String> buildMessage(String str, String str2, Map<String, Object> map) throws Exception {
        map.put("id", UUID.randomUUID());
        map.put("mqtt_topic", str);
        map.put("timestamp", Long.valueOf(System.currentTimeMillis()));
        return new GenericMessage(str2, new MessageHeaders(map));
    }

    public MessageHandler createMqttOutbound() {
        IaConf conf = this.iaENV.getConf();
        Properties mqMsgProperties = conf.getMqMsgProperties();
        String join = String.join("_", conf.getGroupCode(), conf.getServerName(), conf.getServerVersion(), "producer", UUID.randomUUID().toString());
        String property = mqMsgProperties.getProperty("mqtt.producer.defaultTopic", "topic");
        MyMqttPahoMessageHandler myMqttPahoMessageHandler = new MyMqttPahoMessageHandler(join + "_" + MqttAsyncClient.generateClientId(), this.mqttPahoClientFactory);
        myMqttPahoMessageHandler.setAsync(true);
        myMqttPahoMessageHandler.setDefaultTopic(property);
        myMqttPahoMessageHandler.setCompletionTimeout(10000);
        myMqttPahoMessageHandler.setDefaultQos(1);
        myMqttPahoMessageHandler.onInit();
        return myMqttPahoMessageHandler;
    }

    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        Properties mqMsgProperties = this.iaENV.getConf().getMqMsgProperties();
        mqttConnectOptions.setUserName(String.valueOf(mqMsgProperties.getProperty("mqtt.username")));
        mqttConnectOptions.setServerURIs(StringUtils.split(mqMsgProperties.getProperty("mqtt.url"), ","));
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setWill("willTopic", WILL_DATA, 1, false);
        String property = mqMsgProperties.getProperty("mqtt.maxInflight");
        mqttConnectOptions.setMaxInflight(StringUtils.isBlank(property) ? 1000 : Integer.parseInt(property));
        return mqttConnectOptions;
    }

    protected ThreadPoolTaskExecutor builder(int i, int i2, int i3, int i4, String str) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(i);
        threadPoolTaskExecutor.setMaxPoolSize(i2);
        threadPoolTaskExecutor.setQueueCapacity(i3);
        threadPoolTaskExecutor.setKeepAliveSeconds(i4);
        threadPoolTaskExecutor.setThreadNamePrefix(str);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    public void stop() {
        this.multiMqttMessageHandler.stop();
        this.replyProducerDrivenChannelAdapter.stop();
        this.channelConsumerDrivenChannelAdapter.stop();
    }
}
