package org.joyqueue.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.async.BrokerClusterQuery;
import org.joyqueue.async.BrokerMonitorClusterQuery;
import org.joyqueue.async.RetrieveProvider;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.exception.ServiceException;
import org.joyqueue.manage.PartitionGroupMetric;
import org.joyqueue.manage.PartitionGroupPosition;
import org.joyqueue.manage.PartitionMetric;
import org.joyqueue.manage.PartitionPosition;
import org.joyqueue.model.BrokerMetadata;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.BrokerClient;
import org.joyqueue.model.domain.BrokerMonitorRecord;
import org.joyqueue.model.domain.ConnectionMonitorInfoWithIp;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.model.domain.SubscribeType;
import org.joyqueue.model.domain.TopicPartitionGroup;
import org.joyqueue.monitor.ArchiveMonitorInfo;
import org.joyqueue.monitor.Client;
import org.joyqueue.monitor.ConnectionMonitorDetailInfo;
import org.joyqueue.monitor.ConnectionMonitorInfo;
import org.joyqueue.monitor.ConsumerMonitorInfo;
import org.joyqueue.monitor.ConsumerPartitionGroupMonitorInfo;
import org.joyqueue.monitor.ConsumerPartitionMonitorInfo;
import org.joyqueue.monitor.DeQueueMonitorInfo;
import org.joyqueue.monitor.EnQueueMonitorInfo;
import org.joyqueue.monitor.PartitionGroupMonitorInfo;
import org.joyqueue.monitor.PendingMonitorInfo;
import org.joyqueue.monitor.ProducerMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionGroupMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionMonitorInfo;
import org.joyqueue.monitor.RestResponse;
import org.joyqueue.monitor.RetryMonitorInfo;
import org.joyqueue.other.HttpRestService;
import org.joyqueue.service.BrokerMonitorService;
import org.joyqueue.service.BrokerRestUrlMappingService;
import org.joyqueue.service.LeaderService;
import org.joyqueue.service.TopicPartitionGroupService;
import org.joyqueue.util.HttpUtil;
import org.joyqueue.util.JSONParser;
import org.joyqueue.util.NullUtil;
import org.joyqueue.util.UrlEncoderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("brokerMonitorService")
/* loaded from: input_file:org/joyqueue/service/impl/BrokerMonitorServiceImpl.class */
public class BrokerMonitorServiceImpl implements BrokerMonitorService {
    private Logger logger = LoggerFactory.getLogger(BrokerMonitorServiceImpl.class);
    private static final long TIMEOUT = 60000;

    @Resource(type = BrokerMonitorClusterQuery.class)
    private BrokerClusterQuery<Subscribe> brokerCluster;

    @Autowired
    private TopicPartitionGroupService partitionGroupService;

    @Autowired
    private LeaderService leaderService;

    @Autowired(required = false)
    private HttpRestService httpRestService;

    @Autowired
    private BrokerRestUrlMappingService urlMappingService;

