package com.geektcp.common.mq.service.producer;

import cn.hutool.json.JSONUtil;
import com.geektcp.common.mq.configuration.TopicConfig;
import com.geektcp.common.mq.constant.MessageType;
import com.geektcp.common.mq.model.MessageDTO;
import com.geektcp.common.mq.service.CommonProducer;
import com.geektcp.common.mq.service.KafkaProductCountService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/geektcp/common/mq/service/producer/CommonProducerImpl.class */
public class CommonProducerImpl implements CommonProducer {
    private static final Logger log = LoggerFactory.getLogger(CommonProducerImpl.class);
    private static final String SYNC = "SYNC";
    private static final String ASYNC = "SYNC";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private TopicConfig topicConfig;

    @Override // com.geektcp.common.mq.service.CommonProducer
    public Boolean syncSend(MessageDTO<?> messageDTO) {
        boolean validateParam = validateParam(messageDTO);
        if (!validateParam) {
            return Boolean.valueOf(validateParam);
        }
        String topicByType = getTopicByType(messageDTO.getType());
        boolean z = true;
        try {
            this.kafkaTemplate.send(topicByType, messageDTO.getData()).get();
        } catch (Exception e) {
            z = false;
        }
        if (z) {
            KafkaProductCountService.countConcurrentMap(topicByType);
        }
        return Boolean.valueOf(z);
    }

    @Override // com.geektcp.common.mq.service.CommonProducer
    public Boolean asyncSend(MessageDTO messageDTO) {
        boolean validateParam = validateParam(messageDTO);
        if (!validateParam) {
            return Boolean.valueOf(validateParam);
        }
        String topicByType = getTopicByType(messageDTO.getType());
        if (topicByType == null) {
            return false;
        }
        Object data = messageDTO.getData();
        boolean booleanValue = ((Boolean) this.kafkaTemplate.executeInTransaction(kafkaOperations -> {
            this.kafkaTemplate.send(topicByType, data);
            return true;
        })).booleanValue();
        if (booleanValue) {
            KafkaProductCountService.countConcurrentMap(topicByType);
        }
        return Boolean.valueOf(booleanValue);
    }

    @Override // com.geektcp.common.mq.service.CommonProducer
    public Boolean bulkSend(MessageDTO messageDTO) {
        return false;
    }

    private String getTopicByType(MessageType messageType) {
        if (MessageType.MSG_PAY.equals(messageType)) {
            return this.topicConfig.payNotifyTopic;
        }
        if (MessageType.MSG_USER.equals(messageType)) {
            return this.topicConfig.authInfoTopic;
        }
        if (MessageType.MSG_ETL.equals(messageType)) {
            return this.topicConfig.graphDataTopic;
        }
        if (MessageType.MSG_DIAGRAM.equals(messageType)) {
            return this.topicConfig.designDiagramTopic;
        }
        if (MessageType.MSG_OPERATE.equals(messageType)) {
            return this.topicConfig.logOperateTopic;
        }
        log.error("未知消息类型");
        return null;
    }

    private boolean validateParam(MessageDTO messageDTO) {
        if (messageDTO == null) {
            log.error("请求参数为空");
            return false;
        }
        if (messageDTO.getType() == null) {
            log.error("消息类型不能为空");
            return false;
        }
        if (messageDTO.getData() == null) {
            log.error("消息体不能为空");
            return false;
        }
        String key = messageDTO.getKey();
        Integer partition = messageDTO.getPartition();
        boolean z = StringUtils.isNotBlank(key) && partition != null;
        boolean z2 = StringUtils.isBlank(key) && partition == null;
        if (z || z2) {
            log.info("接收到埋点数据={}", JSONUtil.toJsonStr(messageDTO));
            return true;
        }
        log.error("分区和key必须同时存在，或同时不存在");
        return false;
    }
}
