package tech.mystox.framework.mqtt.service.impl;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import tech.mystox.framework.common.util.ByteUtil;
import tech.mystox.framework.common.util.MqttUtils;
import tech.mystox.framework.config.IaConf;
import tech.mystox.framework.core.IaENV;
import tech.mystox.framework.core.MqttLogUtil;
import tech.mystox.framework.entity.MqttMsg;
import tech.mystox.framework.entity.MsgResult;
import tech.mystox.framework.entity.MsgRsp;
import tech.mystox.framework.entity.PayloadType;
import tech.mystox.framework.entity.StateCode;
import tech.mystox.framework.exception.RegisterException;
import tech.mystox.framework.mqtt.config.MqttConfig;
import tech.mystox.framework.mqtt.service.IMqttSender;
import tech.mystox.framework.scheduler.RegScheduler;

/**
 * Spring service (bean name {@code mqttSenderImpl}) that publishes {@link MqttMsg} objects as JSON
 * through {@link IMqttSender}.
 *
 * <p>Payloads whose UTF-8 encoding is longer than {@code mqtt.payload.limit} bytes are split into
 * several subpackage messages that share one message id. Synchronous requests register a
 * {@link CallSubpackageMsg} in {@link #CALLBACKS}; replies arriving on
 * {@link MqttConfig#CHANNEL_REPLY} are routed to that callback by message id.
 */
@Service("mqttSenderImpl")
public class ChannelSenderImpl {
    // Kept as a package-private instance field so existing same-package users keep compiling.
    Logger logger = LoggerFactory.getLogger(ChannelSenderImpl.class);

    // Numeric state codes reported by this class. The values are exactly the ones this class has
    // always emitted; presumably they mirror constants declared in StateCode - TODO confirm.
    private static final int STATE_FAILED = 0;
    private static final int STATE_UNREGISTERED = 2;
    private static final int STATE_TIMEOUT = 11;
    private static final int STATE_SEND_ERROR = 13;
    private static final int STATE_CALLBACK_FULL = 14;

    /** Pause between two consecutive messages of the same logical message, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 10L;

    /** Shared empty feature array; a zero-length array is immutable and therefore safe to reuse. */
    private static final JSONWriter.Feature[] NO_WRITER_FEATURES = new JSONWriter.Feature[0];

    @Value("${mqtt.payload.limit:#{47 * 1024}}")
    private int mqttPayloadLimit;

    /** Pending synchronous requests, keyed by message id. */
    protected static final Map<String, CallSubpackageMsg<MsgRsp>> CALLBACKS = new ConcurrentHashMap<>();

    @Value("${mqtt.callback.maxCount:10000}")
    private long callbackMaxCount;
    final IaENV iaEnv;
    final IaConf iaConf;
    private final IMqttSender mqttSender;
    private final MqttLogUtil mqttLogUtil;
    private final ThreadPoolTaskExecutor mqttSenderAckExecutor;

    /**
     * Creates the sender with its collaborators.
     *
     * @param iaENV                  environment used to reach the {@link RegScheduler}
     * @param iaConf                 configuration used to build the source address of outgoing messages
     * @param iMqttSender            gateway that actually publishes to MQTT
     * @param mqttLogUtil            sink for error reports
     * @param threadPoolTaskExecutor executor on which reply messages are processed
     */
    public ChannelSenderImpl(IaENV iaENV, IaConf iaConf, IMqttSender iMqttSender, MqttLogUtil mqttLogUtil, ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.iaEnv = iaENV;
        this.iaConf = iaConf;
        this.mqttSender = iMqttSender;
        this.mqttLogUtil = mqttLogUtil;
        this.mqttSenderAckExecutor = threadPoolTaskExecutor;
    }

    /**
     * Publishes a payload without requesting an acknowledgement, using the sender's two-argument
     * publish method (no explicit QoS).
     *
     * <p>Nothing is published when the target topic is not registered; the condition is only logged.
     *
     * @param str  first component of the topic id (presumably the target server code - confirm
     *             against {@code MqttUtils.preconditionSubTopicId})
     * @param str2 operation code; second component of the topic id
     * @param str3 payload text
     * @throws Exception if the registration lookup or the underlying publish fails
     */
    public void sendToMqtt(String str, String str2, String str3) throws Exception {
        sendUnacknowledged(str, str2, null, str3);
    }

