package tech.mystox.framework.mqtt.service.impl;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import tech.mystox.framework.common.util.ByteUtil;
import tech.mystox.framework.common.util.MqttUtils;
import tech.mystox.framework.common.util.SpringContextUtil;
import tech.mystox.framework.core.IaContext;
import tech.mystox.framework.entity.MqttMsg;
import tech.mystox.framework.entity.MsgRsp;
import tech.mystox.framework.entity.RegisterSub;
import tech.mystox.framework.entity.StateCode;
import tech.mystox.framework.entity.UnitHead;
import tech.mystox.framework.mqtt.service.IMqttSender;

/* loaded from: input_file:tech/mystox/framework/mqtt/service/impl/MqttReceiver.class */
public class MqttReceiver {

    @Value("${mqtt.payload.limit:#{47 * 1024}}")
    private int mqttPayloadLimit;

    @Value("${jarResources.path:./jarResources}")
    private String jarPath;

    @Value("${mqtt.callback.maxCount:10000}")
    private long callbackMaxCount;

    @Value("${mqtt.package.timeout:30}")
    private long packageMsgTimeout;
    final IaContext iaContext;
    private final IMqttSender iMqttSender;
    private final ThreadPoolTaskExecutor mqttExecutor;
    private static final Logger logger = LoggerFactory.getLogger(MqttReceiver.class);
    protected static final Map<String, CallSubpackageMsg<MqttMsg>> CALLBACKS = new ConcurrentHashMap();

    public MqttReceiver(IaContext iaContext, IMqttSender iMqttSender, ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.iaContext = iaContext;
        this.iMqttSender = iMqttSender;
        this.mqttExecutor = threadPoolTaskExecutor;
        Properties mqMsgProperties = iaContext.getConf().getMqMsgProperties();
        if (mqMsgProperties != null) {
            this.mqttPayloadLimit = ((Integer) mqMsgProperties.getOrDefault("mqtt.payload.limit", 48128)).intValue();
            this.callbackMaxCount = ((Integer) mqMsgProperties.getOrDefault("mqtt.callback.maxCount", 10000)).intValue();
            this.packageMsgTimeout = ((Integer) mqMsgProperties.getOrDefault("mqtt.package.timeout", 30)).intValue();
            this.jarPath = mqMsgProperties.getProperty("jarResources.path", "./jarResources");
        }
    }

    public MsgRsp receive(String str, MqttMsg mqttMsg) {
        logger.debug("Receive... ..." + JSONObject.toJSONString(mqttMsg, new JSONWriter.Feature[0]));
        String unitBySubList = getUnitBySubList(str);
        MsgRsp msgRsp = null;
        try {
            if (unitBySubList.startsWith(UnitHead.LOCAL)) {
                msgRsp = localExecute(unitBySubList, mqttMsg);
            } else if (unitBySubList.startsWith(UnitHead.JAR)) {
                msgRsp = jarExecute(unitBySubList, mqttMsg);
            } else if (unitBySubList.startsWith(UnitHead.HTTP)) {
                logger.debug("Remote http execute");
            }
        } catch (Exception e) {
            logger.error(" [{}] Msg execute error: [{}]", mqttMsg.getMsgId(), e.toString());
            msgRsp = new MsgRsp(mqttMsg.getMsgId(), e.toString());
            msgRsp.setStateCode(0);
            e.printStackTrace();
        }
        logger.debug("[{}] Message execute result: [{}]", mqttMsg.getMsgId(), JSONObject.toJSONString(msgRsp, new JSONWriter.Feature[0]));
        return msgRsp;
    }

    MsgRsp localExecute(String str, MqttMsg mqttMsg) {
        String[] split = str.replace(UnitHead.LOCAL, "").split("/");
        String str2 = split[0];
        String str3 = split[1];
        String str4 = split.length > 2 ? split[2] : "[\"java.lang.String\"]";
        try {
            Class<?> cls = Class.forName(str2);
            Object bean = SpringContextUtil.getBean(cls);
            List parseArray = JSON.parseArray(str4, Class.class, new JSONReader.Feature[]{JSONReader.Feature.SupportClassForName});
            Method declaredMethod = cls.getDeclaredMethod(str3, (Class[]) parseArray.toArray(new Class[0]));
            Type[] genericParameterTypes = declaredMethod.getGenericParameterTypes();
            JSONArray parseArray2 = JSON.parseArray(mqttMsg.getPayload());
            Object[] objArr = new Object[parseArray.size()];
            for (int i = 0; i < objArr.length; i++) {
                objArr[i] = parseArray2.getObject(i, genericParameterTypes[i], new JSONReader.Feature[0]);
            }
            Object invoke = declaredMethod.invoke(bean, objArr);
            return new MsgRsp(mqttMsg.getMsgId(), invoke instanceof String ? (String) invoke : JSON.toJSONString(invoke));
        } catch (Exception e) {
            logger.error("[{}]Local execute exception! Source: [{}] Method name: [{}]", new Object[]{mqttMsg.getMsgId(), mqttMsg.getSourceAddress(), str3, e});
            MsgRsp msgRsp = new MsgRsp(mqttMsg.getMsgId(), e.toString());
            msgRsp.setStateCode(0);
            e.printStackTrace();
            return msgRsp;
        }
    }

