package org.joyqueue.api.Impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.joyqueue.api.OpenAPIService;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.exception.ServiceException;
import org.joyqueue.model.ListQuery;
import org.joyqueue.model.PageResult;
import org.joyqueue.model.Pagination;
import org.joyqueue.model.QPageQuery;
import org.joyqueue.model.domain.Application;
import org.joyqueue.model.domain.ApplicationToken;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.BrokerGroup;
import org.joyqueue.model.domain.BrokerMonitorRecord;
import org.joyqueue.model.domain.Consumer;
import org.joyqueue.model.domain.Identity;
import org.joyqueue.model.domain.Namespace;
import org.joyqueue.model.domain.PartitionOffset;
import org.joyqueue.model.domain.Producer;
import org.joyqueue.model.domain.SlimApplication;
import org.joyqueue.model.domain.SlimTopic;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.model.domain.SubscribeType;
import org.joyqueue.model.domain.Topic;
import org.joyqueue.model.domain.TopicPubSub;
import org.joyqueue.model.domain.User;
import org.joyqueue.model.exception.BusinessException;
import org.joyqueue.model.query.QBroker;
import org.joyqueue.model.query.QBrokerGroup;
import org.joyqueue.model.query.QTopic;
import org.joyqueue.monitor.PartitionAckMonitorInfo;
import org.joyqueue.monitor.PartitionLeaderAckMonitorInfo;
import org.joyqueue.monitor.PendingMonitorInfo;
import org.joyqueue.service.ApplicationService;
import org.joyqueue.service.ApplicationTokenService;
import org.joyqueue.service.ApplicationUserService;
import org.joyqueue.service.BrokerGroupService;
import org.joyqueue.service.BrokerMonitorService;
import org.joyqueue.service.BrokerService;
import org.joyqueue.service.ConsumeOffsetService;
import org.joyqueue.service.ConsumerService;
import org.joyqueue.service.LeaderService;
import org.joyqueue.service.ProducerService;
import org.joyqueue.service.TopicService;
import org.joyqueue.sync.ApplicationInfo;
import org.joyqueue.sync.SyncService;
import org.joyqueue.util.LocalSession;
import org.joyqueue.util.NullUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Spring service implementing {@link OpenAPIService} by delegating to the injected
 * topic, producer, consumer, application, broker, offset and token services.
 *
 * <p>Validation failures are reported as {@link ServiceException} (or
 * {@link BusinessException} for a missing topic); the individual methods document
 * which condition triggers which error.
 */
@Service("openAPIService")
public class OpenAPIServiceImpl implements OpenAPIService {

    /** Milliseconds in one minute; also the exclusive upper bound of generated subscribe groups. */
    private static final long MINUTES_MS = 60000;

    private static final Logger logger = LoggerFactory.getLogger(OpenAPIServiceImpl.class);

    @Autowired
    private TopicService topicService;

    @Autowired
    private ConsumerService consumerService;

    @Autowired
    private ProducerService producerService;

    @Autowired
    private SyncService syncService;

    @Autowired
    private ApplicationService applicationService;

    @Autowired
    private BrokerGroupService brokerGroupService;

    @Autowired
    private BrokerService brokerService;

    @Autowired
    private ConsumeOffsetService consumeOffsetService;

    @Autowired
    private BrokerMonitorService brokerMonitorService;

    @Autowired
    private LeaderService leaderService;

    @Autowired
    private ApplicationTokenService applicationTokenService;

    @Autowired
    private ApplicationUserService applicationUserService;

    /** Shared generator for random subscribe groups and the random broker start index. */
    private final Random random = new Random();

