package tech.mystox.framework.mqtt.config;

import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import tech.mystox.framework.config.YamlPropertySourceFactory;

/**
 * Spring configuration that wires the MQTT connection options, the Paho client
 * factory, the message channels and the inbound/outbound adapters.
 *
 * <p>All settings are read from {@code classpath:mqtt.yml} via
 * {@link YamlPropertySourceFactory}. Properties without a default
 * ({@code mqtt.url}, the client ids, the default topics and
 * {@code mqtt.completionTimeout}) must be present.
 */
@Configuration
@IntegrationComponentScan({"tech.mystox.framework"})
@ComponentScan({"tech.mystox.framework"})
@PropertySource(factory = YamlPropertySourceFactory.class, value = {"classpath:mqtt.yml"})
public class MqttConfig {
    /** Bean name of the channel that the {@code inbound} adapter publishes to. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    /** Bean name of the channel whose messages are handed to {@link #mqttOutbound()}. */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
    /** Bean name of the channel that the {@code replyProducer} adapter publishes to. */
    public static final String CHANNEL_REPLY = "mqttReplyBoundChannel";

    /** Last-will payload; encoded explicitly as UTF-8 so it does not depend on the platform charset. */
    private static final byte[] WILL_DATA = "offline".getBytes(StandardCharsets.UTF_8);
    /** Topic on which the last-will payload is registered. */
    private static final String WILL_TOPIC = "willTopic";
    /** Topic the reply adapter subscribes to. */
    private static final String REPLY_TOPIC = "topic_ack";
    /** Value passed to {@link MqttConnectOptions#setConnectionTimeout(int)} (seconds per the Paho API). */
    private static final int CONNECTION_TIMEOUT_SECONDS = 10;
    /** Value passed to {@link MqttConnectOptions#setKeepAliveInterval(int)} (seconds per the Paho API). */
    private static final int KEEP_ALIVE_SECONDS = 20;
    /** MQTT QoS level 1, used for both the will message and outbound publishing. */
    private static final int QOS_AT_LEAST_ONCE = 1;

    @Value("${mqtt.username:root}")
    private String username;

    @Value("${mqtt.password:123456}")
    private String password;

    /** One or more broker URIs, comma-separated. */
    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.producer.clientId}")
    private String producerClientId;

    @Value("${mqtt.producer.defaultTopic}")
    private String producerDefaultTopic;

    @Value("${mqtt.consumer.clientId}")
    private String consumerClientId;

    /** One or more topics to subscribe to, comma-separated. */
    @Value("${mqtt.consumer.defaultTopic}")
    private String consumerDefaultTopic;

    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;

    @Value("${mqtt.maxInflight:100}")
    private int maxInflight;

    /**
     * Builds the connection options shared by every MQTT client created through
     * {@link #mqttClientFactory()}.
     *
     * @return options carrying credentials, broker URIs, timeouts, the last-will
     *         message and the max-inflight limit
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(this.username);
        // The configured password was previously injected but never applied, so
        // brokers that enforce authentication rejected the connection. An empty
        // value is skipped, which lets a deployment opt out with "mqtt.password:".
        if (StringUtils.isNotEmpty(this.password)) {
            options.setPassword(this.password.toCharArray());
        }
        // Strip each entry so "tcp://a:1883, tcp://b:1883" does not fail URI
        // validation because of the blank after the comma.
        options.setServerURIs(StringUtils.stripAll(StringUtils.split(this.url, ",")));
        options.setConnectionTimeout(CONNECTION_TIMEOUT_SECONDS);
        options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        options.setWill(WILL_TOPIC, WILL_DATA, QOS_AT_LEAST_ONCE, false);
        options.setMaxInflight(this.maxInflight);
        return options;
    }

    /**
     * Creates the Paho client factory used by all adapters and handlers in this
     * configuration.
     *
     * @return a factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * @return the channel registered as {@link #CHANNEL_NAME_OUT}
     */
    @Bean(name = {CHANNEL_NAME_OUT})
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * @return the channel registered as {@link #CHANNEL_REPLY}
     */
    @Bean(name = {CHANNEL_REPLY})
    public MessageChannel mqttReplyChannel() {
        return new DirectChannel();
    }

    /**
     * @return the channel registered as {@link #CHANNEL_NAME_IN}
     */
    @Bean(name = {CHANNEL_NAME_IN})
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler attached to {@link #CHANNEL_NAME_OUT}. It delegates to handlers
     * produced on demand by {@link #createMqttOutbound()}.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        // NOTE(review): 10 is presumably the number of underlying handlers kept by
        // MultiMqttMessageHandler; confirm against that class.
        return new MultiMqttMessageHandler(this::createMqttOutbound, 10);
    }

    /**
     * Creates and initialises one outbound handler with its own generated
     * client id.
     *
     * @return a handler publishing to {@code mqtt.producer.defaultTopic} by
     *         default with QoS 1
     */
    public MessageHandler createMqttOutbound() {
        // The generated suffix gives every handler a distinct client id.
        String clientId = this.producerClientId + "_" + MqttAsyncClient.generateClientId();
        MyMqttPahoMessageHandler handler = new MyMqttPahoMessageHandler(clientId, mqttClientFactory());
        handler.setAsync(true);
        handler.setDefaultTopic(this.producerDefaultTopic);
        handler.setDefaultQos(QOS_AT_LEAST_ONCE);
        // This method is not a @Bean, so initialisation is triggered explicitly here.
        handler.onInit();
        return handler;
    }

    /**
     * Adapter that subscribes to the topics listed in
     * {@code mqtt.consumer.defaultTopic} and forwards received messages to
     * {@link #mqttInboundChannel()}.
     *
     * @return the consumer-side message producer
     */
    @Bean({"inbound"})
    public MessageProducer inbound() {
        String[] topics = StringUtils.split(this.consumerDefaultTopic, ",");
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(this.consumerClientId, mqttClientFactory(), topics);
        adapter.setCompletionTimeout(this.completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Adapter that subscribes to the reply topic under the client id
     * {@code <mqtt.producer.clientId>_reply} and forwards received messages to
     * {@link #mqttReplyChannel()}.
     *
     * @return the reply-side message producer
     */
    @Bean({"replyProducer"})
    public MessageProducer replyProducer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(this.producerClientId + "_reply", mqttClientFactory(), REPLY_TOPIC);
        adapter.setCompletionTimeout(this.completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttReplyChannel());
        return adapter;
    }
}