    MsgRsp jarExecute(String str, MqttMsg mqttMsg) {
        String[] split = str.replace(UnitHead.JAR, "").split("/");
        String str2 = split[0];
        String str3 = split[1];
        String str4 = split[2];
        String str5 = split.length > 3 ? split[3] : "[\"java.lang.String\"]";
        try {
            URLClassLoader uRLClassLoader = new URLClassLoader(new URL[]{new File(this.jarPath + "/" + str2).toURI().toURL()});
            Thread.currentThread().setContextClassLoader(uRLClassLoader);
            Class<?> loadClass = uRLClassLoader.loadClass(str3);
            Object invoke = loadClass.getDeclaredMethod(str4, (Class[]) JSON.parseArray(str5, Class.class, new JSONReader.Feature[]{JSONReader.Feature.SupportClassForName}).toArray(new Class[0])).invoke(loadClass.newInstance(), JSON.parseArray(mqttMsg.getPayload()).toArray());
            String jSONString = invoke instanceof String ? (String) invoke : JSON.toJSONString(invoke);
            MsgRsp msgRsp = new MsgRsp(mqttMsg.getMsgId(), jSONString);
            logger.info("jar result: {}", jSONString);
            return msgRsp;
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException | MalformedURLException e) {
            e.printStackTrace();
            return null;
        }
    }

    public String getUnitBySubList(String str) {
        RegisterSub registerSub;
        String data = this.iaContext.getIaENV().getRegScheduler().getData(str);
        return (!StringUtils.isNotBlank(data) || (registerSub = (RegisterSub) JSONObject.parseObject(data, RegisterSub.class)) == null) ? "" : registerSub.getExecuteUnit();
    }

    public void messageReceiver(Message<String> message) {
        this.mqttExecutor.execute(() -> {
            Boolean bool = (Boolean) message.getHeaders().get("mqtt_duplicate");
            if (bool != null && bool.booleanValue()) {
                logger.warn("Message receive duplicate [{}]", message);
                return;
            }
            Object obj = message.getHeaders().get("mqtt_receivedTopic");
            if (obj == null) {
                logger.error("Message mqtt_receivedTopic is null [{}]", message);
                return;
            }
            String obj2 = obj.toString();
            MqttMsg mqttMsg = (MqttMsg) JSONObject.parseObject((String) message.getPayload(), MqttMsg.class);
            String str = MqttUtils.preconditionSubTopicId(mqttMsg.getSourceAddress(), mqttMsg.getOperaCode()) + "/ack";
            if (mqttMsg.isSubpackage()) {
                String str2 = null;
                try {
                    str2 = stickPackageMsg(mqttMsg);
                    mqttMsg.setPayload(str2);
                } catch (IllegalAccessException | InstantiationException | InterruptedException | NoSuchMethodException | InvocationTargetException | ExecutionException e) {
                    e.printStackTrace();
                    logger.error("[{}]Subpackage msg[{}] result excepted...[{}]", new Object[]{mqttMsg.getMsgId(), mqttMsg.getTopic(), e});
                    if (logger.isDebugEnabled()) {
                        e.printStackTrace();
                    }
                    MsgRsp msgRsp = new MsgRsp(mqttMsg.getMsgId(), e.toString());
                    msgRsp.setStateCode(Integer.valueOf(StateCode.StateCodeEnum.PACKAGE_ERROR.getCode()));
                    errorAck(str, msgRsp);
                } catch (TimeoutException e2) {
                    logger.error("[{}]Subpackage msg[{}] timeout[{}s]...[{}]", new Object[]{mqttMsg.getMsgId(), mqttMsg.getTopic(), Long.valueOf(this.packageMsgTimeout * 3), e2.toString()});
                    if (logger.isDebugEnabled()) {
                        e2.printStackTrace();
                    }
                    errorAck(str, new MsgRsp(mqttMsg.getMsgId(), e2.toString()));
                }
                if (StringUtils.isBlank(str2)) {
                    return;
                }
            }
            MsgRsp receive = receive(obj2, mqttMsg);
            if (mqttMsg.getHasAck().booleanValue()) {
                receive.setTopic(str);
                byte[] bytes = receive.getPayload().getBytes(StandardCharsets.UTF_8);
                try {
                    if (bytes.length > this.mqttPayloadLimit) {
                        for (MsgRsp msgRsp2 : subpackage(bytes, mqttMsg.getMsgId())) {
                            Thread.sleep(10L);
                            msgRsp2.setTopic(obj2);
                            this.iMqttSender.sendToMqtt(str, 1, JSONObject.toJSONString(msgRsp2, new JSONWriter.Feature[0]));
                        }
                    } else {
                        this.iMqttSender.sendToMqtt(str, 1, JSONObject.toJSONString(receive, new JSONWriter.Feature[0]));
                    }
                } catch (Exception e3) {
                    logger.error("[{}] Message ", receive.getMsgId(), e3);
                }
            }
        });
    }