    /**
     * Pages through all topics and resolves the publish/subscribe view of each one.
     *
     * @param pagination paging parameters forwarded to the topic search
     * @return a page carrying the search's pagination and one entry per topic that could be resolved
     * @throws Exception if the topic search itself fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public PageResult<TopicPubSub> findTopicPubSubInfo(Pagination pagination) throws Exception {
        QPageQuery<QTopic> query = new QPageQuery<>();
        query.setQuery(new QTopic());
        query.setPagination(pagination);
        PageResult<Topic> topicPage = this.topicService.search(query);
        List<Topic> topics = topicPage.getResult();
        List<TopicPubSub> pubSubs = new ArrayList<>(topics.size());
        for (Topic topic : topics) {
            try {
                pubSubs.add(findTopicPubsub(topic));
            } catch (Exception e) {
                // Best effort: a topic that fails to resolve is logged and left out of the page
                // instead of failing the whole request.
                logger.error(String.format("Find Topic PubSub Info, topic:%s", topic.getName()), e);
            }
        }
        PageResult<TopicPubSub> page = new PageResult<>();
        page.setPagination(topicPage.getPagination());
        page.setResult(pubSubs);
        return page;
    }

    /**
     * Builds the publish/subscribe view of one topic: leader broker addresses plus the
     * applications that consume from and produce to it.
     *
     * @param topic topic whose code and namespace code identify the lookup; the namespace must be set
     * @return the assembled view; consumer/producer lists are {@code null} when no application
     *         code was found (see {@link #appsToApplication(List)})
     * @throws Exception if one of the underlying service lookups fails
     */
    TopicPubSub findTopicPubsub(Topic topic) throws Exception {
        String topicCode = topic.getCode();
        String namespaceCode = topic.getNamespace().getCode();
        // The consumer and leader services take (topic, namespace); the producer service takes
        // (namespace, topic). The argument order below mirrors the original calls.
        List<Consumer> consumers = this.consumerService.findByTopic(topicCode, namespaceCode);
        List<Producer> producers = this.producerService.findByTopic(namespaceCode, topicCode);
        List<Broker> leaders = this.leaderService.findLeaderBroker(topicCode, namespaceCode);

        List<String> leaderAddresses = new ArrayList<>();
        if (!NullUtil.isEmpty((Collection) leaders)) {
            for (Broker broker : leaders) {
                leaderAddresses.add(broker.getIp() + ":" + broker.getPort());
            }
        }
        SlimTopic slimTopic = new SlimTopic();
        slimTopic.setIps(leaderAddresses);
        slimTopic.setCode(topicCode);

        List<String> consumerApps = new ArrayList<>();
        if (consumers != null) {
            for (Consumer consumer : consumers) {
                addAppCode(consumerApps, consumer.getApp());
            }
        }
        List<String> producerApps = new ArrayList<>();
        if (producers != null) {
            for (Producer producer : producers) {
                addAppCode(producerApps, producer.getApp());
            }
        }

        TopicPubSub pubSub = new TopicPubSub();
        pubSub.setTopic(slimTopic);
        pubSub.setConsumers(appsToApplication(consumerApps));
        pubSub.setProducers(appsToApplication(producerApps));
        return pubSub;
    }

    /** Appends the code of {@code app} to {@code codes}, skipping absent applications. */
    private static void addAppCode(List<String> codes, Identity app) {
        if (!NullUtil.isEmpty(app)) {
            codes.add(String.valueOf(app.getCode()));
        }
    }

