package org.joyqueue.service.impl;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.joyqueue.producer.ExtensionProducer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.async.BrokerClusterQuery;
import org.joyqueue.async.BrokerMonitorClusterQuery;
import org.joyqueue.async.RetrieveProvider;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.ServiceException;
import org.joyqueue.model.domain.Application;
import org.joyqueue.model.domain.ApplicationToken;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.Identity;
import org.joyqueue.model.domain.PartitionGroupReplica;
import org.joyqueue.model.domain.ProducerSendMessage;
import org.joyqueue.model.domain.SimplifiedBrokeMessage;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.model.domain.SubscribeType;
import org.joyqueue.model.exception.DataException;
import org.joyqueue.monitor.BrokerMessageInfo;
import org.joyqueue.monitor.RestResponse;
import org.joyqueue.monitor.RestResponseCode;
import org.joyqueue.nsr.AppTokenNameServerService;
import org.joyqueue.nsr.BrokerNameServerService;
import org.joyqueue.nsr.ReplicaServerService;
import org.joyqueue.other.HttpRestService;
import org.joyqueue.service.ApplicationService;
import org.joyqueue.service.ApplicationTokenService;
import org.joyqueue.service.BrokerMessageService;
import org.joyqueue.service.LeaderService;
import org.joyqueue.service.MessagePreviewService;
import org.joyqueue.util.NullUtil;
import org.joyqueue.util.UrlEncoderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("brokerMessageService")
/* loaded from: input_file:org/joyqueue/service/impl/BrokerMessageServiceImpl.class */
public class BrokerMessageServiceImpl implements BrokerMessageService {
    private static final long TIMEOUT = 10000;

    @Resource(type = BrokerMonitorClusterQuery.class)
    private BrokerClusterQuery<Subscribe> brokerClusterQuery;

    @Autowired
    private LeaderService leaderService;

    @Autowired(required = false)
    private HttpRestService httpRestService;

    @Autowired
    private MessagePreviewService messagePreviewService;

    @Autowired
    private ReplicaServerService replicaServerService;

    @Autowired
    private BrokerNameServerService brokerNameServerService;

    @Autowired
    private AppTokenNameServerService appTokenNameServerService;

    @Autowired
    private ApplicationTokenService applicationTokenService;

    @Autowired
    private ApplicationService applicationService;
    private static final Logger logger = LoggerFactory.getLogger(BrokerMessageServiceImpl.class);
    public static final ObjectMapper mapper = new ObjectMapper();