    private void errorAck(String str, MsgRsp msgRsp) {
        msgRsp.setStateCode(Integer.valueOf(StateCode.StateCodeEnum.PACKAGE_ERROR.getCode()));
        try {
            this.iMqttSender.sendToMqtt(str, 1, JSONObject.toJSONString(msgRsp, new JSONWriter.Feature[0]));
        } catch (Exception e) {
            logger.error("[{}]Return errorAck[{}] message result exception[{}]", new Object[]{msgRsp.getMsgId(), str, e.toString()});
            if (logger.isDebugEnabled()) {
                e.printStackTrace();
            }
            e.printStackTrace();
        }
    }

    private String stickPackageMsg(MqttMsg mqttMsg) throws ExecutionException, InterruptedException, TimeoutException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        String msgId = mqttMsg.getMsgId();
        int size = CALLBACKS.size();
        mqttMsg.getOperaCode();
        if (size > this.callbackMaxCount) {
            logger.error("[{}]Message, system callback map is full[{}]", msgId, Integer.valueOf(size));
            return null;
        }
        CallSubpackageMsg<MqttMsg> callSubpackageMsg = new CallSubpackageMsg<>();
        CallSubpackageMsg<MqttMsg> putIfAbsent = CALLBACKS.putIfAbsent(msgId, callSubpackageMsg);
        if (putIfAbsent != null) {
            putIfAbsent.callbackSubPackage(mqttMsg);
            return null;
        }
        logger.debug("[{}]Message[{}] receive size package start...", msgId, mqttMsg.getTopic());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        FutureTask futureTask = new FutureTask(callSubpackageMsg);
        try {
            CALLBACKS.get(msgId).callbackSubPackage(mqttMsg);
            newSingleThreadExecutor.submit(futureTask);
            String payload = ((MqttMsg) futureTask.get(this.packageMsgTimeout * 3, TimeUnit.SECONDS)).getPayload();
            futureTask.cancel(true);
            newSingleThreadExecutor.shutdown();
            CALLBACKS.remove(msgId);
            return payload;
        } catch (Throwable th) {
            futureTask.cancel(true);
            newSingleThreadExecutor.shutdown();
            CALLBACKS.remove(msgId);
            throw th;
        }
    }

    private List<MsgRsp> subpackage(byte[] bArr, String str) {
        ArrayList arrayList = new ArrayList();
        int crc = ByteUtil.getCRC(bArr);
        int length = bArr.length;
        int i = (length / this.mqttPayloadLimit) + 1;
        logger.warn("[{}]Subpackage response msg length[{}] is large than mqtt payload limit[{}], count[{}]", new Object[]{str, Integer.valueOf(length), Integer.valueOf(this.mqttPayloadLimit), Integer.valueOf(i)});
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new MsgRsp(str, ArrayUtils.subarray(bArr, i2 * this.mqttPayloadLimit, this.mqttPayloadLimit * (i2 + 1)), true, Integer.valueOf(i2), Integer.valueOf(i), Integer.valueOf(crc)));
        }
        return arrayList;
    }

    @PreDestroy
    public void destroy() {
        logger.debug("Mqtt receiver destroy...");
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> {
            int activeCount = this.mqttExecutor.getActiveCount();
            if (activeCount == 0) {
                newSingleThreadScheduledExecutor.shutdown();
            }
            if (activeCount >= 50) {
                logger.warn("Mqtt task executor status: pool size:[{}], active count:[{}], max pool size:[{}] ", new Object[]{Integer.valueOf(this.mqttExecutor.getPoolSize()), Integer.valueOf(activeCount), Integer.valueOf(this.mqttExecutor.getMaxPoolSize())});
            }
        }, 10L, 200L, TimeUnit.MILLISECONDS);
        try {
            if (newSingleThreadScheduledExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
                logger.info("Mqtt receiver destroy successfully!!");
            }
        } catch (InterruptedException e) {
            logger.error("Destroy mqtt receiver time out 10 seconds, Mqtt task executor status:pool size:[{}], active count:[{}], max pool size:[{}] ", new Object[]{Integer.valueOf(this.mqttExecutor.getPoolSize()), Integer.valueOf(this.mqttExecutor.getActiveCount()), Integer.valueOf(this.mqttExecutor.getMaxPoolSize())});
        }
    }

    public Map<String, CallSubpackageMsg<MqttMsg>> getCALLBACKS() {
        return CALLBACKS;
    }
}
