package tech.mystox.framework.mqtt.service.impl;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import tech.mystox.framework.common.util.CollectionUtils;
import tech.mystox.framework.common.util.MqttUtils;
import tech.mystox.framework.config.IaConf;
import tech.mystox.framework.core.IaENV;
import tech.mystox.framework.entity.MsgResult;
import tech.mystox.framework.entity.OperaContext;
import tech.mystox.framework.entity.RegisterMsg;
import tech.mystox.framework.entity.RegisterType;
import tech.mystox.framework.entity.ServerMsg;
import tech.mystox.framework.entity.ServerStatus;
import tech.mystox.framework.entity.StateCode;
import tech.mystox.framework.exception.MsgResultFailException;
import tech.mystox.framework.exception.RegisterException;
import tech.mystox.framework.scheduler.LoadBalanceScheduler;
import tech.mystox.framework.scheduler.RegScheduler;
import tech.mystox.framework.service.MsgHandler;

/* loaded from: input_file:tech/mystox/framework/mqtt/service/impl/MqttHandler.class */
public class MqttHandler implements MsgHandler {
    private IaENV iaENV;
    Logger logger = LoggerFactory.getLogger(MqttHandler.class);
    protected ChannelHandlerAck mqttHandlerAck;
    protected ChannelHandlerSub mqttHandlerImpl;
    protected ChannelSenderImpl mqttSenderImpl;
    private ApplicationContext applicationContext;

    public MqttHandler(IaENV iaENV, ApplicationContext applicationContext) {
        this.iaENV = iaENV;
        this.applicationContext = applicationContext;
        this.mqttHandlerAck = (ChannelHandlerAck) applicationContext.getBean(ChannelHandlerAck.class);
        this.mqttHandlerImpl = (ChannelHandlerSub) applicationContext.getBean(ChannelHandlerSub.class);
        this.mqttSenderImpl = (ChannelSenderImpl) applicationContext.getBean(ChannelSenderImpl.class);
    }

    public MqttHandler(IaENV iaENV) {
        this.iaENV = iaENV;
    }

    public void addSubTopic(String str, int i) {
        this.mqttHandlerImpl.addSubTopic(str, i);
    }

    public void removeSubTopic(String... strArr) {
        this.mqttHandlerImpl.removeSubTopic(strArr);
    }

    public void removeAckSubTopic(String... strArr) {
        this.mqttHandlerAck.removeSubTopic(strArr);
    }

    public boolean isAckExists(String str) {
        return this.mqttHandlerAck.isExists(str);
    }

    public boolean isExists(String str) {
        return this.mqttHandlerImpl.isExists(str);
    }

    public void addAckTopic(String str, int i) {
        this.mqttHandlerAck.addSubTopic(str, i);
    }

    public MsgResult opera(String str, String str2) {
        return opera(str, str2, 1, 0L, null, false, false);
    }

    public RegisterMsg whereIsCentre() {
        IaConf conf = this.iaENV.getConf();
        String serverName = conf.getServerName();
        conf.getGroupCode();
        conf.getRegisterServerName();
        conf.getRegisterServerVersion();
        String registerUrl = conf.getRegisterUrl();
        RegisterMsg registerMsg = new RegisterMsg();
        this.logger.info("{} registerUrl is: [{}]", serverName, registerUrl);
        String[] split = registerUrl.split("://");
        String str = split[0];
        registerMsg.setRegisterUrl(split[1]);
        registerMsg.setRegisterUrlHeader(str);
        if (StringUtils.equals(RegisterType.ZOOKEEPER.toString(), str.toUpperCase())) {
            registerMsg.setRegisterType(RegisterType.ZOOKEEPER);
        }
        if (StringUtils.equals(RegisterType.REDIS.toString(), str.toUpperCase())) {
            registerMsg.setRegisterType(RegisterType.REDIS);
        }
        return registerMsg;
    }

    public void sendToMqtt(String str, String str2, String str3) throws Exception {
        if (!ServerStatus.ONLINE.equals(this.iaENV.getServerStatus())) {
            throw new MsgResultFailException(StateCode.StateCodeEnum.UNREGISTERED, "Server status is not online!");
        }
        this.mqttSenderImpl.sendToMqtt(str, str2, str3);
    }

    public void sendToMqtt(String str, String str2, int i, String str3) throws Exception {
        if (!ServerStatus.ONLINE.equals(this.iaENV.getServerStatus())) {
            throw new MsgResultFailException(StateCode.StateCodeEnum.UNREGISTERED, "Server status is not online!");
        }
        this.mqttSenderImpl.sendToMqtt(str, str2, i, str3);
    }

    public MsgResult sendToMqttSync(String str, String str2, String str3) {
        if (ServerStatus.ONLINE.equals(this.iaENV.getServerStatus())) {
            return this.mqttSenderImpl.sendToMqttSync(str, str2, str3);
        }
        throw new MsgResultFailException(StateCode.StateCodeEnum.UNREGISTERED, "Server status is not online!");
    }