    @Override // org.joyqueue.service.BrokerMessageService
    public List<SimplifiedBrokeMessage> previewMessage(final Subscribe subscribe, String str, final int i) {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerClusterQuery.get(this.brokerClusterQuery.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMessageServiceImpl.1
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(broker);
                return broker.getIp() + ":" + broker.getMonitorPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str2, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[3];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                strArr[2] = String.valueOf(i);
                return String.format(str2, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "previewMessage", " preview pending or last message"), TIMEOUT, TimeUnit.MILLISECONDS);
        JavaType constructParametricType = mapper.getTypeFactory().constructParametricType(RestResponse.class, new JavaType[]{mapper.getTypeFactory().constructParametricType(List.class, new Class[]{BrokerMessageInfo.class})});
        try {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                RestResponse restResponse = (RestResponse) mapper.readValue(entry.getValue(), constructParametricType);
                if (restResponse.getCode() != RestResponseCode.SUCCESS.getCode()) {
                    logger.info(String.format("%s preview message request failed", entry.getKey()));
                } else if (NullUtil.isEmpty((Collection) restResponse.getData())) {
                    logger.info(String.format("%s preview message request success,but empty message", entry.getKey()));
                } else {
                    Iterator it = ((List) restResponse.getData()).iterator();
                    while (it.hasNext()) {
                        arrayList.add(simpleBrokerMessageConvert((BrokerMessageInfo) it.next(), str));
                    }
                }
            }
            return arrayList;
        } catch (Exception e) {
            logger.error("parse broker message error", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, "Message can't be parse");
        }
    }

    @Override // org.joyqueue.service.BrokerMessageService
    public List<SimplifiedBrokeMessage> previewNewestMessage(long j, String str, String str2, int i) {
        return null;
    }

    @Override // org.joyqueue.service.BrokerMessageService
    public List<BrokerMessageInfo> viewMessage(Subscribe subscribe, String str, String str2, String str3, int i) {
        Broker value = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), Integer.valueOf(str2).intValue()).getValue();
        RestResponse restResponse = this.httpRestService.get("getPartitionMessageByIndex", BrokerMessageInfo.class, true, value.getIp(), String.valueOf(value.getMonitorPort()), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName(), subscribe.getApp().getCode(), str2, str3, String.valueOf(i));
        if (restResponse == null || restResponse.getData() == null) {
            return null;
        }
        return decodeBrokerMessage((List) restResponse.getData(), str);
    }

    public List<BrokerMessageInfo> decodeBrokerMessage(List<BrokerMessageInfo> list, String str) {
        for (BrokerMessageInfo brokerMessageInfo : list) {
            if (brokerMessageInfo.getBody() != null) {
                compliantPreviewDecode(brokerMessageInfo, str);
            }
        }
        return list;
    }

    @Override // org.joyqueue.service.BrokerMessageService
    public Long getPartitionIndexByTime(Subscribe subscribe, String str, String str2) {
        Broker value = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), Integer.valueOf(str).intValue()).getValue();
        RestResponse restResponse = this.httpRestService.get("getTopicAppPartitionIndexByTime", Long.class, false, value.getIp(), String.valueOf(value.getMonitorPort()), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName(), subscribe.getApp().getCode(), str, str2);
        if (restResponse == null || restResponse.getData() == null) {
            return null;
        }
        return (Long) restResponse.getData();
    }

    @Override // org.joyqueue.service.BrokerMessageService
    public SimplifiedBrokeMessage download(String str, int i, String str2, String str3, short s, long j) {
        return null;
    }

    @Override // org.joyqueue.service.BrokerMessageService
    public void sendMessage(ProducerSendMessage producerSendMessage) {
        Application findByCode = this.applicationService.findByCode(producerSendMessage.getApp());
        if (findByCode == null) {
            throw new RuntimeException("application not exist");
        }
        TopicName parse = TopicName.parse(producerSendMessage.getTopic(), producerSendMessage.getNamespace());
        List<PartitionGroupReplica> findByTopicAndGroup = this.replicaServerService.findByTopicAndGroup(parse.getCode(), parse.getNamespace(), 0);
        if (CollectionUtils.isEmpty(findByTopicAndGroup)) {
            throw new RuntimeException("topic not exist");
        }
        try {
            Broker findById = this.brokerNameServerService.findById(Integer.valueOf(findByTopicAndGroup.get(0).getBrokerId()));
            if (findById == null) {
                throw new RuntimeException("broker not exist");
            }
            try {
                List<ApplicationToken> findByApp = this.appTokenNameServerService.findByApp(producerSendMessage.getApp());
                if (CollectionUtils.isEmpty(findByApp)) {
                    ApplicationToken applicationToken = new ApplicationToken();
                    applicationToken.setApplication(new Identity(Long.valueOf(findByCode.getId()), findByCode.getCode()));
                    try {
                        this.applicationTokenService.add(applicationToken);
                        findByApp = Arrays.asList(applicationToken);
                    } catch (Exception e) {
                        logger.error("add token exception, app: {}", producerSendMessage.getApp(), e);
                        throw new RuntimeException("token not exist");
                    }
                }
                String[] split = producerSendMessage.getMessage().split("\n");
                KeyValue newKeyValue = OMS.newKeyValue();
                newKeyValue.put("ACCOUNT_KEY", findByApp.get(0).getToken());
                newKeyValue.put("TRANSPORT_IO_THREADS", 1);
                ExtensionProducer createProducer = OMS.getMessagingAccessPoint(String.format("oms:joyqueue://%s@%s:%s/console", producerSendMessage.getApp(), findById.getIp(), Integer.valueOf(findById.getPort())), newKeyValue).createProducer();
                try {
                    createProducer.start();
                    for (String str : split) {
                        if (!StringUtils.isBlank(str)) {
                            createProducer.send(createProducer.createMessage(parse.getFullName(), str));
                        }
                    }
                } finally {
                    createProducer.stop();
                }
            } catch (Exception e2) {
                logger.error("find token exception, app: {}", producerSendMessage.getApp(), e2);
                throw new RuntimeException("topic not exist");
            }
        } catch (Exception e3) {
            logger.error("find broker exception, brokerId: {}", Integer.valueOf(findByTopicAndGroup.get(0).getBrokerId()), e3);
            throw new RuntimeException("topic not exist");
        }
    }

    public void compliantPreviewDecode(BrokerMessageInfo brokerMessageInfo, String str) {
        DataException serviceException;
        try {
            brokerMessageInfo.setBody(this.messagePreviewService.preview(str, Base64.getDecoder().decode(brokerMessageInfo.getBody())));
        } finally {
            try {
            } catch (Throwable th) {
            }
        }
    }

    private SimplifiedBrokeMessage simpleBrokerMessageConvert(BrokerMessageInfo brokerMessageInfo, String str) {
        SimplifiedBrokeMessage simplifiedBrokeMessage = new SimplifiedBrokeMessage();
        simplifiedBrokeMessage.setId(((int) brokerMessageInfo.getPartition()) + "-" + brokerMessageInfo.getMsgIndexNo());
        simplifiedBrokeMessage.setSendTime(brokerMessageInfo.getStartTime());
        simplifiedBrokeMessage.setStoreTime(brokerMessageInfo.getStoreTime());
        simplifiedBrokeMessage.setBusinessId(brokerMessageInfo.getBusinessId());
        if (brokerMessageInfo.getBody() != null) {
            compliantPreviewDecode(brokerMessageInfo, str);
            simplifiedBrokeMessage.setBody(brokerMessageInfo.getBody());
        }
        simplifiedBrokeMessage.setAttributes(brokerMessageInfo.getAttributes());
        simplifiedBrokeMessage.setFlag(brokerMessageInfo.isAck());
        return simplifiedBrokeMessage;
    }
}