    /**
     * Resolves the publish/subscribe view of a single topic.
     *
     * @param str  topic code
     * @param str2 namespace code
     * @return the assembled view, see {@link #findTopicPubsub(Topic)}
     * @throws Exception if one of the underlying service lookups fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public TopicPubSub findTopicPubSubInfo(String str, String str2) throws Exception {
        Namespace namespace = new Namespace();
        namespace.setCode(str2);
        Topic topic = new Topic();
        topic.setCode(str);
        topic.setNamespace(namespace);
        return findTopicPubsub(topic);
    }

    /**
     * Lists the consumers registered for an application.
     *
     * @param str application code
     * @return whatever the consumer service returns for that application
     * @throws Exception if the lookup fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<Consumer> queryConsumerTopicByApp(String str) throws Exception {
        return this.consumerService.findByApp(str);
    }

    /**
     * Lists the consumers of a topic.
     *
     * @param str  topic code
     * @param str2 namespace code
     * @return whatever the consumer service returns for that topic
     * @throws Exception if the lookup fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<Consumer> findConsumers(String str, String str2) throws Exception {
        return this.consumerService.findByTopic(str, str2);
    }

    /**
     * Lists the producers of a topic.
     *
     * @param str  topic code
     * @param str2 namespace code
     * @return whatever the producer service returns for that topic
     * @throws Exception if the lookup fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<Producer> findProducers(String str, String str2) throws Exception {
        // The producer service expects (namespace, topic), hence the swapped arguments.
        return this.producerService.findByTopic(str2, str);
    }

    /**
     * Registers an application as producer of a topic.
     *
     * @param producer producer carrying namespace, topic and app; all three must be set
     * @return the producer as stored, re-read from the producer service
     * @throws ServiceException with {@code BAD_REQUEST} if the topic or the application does not exist
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public Producer publish(Producer producer) throws Exception {
        Topic topic = this.topicService.findByCode(producer.getNamespace().getCode(), producer.getTopic().getCode());
        Application application = this.applicationService.findByCode(producer.getApp().getCode());
        if (NullUtil.isEmpty(topic) || NullUtil.isEmpty(application)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format("topic %s or app %s not exist!", producer.getTopic().getCode(), producer.getApp().getCode()));
        }
        producer.setTopic(topic);
        producer.setApp(new Identity(producer.getApp().getCode()));
        this.producerService.add(producer);
        return this.producerService.findByTopicAppGroup(producer.getNamespace().getCode(), producer.getTopic().getCode(), producer.getApp().getCode());
    }

    /**
     * Registers an application as consumer of a topic.
     *
     * @param consumer consumer carrying namespace, topic, app and subscribe group
     * @return the consumer as stored, re-read from the consumer service
     * @throws ServiceException with {@code BAD_REQUEST} if the topic or the application does not exist
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public Consumer subscribe(Consumer consumer) throws Exception {
        Topic topic = this.topicService.findByCode(consumer.getNamespace().getCode(), consumer.getTopic().getCode());
        Application application = this.applicationService.findByCode(consumer.getApp().getCode());
        if (NullUtil.isEmpty(topic) || NullUtil.isEmpty(application)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format("topic %s or app %s not exist!", consumer.getTopic().getCode(), consumer.getApp().getCode()));
        }
        consumer.setTopic(topic);
        // NOTE(review): the next two calls re-assign the current values and look redundant;
        // kept because the setters' bodies are not visible here - confirm before removing.
        consumer.setNamespace(consumer.getNamespace());
        consumer.setApp(consumer.getApp());
        this.consumerService.add(consumer);
        return this.consumerService.findByTopicAppGroup(consumer.getNamespace().getCode(), consumer.getTopic().getCode(), consumer.getApp().getCode(), consumer.getSubscribeGroup());
    }

    /**
     * Removes an application's producer registration from a topic.
     *
     * @param producer producer carrying namespace, topic and app
     * @return {@code true} if the producer service reports at least one deleted row
     * @throws ServiceException with {@code BAD_REQUEST} if the topic has no producers, if the last
     *         producer would be removed while consumers still exist, or if the application is not
     *         a producer of the topic
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean unPublish(Producer producer) throws Exception {
        String topicCode = producer.getTopic().getCode();
        String namespaceCode = producer.getNamespace().getCode();
        List<Producer> producers = findProducers(topicCode, namespaceCode);
        List<Consumer> consumers = findConsumers(topicCode, namespaceCode);
        // A null consumer list is treated like an empty one instead of failing with an NPE.
        boolean hasConsumers = consumers != null && !consumers.isEmpty();
        if (NullUtil.isEmpty((Collection) producers) || (producers.size() == 1 && hasConsumers)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format("no subscribe or please unSubscribe all the consumers of topic %s before cancel publish", producer.getTopic().getCode()));
        }
        Producer existing = findProducer(producers, producer.getApp().getCode());
        if (NullUtil.isEmpty(existing)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format(" %s haven't publish to the topic %s ", producer.getApp().getCode(), CodeConverter.convertTopic(producer.getNamespace(), producer.getTopic()).getFullName()));
        }
        return this.producerService.delete(existing) > 0;
    }

    /**
     * Subscribes an application to a topic, returning the already stored consumer when one with a
     * subscribe group exists, and otherwise creating one under a randomly generated subscribe group.
     *
     * @param consumer consumer carrying topic and app; the namespace may be {@code null}
     * @return the existing or newly created consumer
     * @throws ServiceException with {@code BAD_REQUEST} if the topic or application does not exist,
     *         or if the session user is missing or not bound to the application
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public Consumer uniqueSubscribe(Consumer consumer) throws Exception {
        String namespaceCode = consumer.getNamespace() == null ? null : consumer.getNamespace().getCode();
        Topic topic = this.topicService.findByCode(namespaceCode, consumer.getTopic().getCode());
        Application application = this.applicationService.findByCode(consumer.getApp().getCode());
        if (NullUtil.isEmpty(topic) || NullUtil.isEmpty(application)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format("topic %s or app %s not exist!", consumer.getTopic().getCode(), consumer.getApp().getCode()));
        }
        User user = LocalSession.getSession().getUser();
        // A session without a user cannot hold any permission; report it the same way instead
        // of failing with an NPE on user.getCode().
        String userCode = user == null ? null : user.getCode();
        if (user == null || NullUtil.isEmpty(this.applicationUserService.findByUserApp(userCode, consumer.getApp().getCode()))) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format("user %s app %s no permission!", userCode, consumer.getApp().getCode()));
        }
        Consumer existing = this.consumerService.findByTopicAppGroup(namespaceCode, consumer.getTopic().getCode(), consumer.getApp().getCode(), consumer.getSubscribeGroup());
        if (NullUtil.isNotEmpty(existing) && NullUtil.isNotEmpty(existing.getSubscribeGroup())) {
            return existing;
        }
        consumer.setSubscribeGroup(String.valueOf(this.random.nextInt((int) MINUTES_MS)));
        consumer.setTopic(topic);
        // NOTE(review): the next two calls re-assign the current values and look redundant;
        // kept because the setters' bodies are not visible here - confirm before removing.
        consumer.setNamespace(consumer.getNamespace());
        consumer.setApp(consumer.getApp());
        this.consumerService.add(consumer);
        return this.consumerService.findByTopicAppGroup(namespaceCode, consumer.getTopic().getCode(), consumer.getApp().getCode(), consumer.getSubscribeGroup());
    }

    /**
     * Finds the first producer in {@code list} whose application code equals {@code str}.
     *
     * @param list producers to scan; may be {@code null}
     * @param str  application code to match
     * @return the matching producer, or {@code null} when none matches
     */
    Producer findProducer(List<Producer> list, String str) {
        if (list == null) {
            return null;
        }
        for (Producer candidate : list) {
            Identity app = candidate.getApp();
            // Producers without an app (or app code) can never match; skip them rather than NPE.
            if (app != null && app.getCode() != null && app.getCode().equals(str)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Removes a consumer registration.
     *
     * @param consumer consumer carrying namespace, topic, app and subscribe group
     * @return {@code true} if the consumer service reports at least one deleted row
     * @throws ServiceException with {@code BAD_REQUEST} if no such consumer is stored
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean unSubscribe(Consumer consumer) throws Exception {
        Consumer existing = this.consumerService.findByTopicAppGroup(consumer.getNamespace().getCode(), consumer.getTopic().getCode(), consumer.getApp().getCode(), consumer.getSubscribeGroup());
        if (NullUtil.isEmpty(existing)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format(" %s haven't subscribe to the topic %s ", CodeConverter.convertApp(new Identity(consumer.getApp().getCode()), consumer.getSubscribeGroup()), CodeConverter.convertTopic(consumer.getNamespace(), consumer.getTopic()).getFullName()));
        }
        return this.consumerService.delete(existing) > 0;
    }

    /**
     * Synchronizes an application through the sync service on behalf of the session user and
     * stores the result.
     *
     * @param application application to sync; its erp is overwritten with the session user's code
     * @return the application as stored, re-read by the synced code
     * @throws ServiceException with {@code BAD_REQUEST} if the session has no user or the sync
     *         service returns nothing
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public Application syncApplication(Application application) throws Exception {
        User user = LocalSession.getSession().getUser();
        // Validate the user before dereferencing it; previously a missing user caused an NPE
        // before the intended BAD_REQUEST could be raised.
        if (NullUtil.isEmpty(user)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "sync application failed or illegal erp " + application.getErp());
        }
        application.setErp(user.getCode());
        ApplicationInfo synced = this.syncService.syncApp(application);
        if (NullUtil.isEmpty(synced)) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "sync application failed or illegal erp " + application.getErp());
        }
        synced.setUser(new Identity(user));
        this.syncService.addOrUpdateApp(synced);
        return this.applicationService.findByCode(synced.getCode());
    }

    /**
     * Deletes an application that has neither consumers nor producers.
     *
     * @param application application whose code identifies the record to delete
     * @return {@code true} if the application service reports at least one deleted row
     * @throws ServiceException with {@code BAD_REQUEST} if the application still consumes or produces
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean delApplication(Application application) throws Exception {
        String appCode = application.getCode();
        boolean stillInUse = !NullUtil.isEmpty((Collection) this.consumerService.findByApp(appCode))
                || !NullUtil.isEmpty((Collection) this.producerService.findByApp(appCode));
        if (stillInUse) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "please unSubscribe/Publish  all  topics you have !");
        }
        return this.applicationService.delete(this.applicationService.findByCode(appCode)) > 0;
    }

    /**
     * Creates a topic on brokers picked from the first matching broker group.
     *
     * <p>The topic's partition count is multiplied by the number of allocated brokers before the
     * topic is stored.
     *
     * @param topic        topic to create; broker group, brokers and partitions are updated in place
     * @param qBrokerGroup broker group query; its role is set to 0 during allocation
     * @param identity     forwarded unchanged to the topic service
     * @return the topic as stored, re-read by id
     * @throws ServiceException with {@code BAD_REQUEST} if no broker group or broker is available
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public Topic createTopic(Topic topic, QBrokerGroup qBrokerGroup, Identity identity) throws Exception {
        List<Broker> brokers = allocateBrokers(topic, qBrokerGroup);
        if (brokers.isEmpty()) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "select broker is empty");
        }
        topic.setPartitions(topic.getPartitions() * brokers.size());
        topic.setBrokers(brokers);
        this.topicService.addWithBrokerGroup(topic, topic.getBrokerGroup(), topic.getBrokers(), identity);
        return this.topicService.findById(topic.getId());
    }

    /**
     * Deletes a topic.
     *
     * @param str  namespace code
     * @param str2 topic code
     * @throws BusinessException if the topic does not exist
     * @throws Exception if an underlying service call fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public void removeTopic(String str, String str2) throws Exception {
        Topic topic = this.topicService.findByCode(str, str2);
        if (topic == null) {
            throw new BusinessException("topic is not exist");
        }
        this.topicService.delete(topic);
    }

    /**
     * Picks the brokers for a new topic from the first broker group matching the query.
     *
     * <p>Starting at a random index, consecutive brokers of the group are taken (wrapping around)
     * until the requested number has been visited; duplicates are dropped, so fewer brokers than
     * requested are returned when the group is smaller than the default of 3.
     *
     * @param topic        topic being created; its broker group is set, and its broker number is
     *                     defaulted to 3 when it is 0
     * @param qBrokerGroup broker group query; its role is set to 0
     * @return the distinct brokers selected
     * @throws ServiceException with {@code BAD_REQUEST} if no group or broker is found, or if more
     *         brokers are requested than the group contains
     * @throws Exception if an underlying service call fails
     */
    List<Broker> allocateBrokers(Topic topic, QBrokerGroup qBrokerGroup) throws Exception {
        qBrokerGroup.setRole(0);
        List<BrokerGroup> groups = this.brokerGroupService.findByQuery(new ListQuery<>(qBrokerGroup));
        if (groups == null || groups.isEmpty()) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "broker group is empty");
        }
        BrokerGroup group = groups.get(0);
        topic.setBrokerGroup(group);

        QBroker brokerQuery = new QBroker();
        brokerQuery.setGroup(new Identity(Long.valueOf(group.getId()), group.getCode()));
        List<Broker> candidates = this.brokerService.queryBrokerList(brokerQuery);
        if (candidates == null || candidates.isEmpty()) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "select broker is empty");
        }
        if (topic.getBrokerNum() != 0 && topic.getBrokerNum() > candidates.size()) {
            // Message: "the number of available brokers is smaller than the requested number".
            throw new ServiceException(ServiceException.BAD_REQUEST, "实际可用broker数量小于指定broker数量");
        }
        if (topic.getBrokerNum() == 0) {
            topic.setBrokerNum(3);
        }

        List<Broker> selected = new ArrayList<>();
        int start = this.random.nextInt(candidates.size());
        int end = start + topic.getBrokerNum();
        for (int i = start; i < end; i++) {
            // Wrap around the candidate list; the contains check drops repeats after a full lap.
            Broker broker = candidates.get(i % candidates.size());
            if (!selected.contains(broker)) {
                selected.add(broker);
            }
        }
        return selected;
    }

    /**
     * Returns the acknowledge offsets of a consumer subscription, restricted to the entries
     * flagged as leader.
     *
     * @param subscribe subscription to inspect; its type is forced to {@code CONSUMER}
     * @return the leader entries reported by the consume offset service
     * @throws ServiceException with {@code BAD_REQUEST} if the subscription is not registered
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<PartitionAckMonitorInfo> findOffsets(Subscribe subscribe) {
        List<PartitionAckMonitorInfo> leaderOffsets = new ArrayList<>();
        subscribe.setType(SubscribeType.CONSUMER);
        isLegalSubscribe(subscribe);
        for (PartitionLeaderAckMonitorInfo offset : this.consumeOffsetService.offsets(subscribe)) {
            if (offset.isLeader()) {
                leaderOffsets.add(offset);
            }
        }
        return leaderOffsets;
    }

    /**
     * Resets a consumer subscription's offset via the consume offset service.
     *
     * @param subscribe subscription to reset; its type is forced to {@code CONSUMER}
     * @param s         forwarded to the offset service (presumably the partition - TODO confirm)
     * @param j         forwarded to the offset service (presumably the new offset - TODO confirm)
     * @return the offset service's result
     * @throws ServiceException with {@code BAD_REQUEST} if the subscription is not registered
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean resetOffset(Subscribe subscribe, short s, long j) {
        subscribe.setType(SubscribeType.CONSUMER);
        isLegalSubscribe(subscribe);
        return this.consumeOffsetService.resetOffset(subscribe, s, j);
    }

    /**
     * Delegates to the consume offset service's {@code timeOffset}. Unlike the reset methods this
     * neither forces the subscription type nor validates that the subscription exists.
     *
     * @param subscribe subscription to inspect
     * @param j         forwarded to the offset service (presumably a timestamp - TODO confirm)
     * @return the offset service's result
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<PartitionAckMonitorInfo> timeOffset(Subscribe subscribe, long j) {
        return this.consumeOffsetService.timeOffset(subscribe, j);
    }

    /**
     * Resets a consumer subscription's offset via the consume offset service.
     *
     * @param subscribe subscription to reset; its type is forced to {@code CONSUMER}
     * @param j         forwarded to the offset service (presumably a timestamp - TODO confirm)
     * @return the offset service's result
     * @throws ServiceException with {@code BAD_REQUEST} if the subscription is not registered
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean resetOffset(Subscribe subscribe, long j) {
        subscribe.setType(SubscribeType.CONSUMER);
        isLegalSubscribe(subscribe);
        return this.consumeOffsetService.resetOffset(subscribe, j);
    }

    /**
     * Resets a consumer subscription to explicit per-partition offsets.
     *
     * @param subscribe subscription to reset; its type is forced to {@code CONSUMER}
     * @param list      partition offsets forwarded to the offset service
     * @return the offset service's result
     * @throws ServiceException with {@code BAD_REQUEST} if the subscription is not registered
     */
    @Override // org.joyqueue.api.OpenAPIService
    public boolean resetOffset(Subscribe subscribe, List<PartitionOffset> list) {
        subscribe.setType(SubscribeType.CONSUMER);
        isLegalSubscribe(subscribe);
        return this.consumeOffsetService.resetOffset(subscribe, list);
    }

    /**
     * Returns the pending information of a consumer subscription from the broker monitor.
     *
     * @param subscribe subscription to inspect; its type is forced to {@code CONSUMER}
     * @return the pending information of the monitor record
     * @throws ServiceException with {@code BAD_REQUEST} if the subscription is not registered, or
     *         {@code INTERNAL_SERVER_ERROR} if no monitor record or pending data is available
     */
    @Override // org.joyqueue.api.OpenAPIService
    public PendingMonitorInfo pending(Subscribe subscribe) {
        subscribe.setType(SubscribeType.CONSUMER);
        isLegalSubscribe(subscribe);
        BrokerMonitorRecord record = this.brokerMonitorService.find(subscribe, true);
        if (NullUtil.isEmpty(record) || NullUtil.isEmpty(record.getPending())) {
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, "data not found");
        }
        return record.getPending();
    }

    /**
     * Returns the partition count of a topic.
     *
     * @param str  namespace code
     * @param str2 topic code
     * @return the topic's partition count
     * @throws BusinessException if the topic does not exist
     * @throws Exception if the lookup fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public int queryPartitionByTopic(String str, String str2) throws Exception {
        Topic topic = this.topicService.findByCode(str, str2);
        if (topic == null) {
            // Same error as removeTopic uses for a missing topic, instead of an NPE.
            throw new BusinessException("topic is not exist");
        }
        return topic.getPartitions();
    }

    /**
     * Loads the applications for the given codes and reduces each to code, owner and department.
     *
     * @param list application codes
     * @return the slim applications, or {@code null} when {@code list} is empty
     */
    List<SlimApplication> appsToApplication(List<String> list) {
        if (NullUtil.isEmpty((Collection) list)) {
            // Callers receive null (not an empty list) for "no applications"; kept for compatibility.
            return null;
        }
        List<Application> applications = this.applicationService.findByCodes(list);
        List<SlimApplication> slimApplications = new ArrayList<>();
        for (Application application : applications) {
            SlimApplication slim = new SlimApplication();
            slim.setCode(application.getCode());
            slim.setOwner(application.getOwner());
            slim.setDepartment(application.getDepartment());
            slimApplications.add(slim);
        }
        return slimApplications;
    }

    /**
     * Adds a token to an application and returns all tokens of that application.
     *
     * @param applicationToken token to add; its application must carry the application code
     * @return all tokens of the application after the insert
     * @throws ServiceException with {@code BAD_REQUEST} if the application does not exist or has
     *         status -1, or {@code INTERNAL_SERVER_ERROR} if storing or re-reading the tokens fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<ApplicationToken> add(ApplicationToken applicationToken) {
        String appCode = applicationToken.getApplication().getCode();
        Application application = this.applicationService.findByCode(appCode);
        if (NullUtil.isEmpty(application) || application.getStatus() == -1) {
            throw new ServiceException(ServiceException.BAD_REQUEST, "app not exist");
        }
        applicationToken.setApplication(new Identity(application));
        try {
            this.applicationTokenService.add(applicationToken);
        } catch (Exception e) {
            // The rethrown exception only carries the message, so log the cause here.
            logger.error(String.format("Add application token failed, app:%s", appCode), e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return tokens(appCode);
    }

    /**
     * Lists the tokens of an application.
     *
     * @param str application code
     * @return the tokens returned by the token service
     * @throws ServiceException with {@code INTERNAL_SERVER_ERROR} if the lookup fails
     */
    @Override // org.joyqueue.api.OpenAPIService
    public List<ApplicationToken> tokens(String str) {
        try {
            return this.applicationTokenService.findByApp(str);
        } catch (Exception e) {
            // The rethrown exception only carries the message, so log the cause here.
            logger.error(String.format("Find application tokens failed, app:%s", str), e);
            throw new ServiceException(ServiceException.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Verifies that the subscription is registered: as a consumer when its type is
     * {@code CONSUMER}, otherwise as a producer.
     *
     * @param subscribe subscription carrying namespace, topic, app and (for consumers) subscribe group
     * @return always {@code true}; an unregistered subscription is reported by exception
     * @throws ServiceException with {@code BAD_REQUEST} if no matching consumer/producer is stored
     */
    private boolean isLegalSubscribe(Subscribe subscribe) {
        String namespaceCode = subscribe.getNamespace().getCode();
        String topicCode = subscribe.getTopic().getCode();
        String appCode = subscribe.getApp().getCode();
        if (subscribe.getType() == SubscribeType.CONSUMER) {
            if (NullUtil.isEmpty(this.consumerService.findByTopicAppGroup(namespaceCode, topicCode, appCode, subscribe.getSubscribeGroup()))) {
                throw new ServiceException(ServiceException.BAD_REQUEST, String.format(" %s haven't subscribe the topic %s ", CodeConverter.convertApp(subscribe.getApp(), subscribe.getSubscribeGroup()), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName()));
            }
            return true;
        }
        if (NullUtil.isEmpty(this.producerService.findByTopicAppGroup(namespaceCode, topicCode, appCode))) {
            throw new ServiceException(ServiceException.BAD_REQUEST, String.format(" %s haven't publish the topic %s ", subscribe.getApp().getCode(), CodeConverter.convertTopic(subscribe.getNamespace(), subscribe.getTopic()).getFullName()));
        }
        return true;
    }
}