    /* JADX WARN: Failed to find 'out' block for switch in B:14:0x00ef. Please report as an issue. */
    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnBroker(final Subscribe subscribe) {
        checkArgument(subscribe);
        ArrayList arrayList = new ArrayList();
        final ArrayList<Broker> arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.1
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[3];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                strArr[2] = subscribe.getType().name().toLowerCase();
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "appMonitor", "monitor on broker"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList2.size()) {
            this.logger.info("missing some of monitor on broker ,ignore!");
            return arrayList;
        }
        try {
            for (Broker broker : arrayList2) {
                String str = broker.getIp() + ":" + broker.getPort();
                String str2 = map.get(str);
                if (NullUtil.isEmpty(str2)) {
                    this.logger.info(String.format("ignore %s monitor on broker %s ", JSON.toJSON(subscribe), broker.getIp()));
                } else {
                    BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
                    brokerMonitorRecord.setIp(str);
                    switch (subscribe.getType().value()) {
                        case 1:
                            ProducerMonitorInfo producerMonitorInfo = (ProducerMonitorInfo) JSONParser.parse(str2, RestResponse.class, ProducerMonitorInfo.class, false).getData();
                            brokerMonitorRecord.setConnections(producerMonitorInfo.getConnections());
                            brokerMonitorRecord.setEnQuence(producerMonitorInfo.getEnQueue());
                            break;
                        case 2:
                            ConsumerMonitorInfo consumerMonitorInfo = (ConsumerMonitorInfo) JSONParser.parse(str2, RestResponse.class, ConsumerMonitorInfo.class, false).getData();
                            brokerMonitorRecord.setConnections(consumerMonitorInfo.getConnections());
                            brokerMonitorRecord.setRetry(consumerMonitorInfo.getRetry());
                            brokerMonitorRecord.setDeQuence(consumerMonitorInfo.getDeQueue());
                            brokerMonitorRecord.setPending(consumerMonitorInfo.getPending());
                            break;
                    }
                    arrayList.add(brokerMonitorRecord);
                }
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.info("broker asyncQueryOnBroker occurs parse exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public BrokerMonitorRecord find(Subscribe subscribe, boolean z) {
        if (!z) {
            return find(subscribe);
        }
        BrokerMonitorRecord merge = merge(subscribe, findMonitorOnPartitionGroupsForTopicApp(subscribe));
        if (!NullUtil.isEmpty(merge)) {
            BrokerMonitorRecord find = ((BrokerMonitorService) AopContext.currentProxy()).find(subscribe);
            if (!NullUtil.isEmpty(find)) {
                merge.setRetry(find.getRetry());
                merge.setConnections(find.getConnections());
            }
        }
        return merge;
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public BrokerMonitorRecord find(Subscribe subscribe) {
        checkArgument(subscribe);
        return merge(subscribe, findMonitorOnBroker(subscribe));
    }

    public BrokerMonitorRecord merge(Subscribe subscribe, List<BrokerMonitorRecord> list) {
        if (NullUtil.isEmpty((Collection) list)) {
            return null;
        }
        BrokerMonitorRecord brokerMonitorRecord = list.get(0);
        if (list.size() > 1) {
            for (int i = 1; i < list.size(); i++) {
                switch (subscribe.getType().value()) {
                    case 1:
                        brokerMonitorRecord.setRetry(add(brokerMonitorRecord.getRetry(), list.get(i).getRetry()));
                        brokerMonitorRecord.setEnQuence(add(brokerMonitorRecord.getEnQuence(), list.get(i).getEnQuence()));
                        break;
                    case 2:
                        brokerMonitorRecord.setDeQuence(add(brokerMonitorRecord.getDeQuence(), list.get(i).getDeQuence()));
                        break;
                }
                brokerMonitorRecord.setConnections(brokerMonitorRecord.getConnections() + list.get(i).getConnections());
                brokerMonitorRecord.setPending(add(brokerMonitorRecord.getPending(), list.get(i).getPending()));
            }
        }
        return brokerMonitorRecord;
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerClient> findClients(final Subscribe subscribe) {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.2
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[3];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                strArr[2] = subscribe.getType().name().toLowerCase();
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "appClientMonitor", "clients on broker"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList2.size()) {
            this.logger.info("missing some of clients on broker ,ignore!");
        }
        try {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                arrayList.addAll(convert(entry.getKey(), ((ConnectionMonitorDetailInfo) JSONParser.parse(entry.getValue(), RestResponse.class, ConnectionMonitorDetailInfo.class, false).getData()).getClients()));
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.info(" parse connection info exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    public List<BrokerClient> convert(String str, List<Client> list) {
        ArrayList arrayList = new ArrayList();
        for (Client client : list) {
            BrokerClient brokerClient = new BrokerClient();
            brokerClient.setIp(str);
            brokerClient.setClient(client);
            arrayList.add(brokerClient);
        }
        return arrayList;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:12:0x00d7. Please report as an issue. */
    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnPartition(final Subscribe subscribe) {
        checkArgument(subscribe);
        List<BrokerMonitorRecord> list = null;
        final ArrayList<Broker> arrayList = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.3
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[3];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                strArr[2] = subscribe.getType().name().toLowerCase();
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "appPartitionMonitor", "partitions on broker"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList.size()) {
            this.logger.info("ignore some broker partitions");
        }
        try {
            for (Broker broker : arrayList) {
                String str = broker.getIp() + ":" + broker.getPort();
                String str2 = map.get(str);
                if (!NullUtil.isEmpty(str2)) {
                    switch (subscribe.getType().value()) {
                        case 1:
                            list = transferProducerPartition((List) JSONParser.parse(str2, RestResponse.class, ProducerPartitionMonitorInfo.class, true).getData(), str);
                            break;
                        case 2:
                            list = transferConsumerPartition((List) JSONParser.parse(str2, RestResponse.class, ConsumerPartitionMonitorInfo.class, true).getData(), str);
                            break;
                    }
                } else {
                    this.logger.info(String.format("ignore %s partitions on broker %s ", JSON.toJSON(subscribe), broker.getIp()));
                }
            }
            if (!NullUtil.isEmpty((Collection) list)) {
                list.sort(Comparator.comparing(brokerMonitorRecord -> {
                    return Integer.valueOf(brokerMonitorRecord.getPartition());
                }));
            }
            return list;
        } catch (Exception e) {
            this.logger.info("broker asyncQueryOnBroker occurs parse exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnPartition(Subscribe subscribe, int i) {
        List<BrokerMonitorRecord> list = null;
        Map.Entry<PartitionGroup, Broker> findPartitionGroupLeaderBrokerDetail = this.leaderService.findPartitionGroupLeaderBrokerDetail(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), i);
        if (!NullUtil.isEmpty(findPartitionGroupLeaderBrokerDetail)) {
            Broker value = findPartitionGroupLeaderBrokerDetail.getValue();
            String[] strArr = new String[3];
            strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
            strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
            strArr[2] = subscribe.getType().name().toLowerCase();
            String[] encodeParam = UrlEncoderUtil.encodeParam(strArr);
            String[] strArr2 = new String[encodeParam.length + 2];
            strArr2[0] = value.getIp();
            strArr2[1] = String.valueOf(value.getMonitorPort());
            for (int i2 = 2; i2 < strArr2.length; i2++) {
                strArr2[i2] = encodeParam[i2 - 2];
            }
            String str = value.getIp() + ":" + value.getPort();
            switch (subscribe.getType().value()) {
                case 1:
                    list = transferProducerPartition((List) this.httpRestService.get("appPartitionMonitor", ProducerPartitionMonitorInfo.class, true, strArr2).getData(), str);
                    break;
                case 2:
                    list = transferConsumerPartition((List) this.httpRestService.get("appPartitionMonitor", ConsumerPartitionMonitorInfo.class, true, strArr2).getData(), str);
                    break;
            }
        }
        return list == null ? new ArrayList() : list;
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnPartitionGroupDetailForTopicApp(Subscribe subscribe, int i) {
        checkArgument(subscribe);
        List<BrokerMonitorRecord> findMonitorOnPartition = findMonitorOnPartition(subscribe, i);
        if (NullUtil.isEmpty((Collection) findMonitorOnPartition)) {
            return findMonitorOnPartition;
        }
        TopicPartitionGroup findByTopicAndGroup = this.partitionGroupService.findByTopicAndGroup(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), Integer.valueOf(i));
        HashSet hashSet = new HashSet();
        String partitions = findByTopicAndGroup.getPartitions();
        if (partitions != null && partitions.trim().length() != 0) {
            for (String str : JSON.parseArray(partitions, String.class)) {
                if (str.trim().length() != 0) {
                    hashSet.add(Integer.valueOf(str));
                }
            }
        }
        ArrayList arrayList = new ArrayList();
        for (BrokerMonitorRecord brokerMonitorRecord : findMonitorOnPartition) {
            if (hashSet.contains(Integer.valueOf(brokerMonitorRecord.getPartition()))) {
                brokerMonitorRecord.setPartitionGroup(i);
                arrayList.add(brokerMonitorRecord);
            }
        }
        return arrayList;
    }

    public List<BrokerMonitorRecord> attachPartitionGroupInfo(Subscribe subscribe, List<BrokerMonitorRecord> list) {
        List<TopicPartitionGroup> findByTopic = this.partitionGroupService.findByTopic(subscribe.getNamespace(), subscribe.getTopic());
        HashMap hashMap = new HashMap();
        for (TopicPartitionGroup topicPartitionGroup : findByTopic) {
            Iterator it = topicPartitionGroup.getPartitionSet().iterator();
            while (it.hasNext()) {
                hashMap.put((Integer) it.next(), Integer.valueOf(topicPartitionGroup.getGroupNo()));
            }
        }
        for (BrokerMonitorRecord brokerMonitorRecord : list) {
            brokerMonitorRecord.setPartitionGroup(((Integer) hashMap.get(Integer.valueOf(brokerMonitorRecord.getPartition()))).intValue());
        }
        return list;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:12:0x00ff. Please report as an issue. */
    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnPartitionGroupsForTopicApp(final Subscribe subscribe) {
        ArrayList arrayList = new ArrayList();
        final ArrayList<Map.Entry> arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnPartitionGroup(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.4
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(new AbstractMap.SimpleEntry(partitionGroup, broker));
                return String.valueOf(partitionGroup.getGroup());
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[4];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                strArr[2] = subscribe.getType().name().toLowerCase();
                strArr[3] = String.valueOf(partitionGroup.getGroup());
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "appPartitionGroupsMonitor", "consumer pr producer partition groups"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList2.size()) {
            this.logger.info("missing some of partition group on broker ,ignore!");
        }
        try {
            for (Map.Entry entry : arrayList2) {
                String valueOf = String.valueOf(((PartitionGroup) entry.getKey()).getGroup());
                Broker broker = (Broker) entry.getValue();
                String str = map.get(valueOf);
                if (NullUtil.isEmpty(str)) {
                    this.logger.info(String.format("ignore %s partition group %s", JSON.toJSON(subscribe), valueOf));
                } else {
                    BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
                    brokerMonitorRecord.setIp(broker.getIp() + ":" + broker.getPort());
                    switch (subscribe.getType().value()) {
                        case 1:
                            ProducerPartitionGroupMonitorInfo producerPartitionGroupMonitorInfo = (ProducerPartitionGroupMonitorInfo) JSONParser.parse(str, RestResponse.class, ProducerPartitionGroupMonitorInfo.class, false).getData();
                            brokerMonitorRecord.setPartitionGroup(producerPartitionGroupMonitorInfo.getPartitionGroupId());
                            brokerMonitorRecord.setEnQuence(producerPartitionGroupMonitorInfo.getEnQueue());
                            break;
                        case 2:
                            ConsumerPartitionGroupMonitorInfo consumerPartitionGroupMonitorInfo = (ConsumerPartitionGroupMonitorInfo) JSONParser.parse(str, RestResponse.class, ConsumerPartitionGroupMonitorInfo.class, false).getData();
                            brokerMonitorRecord.setPartitionGroup(consumerPartitionGroupMonitorInfo.getPartitionGroupId());
                            brokerMonitorRecord.setDeQuence(consumerPartitionGroupMonitorInfo.getDeQueue());
                            brokerMonitorRecord.setPending(consumerPartitionGroupMonitorInfo.getPending());
                            break;
                    }
                    arrayList.add(brokerMonitorRecord);
                }
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.info("broker asyncQueryOnBroker occurs parse exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<BrokerMonitorRecord> findMonitorOnPartitionGroups(final Subscribe subscribe) {
        checkArgument(subscribe);
        ArrayList arrayList = new ArrayList();
        final ArrayList<Broker> arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.5
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[2];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "topicPartitionGroupsMonitor", "topic partition groups "), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList2.size()) {
            this.logger.info("missing some of partition group on broker ,ignore!");
        }
        try {
            for (Broker broker : arrayList2) {
                String str = broker.getIp() + ":" + broker.getPort();
                String str2 = map.get(str);
                if (NullUtil.isEmpty(str2)) {
                    this.logger.info(String.format("ignore %s partition group on broker %s", JSON.toJSON(subscribe), broker.getIp()));
                } else {
                    for (PartitionGroupMonitorInfo partitionGroupMonitorInfo : (List) JSONParser.parse(str2, RestResponse.class, PartitionGroupMonitorInfo.class, true).getData()) {
                        BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
                        brokerMonitorRecord.setPartitionGroup(partitionGroupMonitorInfo.getPartitionGroup());
                        brokerMonitorRecord.setIp(str);
                        brokerMonitorRecord.setEnQuence(partitionGroupMonitorInfo.getEnQueue());
                        brokerMonitorRecord.setDeQuence(partitionGroupMonitorInfo.getDeQueue());
                        arrayList.add(brokerMonitorRecord);
                    }
                }
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.info("broker asyncQueryOnBroker occurs parse exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<ConnectionMonitorInfoWithIp> findConnectionOnBroker(final Subscribe subscribe) {
        checkArgument(subscribe);
        ArrayList arrayList = new ArrayList();
        final ArrayList<Broker> arrayList2 = new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.BrokerMonitorServiceImpl.6
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList2.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                String[] strArr = new String[2];
                strArr[0] = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
                strArr[1] = subscribe.getType() == SubscribeType.PRODUCER ? subscribe.getApp().getCode() : CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
                return String.format(str, UrlEncoderUtil.encodeParam(strArr));
            }
        }, "appConnection", "app connection on broker"), TIMEOUT, TimeUnit.MILLISECONDS);
        try {
            for (Broker broker : arrayList2) {
                String str = broker.getIp() + ":" + broker.getPort();
                String str2 = map.get(str);
                if (NullUtil.isEmpty(str2)) {
                    this.logger.info(String.format("ignore %s broker %s connection", JSON.toJSON(subscribe), broker.getIp()));
                } else {
                    ConnectionMonitorInfo connectionMonitorInfo = (ConnectionMonitorInfo) JSONParser.parse(str2, RestResponse.class, ConnectionMonitorInfo.class, false).getData();
                    ConnectionMonitorInfoWithIp connectionMonitorInfoWithIp = new ConnectionMonitorInfoWithIp();
                    connectionMonitorInfoWithIp.setIp(str);
                    connectionMonitorInfoWithIp.setConsumer(connectionMonitorInfo.getConsumer());
                    connectionMonitorInfoWithIp.setProducer(connectionMonitorInfo.getProducer());
                    connectionMonitorInfoWithIp.setTotal(connectionMonitorInfo.getTotal());
                    arrayList.add(connectionMonitorInfoWithIp);
                }
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.info("broker asyncQueryOnBroker occurs parse exception.", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public List<PartitionGroupPosition> findPartitionGroupMetric(String str, String str2, Integer num) {
        TopicPartitionGroup findByTopicAndGroup = this.partitionGroupService.findByTopicAndGroup(str, str2, num);
        Future<Map<String, String>> future = null;
        try {
            future = this.brokerCluster.asyncQueryAllBroker(str, str2, num, "partitiongroupIndex", "partitiongroupIndex");
        } catch (Exception e) {
            this.logger.error("asynQuery,error", e);
        }
        Map<String, String> map = this.brokerCluster.get(future, TIMEOUT, TimeUnit.MILLISECONDS);
        HashMap hashMap = new HashMap();
        map.forEach((str3, str4) -> {
            hashMap.put(str3, JSONParser.parse(str4, RestResponse.class, PartitionGroupMetric.class, false).getData());
        });
        return getPartitionGroupInterval(String.valueOf(findByTopicAndGroup.getLeader()) + "_" + num, hashMap);
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public ArchiveMonitorInfo findArchiveState(String str, int i) {
        try {
            return (ArchiveMonitorInfo) JSONParser.parse(HttpUtil.get(String.format(this.urlMappingService.urlTemplate("archiveMonitor"), str, String.valueOf(i))), RestResponse.class, ArchiveMonitorInfo.class, false).getData();
        } catch (Exception e) {
            this.logger.error("archive monitor", e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override // org.joyqueue.service.BrokerMonitorService
    public BrokerMetadata findBrokerMetadata(Broker broker, String str, int i) {
        String ip = broker.getIp();
        int monitorPort = broker.getMonitorPort();
        Integer num = new Integer((int) broker.getId());
        Preconditions.checkArgument(StringUtils.isNotEmpty(ip) && monitorPort > 0 && num.intValue() > 0 && StringUtils.isNotEmpty(str), "query broker metadata params incorrect.");
        try {
            RestResponse parse = JSONParser.parse(HttpUtil.get(String.format(this.urlMappingService.urlTemplate("topicPartitionGroupMetadata"), ip, String.valueOf(monitorPort), str)), RestResponse.class, JSONObject.class, false);
            if (parse.getCode() != 200) {
                String format = String.format("query broker %s metadata under topic %s error. response code %s", ip + ":" + monitorPort, str, Integer.valueOf(parse.getCode()));
                this.logger.error(format);
                throw new ServiceException(ServiceException.NOT_FOUND, format);
            }
            try {
                JSONObject jSONObject = (JSONObject) ((JSONObject) ((JSONObject) parse.getData()).get("partitionGroups")).get(Integer.valueOf(i));
                JSONObject jSONObject2 = (JSONObject) jSONObject.get("leaderBroker");
                JSONObject jSONObject3 = (JSONObject) ((JSONObject) jSONObject.get("brokers")).get(num);
                jSONObject3.put("leaderAddress", jSONObject2.get("address"));
                jSONObject3.put("leaderBrokerId", jSONObject2.get("id"));
                jSONObject3.put("leaderRetryType", jSONObject2.get("retryType"));
                jSONObject3.put("leaderPermission", jSONObject2.get("permission"));
                jSONObject3.put("leaderIp", jSONObject2.get("ip"));
                jSONObject3.put("leaderPort", jSONObject2.get("port"));
                return (BrokerMetadata) JSONObject.toJavaObject(jSONObject3, BrokerMetadata.class);
            } catch (Exception e) {
                String format2 = String.format("transform the response data of the broker %s metadata under topic %s error.", ip + ":" + monitorPort, str);
                this.logger.error(format2, e);
                throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, format2, e);
            }
        } catch (Exception e2) {
            this.logger.error("query broker metadata error.", e2);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e2.getMessage());
        }
    }

    private List<PartitionGroupPosition> getPartitionGroupInterval(String str, Map<String, PartitionGroupMetric> map) {
        PartitionGroupMetric partitionGroupMetric = map.get(str);
        ArrayList arrayList = new ArrayList();
        map.forEach((str2, partitionGroupMetric2) -> {
            PartitionGroupPosition positionConvert = positionConvert(partitionGroupMetric, partitionGroupMetric2);
            if (str.equals(str2)) {
                positionConvert.setLeader(true);
            }
            positionConvert.setBrokerId(str2);
            arrayList.add(positionConvert);
            this.logger.info("getPartitionGroupInterval brokerid:{},partitionGroupPosition:{}", str2, JSON.toJSONString(positionConvert));
        });
        return arrayList;
    }

    private PartitionGroupPosition positionConvert(PartitionGroupMetric partitionGroupMetric, PartitionGroupMetric partitionGroupMetric2) {
        PartitionGroupPosition partitionGroupPosition = new PartitionGroupPosition();
        partitionGroupPosition.setRightPosition(partitionGroupMetric2.getRightPosition());
        partitionGroupPosition.setPartitionGroup(partitionGroupMetric2.getPartitionGroup());
        partitionGroupPosition.setRightPositionInterval(partitionGroupMetric.getRightPosition() - partitionGroupMetric2.getRightPosition());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < partitionGroupMetric.getPartitionMetrics().length; i++) {
            PartitionMetric partitionMetric = partitionGroupMetric.getPartitionMetrics()[i];
            PartitionMetric partitionMetric2 = partitionGroupMetric2.getPartitionMetrics()[i];
            PartitionPosition partitionPosition = new PartitionPosition();
            partitionPosition.setPartition(partitionMetric2.getPartition());
            partitionPosition.setRightPosition(partitionMetric2.getRightIndex());
            partitionPosition.setRightPositionInterval(partitionMetric.getRightIndex() - partitionMetric2.getRightIndex());
            arrayList.add(partitionPosition);
        }
        partitionGroupPosition.setPartitionPositionList(arrayList);
        return partitionGroupPosition;
    }

    public RetryMonitorInfo add(RetryMonitorInfo retryMonitorInfo, RetryMonitorInfo retryMonitorInfo2) {
        if (NullUtil.isEmpty(retryMonitorInfo) || NullUtil.isEmpty(retryMonitorInfo2)) {
            return NullUtil.isEmpty(retryMonitorInfo) ? retryMonitorInfo2 : retryMonitorInfo;
        }
        retryMonitorInfo.setSuccess(retryMonitorInfo.getSuccess() + retryMonitorInfo2.getSuccess());
        retryMonitorInfo.setCount(retryMonitorInfo.getCount() + retryMonitorInfo2.getCount());
        retryMonitorInfo.setFailure(retryMonitorInfo.getFailure() + retryMonitorInfo2.getFailure());
        return retryMonitorInfo;
    }

    public PendingMonitorInfo add(PendingMonitorInfo pendingMonitorInfo, PendingMonitorInfo pendingMonitorInfo2) {
        if (NullUtil.isEmpty(pendingMonitorInfo) || NullUtil.isEmpty(pendingMonitorInfo2)) {
            return NullUtil.isEmpty(pendingMonitorInfo) ? pendingMonitorInfo2 : pendingMonitorInfo;
        }
        pendingMonitorInfo.setCount(pendingMonitorInfo.getCount() + pendingMonitorInfo2.getCount());
        return pendingMonitorInfo;
    }

    public DeQueueMonitorInfo add(DeQueueMonitorInfo deQueueMonitorInfo, DeQueueMonitorInfo deQueueMonitorInfo2) {
        if (NullUtil.isEmpty(deQueueMonitorInfo) || NullUtil.isEmpty(deQueueMonitorInfo2)) {
            return NullUtil.isEmpty(deQueueMonitorInfo) ? deQueueMonitorInfo2 : deQueueMonitorInfo;
        }
        deQueueMonitorInfo.setCount(deQueueMonitorInfo.getCount() + deQueueMonitorInfo2.getCount());
        deQueueMonitorInfo.setMax(deQueueMonitorInfo.getMax() + deQueueMonitorInfo2.getMax());
        deQueueMonitorInfo.setSize(deQueueMonitorInfo.getSize() + deQueueMonitorInfo2.getSize());
        deQueueMonitorInfo.setOneMinuteRate(deQueueMonitorInfo.getOneMinuteRate() + deQueueMonitorInfo2.getOneMinuteRate());
        deQueueMonitorInfo.setTotalSize(deQueueMonitorInfo.getTotalSize() + deQueueMonitorInfo2.getTotalSize());
        deQueueMonitorInfo.setTp90(Math.max(deQueueMonitorInfo.getTp90(), deQueueMonitorInfo2.getTp90()));
        deQueueMonitorInfo.setTp99(Math.max(deQueueMonitorInfo.getTp99(), deQueueMonitorInfo2.getTp99()));
        return deQueueMonitorInfo;
    }

    public EnQueueMonitorInfo add(EnQueueMonitorInfo enQueueMonitorInfo, EnQueueMonitorInfo enQueueMonitorInfo2) {
        if (NullUtil.isEmpty(enQueueMonitorInfo) || NullUtil.isEmpty(enQueueMonitorInfo2)) {
            return NullUtil.isEmpty(enQueueMonitorInfo) ? enQueueMonitorInfo2 : enQueueMonitorInfo;
        }
        enQueueMonitorInfo.setCount(enQueueMonitorInfo.getCount() + enQueueMonitorInfo2.getCount());
        enQueueMonitorInfo.setMax(enQueueMonitorInfo.getMax() + enQueueMonitorInfo2.getMax());
        enQueueMonitorInfo.setSize(enQueueMonitorInfo.getSize() + enQueueMonitorInfo2.getSize());
        enQueueMonitorInfo.setOneMinuteRate(enQueueMonitorInfo.getOneMinuteRate() + enQueueMonitorInfo2.getOneMinuteRate());
        enQueueMonitorInfo.setTotalSize(enQueueMonitorInfo.getTotalSize() + enQueueMonitorInfo2.getTotalSize());
        enQueueMonitorInfo.setTp90(Math.max(enQueueMonitorInfo.getTp90(), enQueueMonitorInfo2.getTp90()));
        enQueueMonitorInfo.setTp99(Math.max(enQueueMonitorInfo.getTp99(), enQueueMonitorInfo2.getTp99()));
        return enQueueMonitorInfo;
    }

    public BrokerMonitorRecord add(BrokerMonitorRecord brokerMonitorRecord, BrokerMonitorRecord brokerMonitorRecord2) {
        if (NullUtil.isEmpty(brokerMonitorRecord) || NullUtil.isEmpty(brokerMonitorRecord2)) {
            return NullUtil.isEmpty(brokerMonitorRecord) ? brokerMonitorRecord2 : brokerMonitorRecord;
        }
        brokerMonitorRecord.setPending(add(brokerMonitorRecord.getPending(), brokerMonitorRecord2.getPending()));
        brokerMonitorRecord.setRetry(add(brokerMonitorRecord.getRetry(), brokerMonitorRecord2.getRetry()));
        brokerMonitorRecord.setEnQuence(add(brokerMonitorRecord.getEnQuence(), brokerMonitorRecord2.getEnQuence()));
        brokerMonitorRecord.setDeQuence(add(brokerMonitorRecord.getDeQuence(), brokerMonitorRecord2.getDeQuence()));
        return brokerMonitorRecord;
    }

    public List<BrokerMonitorRecord> transferConsumerPartition(List<ConsumerPartitionMonitorInfo> list, String str) {
        ArrayList arrayList = new ArrayList();
        if (NullUtil.isEmpty((Collection) list)) {
            return arrayList;
        }
        for (ConsumerPartitionMonitorInfo consumerPartitionMonitorInfo : list) {
            BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
            brokerMonitorRecord.setIp(str);
            brokerMonitorRecord.setPartition(consumerPartitionMonitorInfo.getPartition());
            brokerMonitorRecord.setDeQuence(consumerPartitionMonitorInfo.getDeQueue());
            brokerMonitorRecord.setPending(consumerPartitionMonitorInfo.getPending());
            arrayList.add(brokerMonitorRecord);
        }
        return arrayList;
    }

    public List<BrokerMonitorRecord> transferProducerPartition(List<ProducerPartitionMonitorInfo> list, String str) {
        ArrayList arrayList = new ArrayList();
        if (NullUtil.isEmpty((Collection) list)) {
            return arrayList;
        }
        for (ProducerPartitionMonitorInfo producerPartitionMonitorInfo : list) {
            BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
            brokerMonitorRecord.setPartition(producerPartitionMonitorInfo.getPartition());
            brokerMonitorRecord.setIp(str);
            brokerMonitorRecord.setEnQuence(producerPartitionMonitorInfo.getEnQueue());
            arrayList.add(brokerMonitorRecord);
        }
        return arrayList;
    }

    public BrokerMonitorRecord fillIncompleteBrokerMonitor() {
        BrokerMonitorRecord brokerMonitorRecord = new BrokerMonitorRecord();
        brokerMonitorRecord.setConnections(-1L);
        brokerMonitorRecord.setDeQuence(new DeQueueMonitorInfo());
        brokerMonitorRecord.getDeQuence().setCount(-1L);
        brokerMonitorRecord.setEnQuence(new EnQueueMonitorInfo());
        brokerMonitorRecord.getEnQuence().setCount(-1L);
        brokerMonitorRecord.setPending(new PendingMonitorInfo());
        brokerMonitorRecord.getPending().setCount(-1L);
        brokerMonitorRecord.setRetry(new RetryMonitorInfo());
        brokerMonitorRecord.getRetry().setCount(-1L);
        return brokerMonitorRecord;
    }

    private void checkArgument(Subscribe subscribe) {
        Preconditions.checkArgument(subscribe != null, "topic field in subscribe arg can not be null.");
        Preconditions.checkArgument(subscribe.getTopic() != null, "topic field in subscribe arg can not be null.");
        Preconditions.checkArgument(subscribe.getApp() != null, "app field in subscribe arg can not be null.");
        Preconditions.checkArgument(subscribe.getType() != null, "subscribeGroup field in subscribe arg can not be null.");
    }
}