    /**
     * Publishes a payload without requesting an acknowledgement, using an explicit QoS.
     *
     * <p>Nothing is published when the target topic is not registered; the condition is only logged.
     *
     * @param str  first component of the topic id (presumably the target server code - confirm
     *             against {@code MqttUtils.preconditionSubTopicId})
     * @param str2 operation code; second component of the topic id
     * @param i    QoS handed to the sender
     * @param str3 payload text
     * @throws Exception if the registration lookup or the underlying publish fails
     */
    public void sendToMqtt(String str, String str2, int i, String str3) throws Exception {
        sendUnacknowledged(str, str2, Integer.valueOf(i), str3);
    }

    /**
     * Publishes already-built messages and reports the outcome as a boolean instead of throwing.
     *
     * @param str  message id, used for logging and error reports
     * @param str2 first component of the topic id (presumably the target server code - confirm)
     * @param str3 operation code; second component of the topic id
     * @param i    QoS handed to the sender
     * @param list messages to publish, in order
     * @return {@code true} when every message was handed to the sender; {@code false} when the
     *         topic is not registered or any exception (including an interrupt) occurred
     */
    public boolean sendToMqttBoolean(String str, String str2, String str3, int i, List<MqttMsg> list) {
        try {
            if (!isExistsBySubList(str2, str3)) {
                reportUnregistered(str, str2, str3);
                return false;
            }
            String topicId = MqttUtils.preconditionSubTopicId(str2, str3);
            boolean first = true;
            for (MqttMsg message : list) {
                if (!first) {
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                }
                first = false;
                publish(str, topicId, Integer.valueOf(i), message);
            }
            return true;
        } catch (Exception e) {
            this.mqttLogUtil.ERROR(str, STATE_SEND_ERROR, str3, str2);
            this.logger.error("[{}]message send error[{}]...[{}]", str, STATE_SEND_ERROR, e.toString());
            this.logger.debug("[{}]message send error stack trace", str, e);
            if (e instanceof InterruptedException) {
                // The interrupt is reported as a failed send; keep the flag so callers can see it.
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }

    /**
     * Builds the message(s) for a payload (without acknowledgement) and publishes them.
     *
     * @param str  first component of the topic id (presumably the target server code - confirm)
     * @param str2 operation code; second component of the topic id
     * @param i    QoS handed to the sender
     * @param str3 payload text
     * @return {@code true} when every message was handed to the sender, {@code false} otherwise
     */
    public boolean sendToMqttBoolean(String str, String str2, int i, String str3) {
        String topicId = MqttUtils.preconditionSubTopicId(str, str2);
        String msgId = MqttUtils.assembleMsgId();
        return sendToMqttBoolean(msgId, str, str2, i, buildMqttMsg(msgId, topicId, str3, str2, false));
    }

    /**
     * Publishes a payload that requests an acknowledgement and blocks until the reply arrives or
     * the timeout elapses.
     *
     * <p>A dedicated single-thread executor is created per call to run the pending callback; it is
     * shut down, and the callback unregistered, on every exit path.
     *
     * @param str      first component of the topic id (presumably the target server code - confirm)
     * @param str2     operation code; second component of the topic id
     * @param i        QoS handed to the sender
     * @param str3     payload text
     * @param j        maximum time to wait for the reply
     * @param timeUnit unit of {@code j}
     * @return the reply's state code and payload on success; otherwise a result carrying state
     *         code 14 (callback map full), 11 (timeout, interrupt or execution failure) or
     *         0 (send failed or unexpected exception)
     */
    public MsgResult sendToMqttSync(String str, String str2, int i, String str3, long j, TimeUnit timeUnit) {
        String topicId = MqttUtils.preconditionSubTopicId(str, str2);
        String msgId = MqttUtils.assembleMsgId();
        List<MqttMsg> messages = buildMqttMsg(msgId, topicId, str3, str2, true);
        CallSubpackageMsg<MsgRsp> pending = new CallSubpackageMsg<>();
        int pendingCount = CALLBACKS.size();
        if (pendingCount > this.callbackMaxCount) {
            this.mqttLogUtil.ERROR(msgId, STATE_CALLBACK_FULL, str2, str);
            this.logger.error("[{}]message, system callback map is full[{}]", msgId, pendingCount);
            return new MsgResult(STATE_CALLBACK_FULL, StateCode.StateCodeEnum.toStateCodeName(STATE_CALLBACK_FULL));
        }
        ExecutorService waiter = Executors.newSingleThreadExecutor();
        // Register before sending so a fast reply always finds its callback.
        CALLBACKS.put(msgId, pending);
        FutureTask<MsgRsp> responseTask = new FutureTask<>(pending);
        try {
            if (!sendToMqttBoolean(msgId, str, str2, i, messages)) {
                return new MsgResult(STATE_FAILED, "request failed");
            }
            waiter.submit(responseTask);
            MsgRsp response = responseTask.get(j, timeUnit);
            return new MsgResult(response.getStateCode().intValue(), response.getPayload());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.mqttLogUtil.ERROR(msgId, STATE_TIMEOUT, str2, str);
            this.logger.error("[{}]message{},{}, request timeout: [{}][{}]", msgId, str, str2, j, e.toString());
            this.logger.debug("[{}]message request timeout stack trace", msgId, e);
            if (e instanceof InterruptedException) {
                // Still answered with the timeout result, but the caller's interrupt must survive.
                Thread.currentThread().interrupt();
            }
            return new MsgResult(STATE_TIMEOUT, j + "|" + e.toString());
        } catch (Exception e) {
            this.mqttLogUtil.ERROR(msgId, STATE_FAILED, str2, str);
            this.logger.error("[{}]message, request exception: [{}]", msgId, e.toString());
            this.logger.debug("[{}]message request exception stack trace", msgId, e);
            return new MsgResult(STATE_FAILED, e.toString());
        } finally {
            // cancel(true) interrupts the waiter thread if it is still running the callback.
            responseTask.cancel(true);
            waiter.shutdown();
            CALLBACKS.remove(msgId);
        }
    }

    /**
     * Synchronous request with QoS 2 and a 30000 ms timeout.
     *
     * @param str  first component of the topic id (presumably the target server code - confirm)
     * @param str2 operation code; second component of the topic id
     * @param str3 payload text
     * @return see {@link #sendToMqttSync(String, String, int, String, long, TimeUnit)}
     */
    public MsgResult sendToMqttSync(String str, String str2, String str3) {
        return sendToMqttSync(str, str2, 2, str3, 30000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Shared body of the two {@code sendToMqtt} overloads.
     *
     * <p>An interrupt received while pausing between messages does not abort the send: the
     * remaining messages are still published and the interrupt flag is restored afterwards.
     *
     * @param qos QoS to publish with, or {@code null} to use the sender's two-argument method
     */
    private void sendUnacknowledged(String target, String operaCode, Integer qos, String payload) throws Exception {
        String topicId = MqttUtils.preconditionSubTopicId(target, operaCode);
        String msgId = MqttUtils.assembleMsgId();
        List<MqttMsg> messages = buildMqttMsg(msgId, topicId, payload, operaCode, false);
        if (!isExistsBySubList(target, operaCode)) {
            reportUnregistered(msgId, target, operaCode);
            return;
        }
        boolean interrupted = false;
        boolean first = true;
        try {
            for (MqttMsg message : messages) {
                if (!first) {
                    try {
                        Thread.sleep(SEND_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        // Catching clears the flag, so later pauses still work; restored in finally.
                        interrupted = true;
                        this.logger.warn("[{}]message send interrupted between messages, remaining messages are still sent", msgId);
                    }
                }
                first = false;
                publish(msgId, topicId, qos, message);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Serializes one message to JSON, logs it at debug level and hands it to the sender. */
    private void publish(String msgId, String topicId, Integer qos, MqttMsg message) throws Exception {
        String json = JSONObject.toJSONString(message, NO_WRITER_FEATURES);
        this.logger.debug("[{}]message [{}] send...", msgId, json);
        if (qos == null) {
            this.mqttSender.sendToMqtt(topicId, json);
        } else {
            this.mqttSender.sendToMqtt(topicId, qos.intValue(), json);
        }
    }

    /** Reports that the target topic is not registered (state code 2). */
    private void reportUnregistered(String msgId, String target, String operaCode) {
        this.mqttLogUtil.ERROR(msgId, STATE_UNREGISTERED, operaCode, target);
        this.logger.error("[{}]message send error[{}] sub operaCode[{}.{}] is not exists...", msgId, STATE_UNREGISTERED, target, operaCode);
    }

    /**
     * Asks the {@link RegScheduler} whether the topic id built from the two components exists.
     *
     * @return {@code true} when the scheduler reports the topic id as existing
     * @throws RegisterException propagated from the scheduler lookup
     */
    private boolean isExistsBySubList(String target, String operaCode) throws RegisterException {
        RegScheduler regScheduler = this.iaEnv.getRegScheduler();
        String topicId = MqttUtils.preconditionSubTopicId(target, operaCode);
        if (regScheduler.exists(topicId)) {
            return true;
        }
        this.logger.warn("topicId(nodePath) [{}] didn't registered...", topicId);
        return false;
    }

    /**
     * Builds the message(s) for one payload: a single message when the UTF-8 encoded payload fits
     * into {@code mqttPayloadLimit} bytes, otherwise a list of subpackages.
     */
    private List<MqttMsg> buildMqttMsg(String msgId, String topicId, String payload, String operaCode, boolean hasAck) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > this.mqttPayloadLimit) {
            return subpackage(bytes, topicId, operaCode, msgId, hasAck);
        }
        MqttMsg message = new MqttMsg();
        message.setMsgId(msgId);
        message.setTopic(topicId);
        message.setPayloadType(PayloadType.STRING);
        message.setOperaCode(operaCode);
        message.setPayload(payload);
        message.setHasAck(hasAck);
        message.setSourceAddress(localSourceAddress());
        return Collections.singletonList(message);
    }

    /**
     * Splits an encoded payload into consecutive chunks of at most {@code mqttPayloadLimit} bytes.
     * Every chunk carries its index, the total chunk count and the CRC of the whole payload.
     */
    private List<MqttMsg> subpackage(byte[] bytes, String topicId, String operaCode, String msgId, boolean hasAck) {
        int crc = ByteUtil.getCRC(bytes);
        int length = bytes.length;
        // Ceiling division: "length / limit + 1" produced an extra, empty trailing chunk whenever
        // the length was an exact multiple of the limit.
        int count = length / this.mqttPayloadLimit + (length % this.mqttPayloadLimit == 0 ? 0 : 1);
        this.logger.warn("[{}]Subpackage mqtt msg length[{}] is large than mqtt payload limit[{}], count[{}]", msgId, length, this.mqttPayloadLimit, count);
        String sourceAddress = localSourceAddress();
        List<MqttMsg> chunks = new ArrayList<>(count);
        for (int index = 0; index < count; index++) {
            int from = index * this.mqttPayloadLimit;
            // The end offset may point past the array for the last chunk; it is passed through
            // unchanged exactly as before.
            byte[] chunk = ArrayUtils.subarray(bytes, from, from + this.mqttPayloadLimit);
            MqttMsg message = new MqttMsg(msgId, chunk, true, Integer.valueOf(index), Integer.valueOf(count), Integer.valueOf(crc));
            message.setMsgId(msgId);
            message.setTopic(topicId);
            message.setPayloadType(PayloadType.STRING);
            message.setOperaCode(operaCode);
            message.setHasAck(hasAck);
            message.setSourceAddress(sourceAddress);
            chunks.add(message);
        }
        return chunks;
    }

    /** Builds the source address of this node from group code, server name, version and sequence. */
    private String localSourceAddress() {
        String serverCode = MqttUtils.preconditionServerCode(this.iaConf.getServerName(), this.iaConf.getServerVersion(), this.iaConf.getSequence());
        return MqttUtils.preconditionGroupServerCode(this.iaConf.getGroupCode(), serverCode);
    }

    /**
     * Receives reply messages from {@link MqttConfig#CHANNEL_REPLY} and processes each one on the
     * acknowledgement executor.
     *
     * @param message reply whose payload is the JSON form of a {@link MsgRsp}
     */
    @ServiceActivator(inputChannel = MqttConfig.CHANNEL_REPLY)
    public void messageReceiver(Message<String> message) {
        this.mqttSenderAckExecutor.execute(() -> handleAck(message));
    }

    /**
     * Parses one reply and hands it to the callback registered under its message id. Any failure
     * is logged and otherwise ignored so that a bad reply cannot break the executor thread.
     */
    private void handleAck(Message<String> message) {
        try {
            String json = message.getPayload();
            MsgRsp response = JSONObject.parseObject(json, MsgRsp.class);
            String msgId = response.getMsgId();
            this.logger.debug("[{}]message ack is [{}]", msgId, json);
            CallSubpackageMsg<MsgRsp> pending = CALLBACKS.get(msgId);
            if (pending == null) {
                // No callback under this id: the request is no longer (or was never) registered.
                this.logger.warn("[{}]message[{}] ack find callback is null...", msgId, response.getTopic());
            } else if (response.isSubpackage()) {
                pending.callbackSubPackage(response);
            } else {
                pending.callback(response);
            }
        } catch (Exception e) {
            // Trailing throwable makes SLF4J log the stack trace instead of printing to stderr.
            this.logger.warn("message ack receive error[{}] is invalidation...", e.toString(), e);
        }
    }

    /**
     * Exposes the live map of pending synchronous requests.
     *
     * @return the shared, mutable callback map keyed by message id (not a copy)
     */
    public Map<String, CallSubpackageMsg<MsgRsp>> getCALLBACKS() {
        return CALLBACKS;
    }
}
