package org.joyqueue.async;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.service.BrokerRestUrlMappingService;
import org.joyqueue.service.BrokerService;
import org.joyqueue.service.LeaderService;
import org.joyqueue.service.PartitionGroupReplicaService;
import org.joyqueue.toolkit.time.SystemClock;
import org.joyqueue.util.AsyncHttpClient;
import org.joyqueue.util.NullUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("brokerClusterMonitorQuery")
/* loaded from: input_file:org/joyqueue/async/BrokerMonitorClusterQuery.class */
public class BrokerMonitorClusterQuery implements BrokerClusterQuery<Subscribe> {
    private Logger logger = LoggerFactory.getLogger(BrokerMonitorClusterQuery.class);

    @Autowired
    private LeaderService leaderService;

    @Autowired
    private BrokerRestUrlMappingService urlMappingService;

    @Autowired
    protected PartitionGroupReplicaService partitionGroupReplicaService;

    @Autowired
    private BrokerService brokerService;

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncQueryAllBroker(String str, String str2, Integer num, String str3, String str4) {
        List<Broker> list = (List) this.partitionGroupReplicaService.getByTopicAndGroup(str2, str, num.intValue()).stream().map(partitionGroupReplica -> {
            try {
                return this.brokerService.findById(Integer.valueOf(partitionGroupReplica.getBrokerId()));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
        if (NullUtil.isEmpty((Collection) list)) {
            throw new IllegalStateException("topic leader broker or rest path not found");
        }
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(list.size());
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        String pathTemplate = this.urlMappingService.pathTemplate(str3);
        for (Broker broker : list) {
            String str5 = this.urlMappingService.monitorUrl(broker) + String.format(pathTemplate, new TopicName(str2, str).getFullName(), num);
            AsyncHttpClient.AsyncRequest(new HttpGet(str5), new AsyncHttpClient.ConcurrentHttpResponseHandler(str5, SystemClock.now(), countDownLatch, String.valueOf(broker.getId()) + "_" + num, concurrentHashMap));
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str4);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncQueryOnBroker(Subscribe subscribe, RetrieveProvider<Subscribe> retrieveProvider, String str, String str2) {
        List<Broker> findLeaderBroker = this.leaderService.findLeaderBroker(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        String pathTemplate = this.urlMappingService.pathTemplate(str);
        if (NullUtil.isEmpty((Collection) findLeaderBroker) || NullUtil.isEmpty(pathTemplate)) {
            throw new IllegalStateException("topic leader broker or rest path not found");
        }
        String path = retrieveProvider.getPath(pathTemplate, null, (short) -1, subscribe);
        CountDownLatch countDownLatch = new CountDownLatch(findLeaderBroker.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(findLeaderBroker.size());
        for (Broker broker : findLeaderBroker) {
            String str3 = this.urlMappingService.monitorUrl(broker) + path;
            this.logger.info(String.format("start sync request,%s", str3));
            AsyncHttpClient.AsyncRequest(new HttpGet(str3), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, retrieveProvider.getKey(broker, null, (short) -1, subscribe), concurrentHashMap));
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncDeleteOnBroker(Integer num, Subscribe subscribe, RetrieveProvider<Subscribe> retrieveProvider, String str, String str2) {
        try {
            Broker findById = this.brokerService.findById(num);
            String pathTemplate = this.urlMappingService.pathTemplate(str);
            if (NullUtil.isEmpty(findById) || NullUtil.isEmpty(pathTemplate)) {
                throw new IllegalStateException("this Broker not found");
            }
            String path = retrieveProvider.getPath(pathTemplate, null, (short) -1, subscribe);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(1);
            String str3 = this.urlMappingService.monitorUrl(findById) + path;
            this.logger.info(String.format("start sync request,%s", str3));
            AsyncHttpClient.AsyncRequest(new HttpDelete(str3), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, retrieveProvider.getKey(findById, null, (short) -1, subscribe), concurrentHashMap));
            return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncUpdateOnBroker(Subscribe subscribe, UpdateProvider<Subscribe> updateProvider, String str, String str2) {
        List<Broker> findLeaderBroker = this.leaderService.findLeaderBroker(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        String pathTemplate = this.urlMappingService.pathTemplate(str);
        if (NullUtil.isEmpty((Collection) findLeaderBroker) || NullUtil.isEmpty(pathTemplate)) {
            throw new IllegalStateException("topic leader broker or rest path not found");
        }
        String path = updateProvider.getPath(pathTemplate, null, (short) -1, subscribe);
        CountDownLatch countDownLatch = new CountDownLatch(findLeaderBroker.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(findLeaderBroker.size());
        for (Broker broker : findLeaderBroker) {
            String str3 = this.urlMappingService.monitorUrl(broker) + path;
            this.logger.info(String.format("start sync request,%s", str3));
            AsyncHttpClient.AsyncRequest(updateProvider.getRequest(str3, null, (short) -1, subscribe), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, updateProvider.getKey(broker, null, (short) -1, subscribe), concurrentHashMap));
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncUpdateOnPartitionGroup(Subscribe subscribe, UpdateProvider<Subscribe> updateProvider, String str, String str2) {
        List<Map.Entry<PartitionGroup, Broker>> findPartitionGroupLeaderBrokerDetail = this.leaderService.findPartitionGroupLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        String pathTemplate = this.urlMappingService.pathTemplate(str);
        if (NullUtil.isEmpty((Collection) findPartitionGroupLeaderBrokerDetail) || NullUtil.isEmpty(pathTemplate)) {
            throw new IllegalArgumentException("partition group leader broker or rest path not found");
        }
        CountDownLatch countDownLatch = new CountDownLatch(findPartitionGroupLeaderBrokerDetail.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(findPartitionGroupLeaderBrokerDetail.size() * 2);
        for (Map.Entry<PartitionGroup, Broker> entry : findPartitionGroupLeaderBrokerDetail) {
            String str3 = this.urlMappingService.monitorUrl(entry.getValue()) + updateProvider.getPath(pathTemplate, entry.getKey(), (short) -1, subscribe);
            this.logger.info(String.format("start sync request on partition group,%s", str3));
            AsyncHttpClient.AsyncRequest(updateProvider.getRequest(str3, entry.getKey(), (short) -1, subscribe), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, updateProvider.getKey(entry.getValue(), entry.getKey(), (short) -1, subscribe), concurrentHashMap));
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncUpdateOnPartition(Subscribe subscribe, UpdateProvider<Subscribe> updateProvider, String str, String str2) {
        List<Map.Entry<PartitionGroup, Broker>> findPartitionGroupLeaderBrokerDetail = this.leaderService.findPartitionGroupLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        String pathTemplate = this.urlMappingService.pathTemplate(str);
        if (NullUtil.isEmpty((Collection) findPartitionGroupLeaderBrokerDetail) || NullUtil.isEmpty(pathTemplate)) {
            throw new IllegalArgumentException("partition group leader broker or rest path not found");
        }
        CountDownLatch countDownLatch = new CountDownLatch(findPartitionGroupLeaderBrokerDetail.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(findPartitionGroupLeaderBrokerDetail.size() * 2);
        for (Map.Entry<PartitionGroup, Broker> entry : findPartitionGroupLeaderBrokerDetail) {
            for (Short sh : entry.getKey().getPartitions()) {
                String str3 = this.urlMappingService.monitorUrl(entry.getValue()) + updateProvider.getPath(pathTemplate, entry.getKey(), sh.shortValue(), subscribe);
                this.logger.info(String.format("start sync request on partition group,%s", str3));
                AsyncHttpClient.AsyncRequest(updateProvider.getRequest(str3, entry.getKey(), sh.shortValue(), subscribe), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, updateProvider.getKey(entry.getValue(), entry.getKey(), sh.shortValue(), subscribe), concurrentHashMap));
            }
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Future<Map<String, String>> asyncQueryOnPartitionGroup(Subscribe subscribe, RetrieveProvider<Subscribe> retrieveProvider, String str, String str2) {
        List<Map.Entry<PartitionGroup, Broker>> findPartitionGroupLeaderBrokerDetail = this.leaderService.findPartitionGroupLeaderBrokerDetail(subscribe.getTopic().getCode(), subscribe.getNamespace().getCode());
        String pathTemplate = this.urlMappingService.pathTemplate(str);
        if (NullUtil.isEmpty((Collection) findPartitionGroupLeaderBrokerDetail) || NullUtil.isEmpty(pathTemplate)) {
            throw new IllegalArgumentException("partition group leader broker or rest path not found");
        }
        CountDownLatch countDownLatch = new CountDownLatch(findPartitionGroupLeaderBrokerDetail.size());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(findPartitionGroupLeaderBrokerDetail.size() * 2);
        for (Map.Entry<PartitionGroup, Broker> entry : findPartitionGroupLeaderBrokerDetail) {
            String str3 = this.urlMappingService.monitorUrl(entry.getValue()) + retrieveProvider.getPath(pathTemplate, entry.getKey(), (short) -1, subscribe);
            this.logger.info(String.format("start sync request on partition group,%s", str3));
            AsyncHttpClient.AsyncRequest(new HttpGet(str3), new AsyncHttpClient.ConcurrentHttpResponseHandler(str3, SystemClock.now(), countDownLatch, retrieveProvider.getKey(entry.getValue(), entry.getKey(), (short) -1, subscribe), concurrentHashMap));
        }
        return new DefaultBrokerInfoFuture(countDownLatch, concurrentHashMap, str2);
    }

    @Override // org.joyqueue.async.BrokerClusterQuery
    public Map<String, String> get(Future<Map<String, String>> future, long j, TimeUnit timeUnit) {
        try {
            return future.get(j, timeUnit);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
