package org.joyqueue.service.impl;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.joyqueue.async.BrokerClusterQuery;
import org.joyqueue.async.BrokerMonitorClusterQuery;
import org.joyqueue.async.DefaultBrokerInfoFuture;
import org.joyqueue.async.RetrieveProvider;
import org.joyqueue.async.UpdateProvider;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.PartitionOffset;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.model.domain.TopicPartitionGroup;
import org.joyqueue.monitor.PartitionAckMonitorInfo;
import org.joyqueue.monitor.PartitionLeaderAckMonitorInfo;
import org.joyqueue.monitor.RestResponse;
import org.joyqueue.other.HttpRestService;
import org.joyqueue.service.BrokerMonitorService;
import org.joyqueue.service.BrokerRestUrlMappingService;
import org.joyqueue.service.ConsumeOffsetService;
import org.joyqueue.service.LeaderService;
import org.joyqueue.service.TopicPartitionGroupService;
import org.joyqueue.toolkit.time.SystemClock;
import org.joyqueue.util.AsyncHttpClient;
import org.joyqueue.util.JSONParser;
import org.joyqueue.util.NullUtil;
import org.joyqueue.util.UrlEncoderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("consumeOffsetService")
/* loaded from: input_file:org/joyqueue/service/impl/ConsumeOffsetServiceImpl.class */
public class ConsumeOffsetServiceImpl implements ConsumeOffsetService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeOffsetServiceImpl.class);
    private static final long TIMEOUT = 60000;

    @Resource(type = BrokerMonitorClusterQuery.class)
    private BrokerClusterQuery<Subscribe> brokerCluster;

    @Autowired
    private TopicPartitionGroupService partitionGroupService;

    @Autowired
    private BrokerRestUrlMappingService urlMappingService;

    @Autowired
    private LeaderService leaderService;

    @Autowired
    private TopicPartitionGroupService topicPartitionGroupService;

    @Autowired
    private BrokerMonitorService brokerMonitorService;

    @Autowired
    private HttpRestService httpRestService;

    @Override // org.joyqueue.service.ConsumeOffsetService
    public List<PartitionLeaderAckMonitorInfo> offsets(final Subscribe subscribe) {
        checkArgument(subscribe);
        final ArrayList arrayList = new ArrayList();
        new ArrayList();
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncQueryOnBroker(subscribe, new RetrieveProvider<Subscribe>() { // from class: org.joyqueue.service.impl.ConsumeOffsetServiceImpl.1
            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                return String.format(str, UrlEncoderUtil.encodeParam(CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName(), CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup())));
            }
        }, "appConsumeOffsetMonitor", "appConsumeOffsetMonitor"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList.size()) {
            logger.info("missing broker partition consume offset,ignore");
        }
        return tagLeaderPartitionOffset(map, subscribe);
    }

    public List<PartitionLeaderAckMonitorInfo> tagLeaderPartitionOffset(Map<String, String> map, Subscribe subscribe) {
        ArrayList arrayList = new ArrayList();
        Map<Short, Broker> findPartitionLeaderBrokerDetail = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            arrayList.addAll(tagLeaderPartitionOffset(entry.getKey(), (List) JSONParser.parse(entry.getValue(), RestResponse.class, PartitionAckMonitorInfo.class, true).getData(), findPartitionLeaderBrokerDetail));
        }
        Iterator<TopicPartitionGroup> it = this.topicPartitionGroupService.findByTopic(subscribe.getNamespace(), subscribe.getTopic()).iterator();
        while (it.hasNext()) {
            this.brokerMonitorService.findMonitorOnPartitionGroupDetailForTopicApp(subscribe, it.next().getGroupNo()).forEach(brokerMonitorRecord -> {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    PartitionLeaderAckMonitorInfo partitionLeaderAckMonitorInfo = (PartitionLeaderAckMonitorInfo) it2.next();
                    if (brokerMonitorRecord.getPartition() == partitionLeaderAckMonitorInfo.getPartition()) {
                        partitionLeaderAckMonitorInfo.setTps(brokerMonitorRecord.getDeQuence().getTps());
                        partitionLeaderAckMonitorInfo.setTraffic(brokerMonitorRecord.getDeQuence().getTraffic());
                        return;
                    }
                }
            });
        }
        return (List) arrayList.stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.getPartition();
        })).collect(Collectors.toList());
    }

    public List<PartitionLeaderAckMonitorInfo> tagLeaderPartitionOffset(String str, List<PartitionAckMonitorInfo> list, Map<Short, Broker> map) {
        ArrayList arrayList = new ArrayList();
        for (PartitionAckMonitorInfo partitionAckMonitorInfo : list) {
            PartitionLeaderAckMonitorInfo partitionLeaderAckMonitorInfo = new PartitionLeaderAckMonitorInfo(partitionAckMonitorInfo, false);
            Broker broker = map.get(Short.valueOf(partitionAckMonitorInfo.getPartition()));
            if (!NullUtil.isEmpty(broker)) {
                if ((broker.getIp() + ":" + broker.getPort()).equals(str)) {
                    partitionLeaderAckMonitorInfo.setLeader(true);
                }
                arrayList.add(partitionLeaderAckMonitorInfo);
            }
        }
        return arrayList;
    }

    @Override // org.joyqueue.service.ConsumeOffsetService
    public long offset(Subscribe subscribe, short s) {
        checkArgument(subscribe);
        Map.Entry<PartitionGroup, Broker> findPartitionLeaderBrokerDetail = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), s);
        if (NullUtil.isEmpty(findPartitionLeaderBrokerDetail)) {
            throw new IllegalArgumentException("partition group or leader broker not found");
        }
        Broker value = findPartitionLeaderBrokerDetail.getValue();
        return ((Long) this.httpRestService.get("appPartitionOffsetMonitor", Long.class, false, value.getIp(), String.valueOf(value.getMonitorPort()), String.valueOf(findPartitionLeaderBrokerDetail.getKey().getGroup())).getData()).longValue();
    }

    @Override // org.joyqueue.service.ConsumeOffsetService
    public List<PartitionAckMonitorInfo> timeOffset(Subscribe subscribe, long j) {
        List<Broker> findLeaderBroker = this.leaderService.findLeaderBroker(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        if (NullUtil.isEmpty((Collection) findLeaderBroker)) {
            throw new IllegalArgumentException("partition group or leader broker not found");
        }
        ArrayList arrayList = new ArrayList();
        Map<Short, Broker> findPartitionLeaderBrokerDetail = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        for (Broker broker : findLeaderBroker) {
            RestResponse restResponse = this.httpRestService.get("getTopicAppOffset", PartitionAckMonitorInfo.class, true, broker.getIp(), String.valueOf(broker.getMonitorPort()), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName(), CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup()), String.valueOf(j));
            if (restResponse.getData() != null) {
                for (PartitionLeaderAckMonitorInfo partitionLeaderAckMonitorInfo : tagLeaderPartitionOffset(broker.getIp() + ":" + broker.getPort(), (List) restResponse.getData(), findPartitionLeaderBrokerDetail)) {
                    if (partitionLeaderAckMonitorInfo.isLeader()) {
                        arrayList.add(partitionLeaderAckMonitorInfo);
                    }
                }
            }
        }
        return arrayList;
    }

    @Override // org.joyqueue.service.ConsumeOffsetService
    public boolean resetOffset(Subscribe subscribe, short s, long j) {
        Map.Entry<PartitionGroup, Broker> findPartitionLeaderBrokerDetail = this.leaderService.findPartitionLeaderBrokerDetail(subscribe.getNamespace().getCode(), subscribe.getTopic().getCode(), s);
        if (NullUtil.isEmpty(findPartitionLeaderBrokerDetail)) {
            throw new IllegalArgumentException("partition group or leader broker not found");
        }
        Broker value = findPartitionLeaderBrokerDetail.getValue();
        String[] strArr = {value.getIp(), String.valueOf(value.getMonitorPort()), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName(), CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup()), String.valueOf((int) s), String.valueOf(j)};
        return ((Boolean) this.httpRestService.put("resetAppPartitionOffset", Boolean.class, false, JSON.toJSONString(strArr), strArr).getData()).booleanValue();
    }

    @Override // org.joyqueue.service.ConsumeOffsetService
    public boolean resetOffset(Subscribe subscribe, final long j) {
        final ArrayList arrayList = new ArrayList();
        final String fullName = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
        final String convertApp = CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
        Map<String, String> map = this.brokerCluster.get(this.brokerCluster.asyncUpdateOnBroker(subscribe, new UpdateProvider<Subscribe>() { // from class: org.joyqueue.service.impl.ConsumeOffsetServiceImpl.2
            @Override // org.joyqueue.async.RetrieveProvider
            public String getPath(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                return String.format(str, UrlEncoderUtil.encodeParam(fullName, convertApp, String.valueOf(j)));
            }

            @Override // org.joyqueue.async.UpdateProvider
            public HttpUriRequest getRequest(String str, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                HttpPut httpPut = new HttpPut(str);
                try {
                    httpPut.setEntity(new StringEntity(String.valueOf((int) s)));
                } catch (UnsupportedEncodingException e) {
                    ConsumeOffsetServiceImpl.logger.info("unsupported", e);
                }
                return httpPut;
            }

            @Override // org.joyqueue.async.RetrieveProvider
            public String getKey(Broker broker, PartitionGroup partitionGroup, short s, Subscribe subscribe2) {
                arrayList.add(broker);
                return broker.getIp() + ":" + broker.getPort();
            }
        }, "resetAppTopicOffsetByTime", "reset consume topic Offset by time"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (map.size() != arrayList.size()) {
            return false;
        }
        Iterator<String> it = map.values().iterator();
        while (it.hasNext()) {
            RestResponse parse = JSONParser.parse(it.next(), RestResponse.class, Boolean.class, false);
            if (!((Boolean) parse.getData()).booleanValue()) {
                logger.info("reset by time failed,{}", parse.getMessage());
                return false;
            }
        }
        return true;
    }

    @Override // org.joyqueue.service.ConsumeOffsetService
    public boolean resetOffset(Subscribe subscribe, List<PartitionOffset> list) {
        checkArgument(subscribe);
        Map<Short, Map.Entry<PartitionGroup, Broker>> partitionBroker = partitionBroker(this.leaderService.findPartitionGroupLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode()));
        String pathTemplate = this.urlMappingService.pathTemplate("resetAppPartitionOffset");
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(list.size());
        String fullName = CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName();
        String convertApp = CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup());
        int i = 0;
        for (PartitionOffset partitionOffset : list) {
            Map.Entry<PartitionGroup, Broker> entry = partitionBroker.get(Short.valueOf(partitionOffset.getPartition()));
            if (NullUtil.isEmpty(entry)) {
                logger.info("partition group broker not found!");
            } else {
                String str = this.urlMappingService.monitorUrl(entry.getValue()) + String.format(pathTemplate, UrlEncoderUtil.encodeParam(fullName, convertApp, String.valueOf((int) partitionOffset.getPartition()), String.valueOf(partitionOffset.getOffset())));
                logger.info(String.format("start sync request,%s", str));
                HttpPut httpPut = new HttpPut(str);
                try {
                    httpPut.setEntity(new StringEntity(String.valueOf(partitionOffset.getOffset())));
                    AsyncHttpClient.AsyncRequest(httpPut, new AsyncHttpClient.ConcurrentHttpResponseHandler(str, SystemClock.now(), countDownLatch, String.valueOf((int) partitionOffset.getPartition()), concurrentHashMap));
                    i++;
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        this.brokerCluster.get(new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, "reset offset"), TIMEOUT, TimeUnit.MILLISECONDS);
        if (concurrentHashMap.size() != i) {
            return false;
        }
        Iterator it = concurrentHashMap.values().iterator();
        while (it.hasNext()) {
            RestResponse parse = JSONParser.parse((String) it.next(), RestResponse.class, Boolean.class, false);
            if (!((Boolean) parse.getData()).booleanValue()) {
                logger.info("reset failed,{}", parse.getMessage());
                return false;
            }
        }
        return true;
    }

    private Map<Short, Map.Entry<PartitionGroup, Broker>> partitionBroker(List<Map.Entry<PartitionGroup, Broker>> list) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<PartitionGroup, Broker> entry : list) {
            Iterator it = entry.getKey().getPartitions().iterator();
            while (it.hasNext()) {
                hashMap.put((Short) it.next(), entry);
            }
        }
        return hashMap;
    }

    private void checkArgument(Subscribe subscribe) {
        Preconditions.checkArgument(subscribe != null, "topic field in subscribe arg can not be null.");
        Preconditions.checkArgument(subscribe.getTopic() != null, "topic field in subscribe arg can not be null.");
        Preconditions.checkArgument(subscribe.getApp() != null, "app field in subscribe arg can not be null.");
    }
}