    public MsgResult sendToMqttSync(String str, String str2, int i, String str3, long j, TimeUnit timeUnit) {
        return opera(new OperaContext(str2, JSONObject.toJSONString(Collections.singletonList(str3), new JSONWriter.Feature[0]), i, j, timeUnit, true, false));
    }

    public boolean sendToMqttBoolean(String str, String str2, int i, String str3) {
        return false;
    }

    public MsgResult opera(OperaContext operaContext) {
        if (!ServerStatus.ONLINE.equals(this.iaENV.getServerStatus())) {
            throw new MsgResultFailException(StateCode.StateCodeEnum.UNREGISTERED, "Server status is not online!");
        }
        String operaCode = operaContext.getOperaCode();
        LoadBalanceScheduler loadBalanceScheduler = this.iaENV.getLoadBalanceScheduler();
        try {
            if (operaContext.isAsync() && CollectionUtils.isEmpty(loadBalanceScheduler.getOperaRouteArr(operaCode))) {
                this.logger.warn("OperaCode[{}] route topic list size is null...", operaCode);
                return new MsgResult(15, "[" + operaCode + "] route topic list size is null...");
            }
            ServerMsg chooseServer = loadBalanceScheduler.chooseServer(operaCode);
            if (chooseServer == null) {
                this.logger.error("[{}] Choose server is null error...", operaCode);
                return new MsgResult(15, "[" + operaCode + "] Choose server is null error...");
            }
            return (MsgResult) loadBalanceScheduler.operaCall((str, str2) -> {
                return operaTarget(str, operaContext.getMsg(), operaContext.getQos(), operaContext.getTimeout(), operaContext.getTimeUnit(), operaContext.isSetFlag(), operaContext.isAsync(), str2);
            }, MqttUtils.preconditionGroupServerCode(chooseServer.getGroupCode(), MqttUtils.preconditionServerCode(chooseServer.getServerName(), chooseServer.getServerVersion(), chooseServer.getSequence())), operaCode);
        } catch (RegisterException e) {
            throw new MsgResultFailException(StateCode.StateCodeEnum.UNREGISTERED, "Choose server error..." + e);
        }
    }

    private MsgResult opera(String str, String str2, int i, long j, TimeUnit timeUnit, boolean z, boolean z2) {
        return opera(new OperaContext(str, str2, i, j, timeUnit, z, z2));
    }

    protected MsgResult operaTarget(String str, String str2, int i, long j, TimeUnit timeUnit, boolean z, boolean z2, String str3) {
        return z2 ? this.mqttSenderImpl.sendToMqttBoolean(str3, str, i, str2) ? new MsgResult(1, StateCode.StateCodeEnum.toStateCodeName(1)) : new MsgResult(0, StateCode.StateCodeEnum.toStateCodeName(0)) : z ? this.mqttSenderImpl.sendToMqttSync(str3, str, i, str2, j, timeUnit) : this.mqttSenderImpl.sendToMqttSync(str3, str, str2);
    }

    @Async
    public void broadcast(String str, String str2) {
        broadcast(str, str2, 0, false);
    }

    private void broadcast(String str, String str2, int i, boolean z) {
        RegScheduler regScheduler = this.iaENV.getRegScheduler();
        IaConf conf = this.iaENV.getConf();
        String preconditionRoutePath = MqttUtils.preconditionRoutePath(MqttUtils.preconditionGroupServerCode(conf.getGroupCode(), MqttUtils.preconditionServerCode(conf.getServerName(), conf.getServerVersion())), str);
        try {
            if (!regScheduler.exists(preconditionRoutePath)) {
                regScheduler.create(preconditionRoutePath, (byte[]) null, 1);
            }
            List operaRouteArr = this.iaENV.getLoadBalanceScheduler().getOperaRouteArr(str);
            if (CollectionUtils.isEmpty(operaRouteArr)) {
                this.logger.debug("Broadcast operaCode:[{}] route array is empty", str);
            } else {
                operaRouteArr.forEach(str3 -> {
                    try {
                        if (z) {
                            this.mqttSenderImpl.sendToMqtt(str3, str, i, str2);
                        } else {
                            this.mqttSenderImpl.sendToMqtt(str3, str, str2);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (Exception e) {
            if (this.logger.isDebugEnabled()) {
                e.printStackTrace();
            }
            this.logger.error("[{}] operaCode executor error [{}]", str, e.toString());
        }
    }

    public MsgResult opera(String str, String str2, int i, long j, TimeUnit timeUnit) {
        return opera(str, str2, i, j, timeUnit, true, false);
    }

    @Async
    public void operaAsync(String str, String str2) {
        opera(str, str2, 1, 0L, null, false, true);
    }
}
