package org.joyqueue.nsr.support;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.Topic;
import org.joyqueue.model.PageResult;
import org.joyqueue.model.QPageQuery;
import org.joyqueue.nsr.event.AddPartitionGroupEvent;
import org.joyqueue.nsr.event.AddTopicEvent;
import org.joyqueue.nsr.event.LeaderChangeEvent;
import org.joyqueue.nsr.event.RemovePartitionGroupEvent;
import org.joyqueue.nsr.event.RemoveTopicEvent;
import org.joyqueue.nsr.event.UpdatePartitionGroupEvent;
import org.joyqueue.nsr.event.UpdateTopicEvent;
import org.joyqueue.nsr.exception.NsrException;
import org.joyqueue.nsr.message.Messenger;
import org.joyqueue.nsr.model.TopicQuery;
import org.joyqueue.nsr.service.TopicService;
import org.joyqueue.nsr.service.internal.BrokerInternalService;
import org.joyqueue.nsr.service.internal.PartitionGroupInternalService;
import org.joyqueue.nsr.service.internal.TopicInternalService;
import org.joyqueue.nsr.service.internal.TransactionInternalService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/nsr/support/DefaultTopicService.class */
/**
 * Default {@link TopicService} implementation.
 *
 * <p>Read operations are delegated directly to {@link TopicInternalService}. Every mutating
 * operation validates its input, opens a transaction through
 * {@link TransactionInternalService}, applies the change, commits, and then publishes the
 * matching event through the {@link Messenger}. Any failure is reported to callers as an
 * {@link NsrException}.
 */
public class DefaultTopicService implements TopicService {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultTopicService.class);

    private final Messenger messenger;
    private final TopicInternalService topicInternalService;
    private final PartitionGroupInternalService partitionGroupInternalService;
    private final BrokerInternalService brokerInternalService;
    private final TransactionInternalService transactionInternalService;

    /**
     * Creates the service with the collaborators it delegates to.
     *
     * @param messenger                     publishes metadata events to brokers
     * @param topicInternalService          storage-facing topic operations
     * @param partitionGroupInternalService storage-facing partition group lookups
     * @param brokerInternalService         broker lookups used to resolve replica ids
     * @param transactionInternalService    begin/commit/rollback around each mutation
     */
    public DefaultTopicService(Messenger messenger, TopicInternalService topicInternalService, PartitionGroupInternalService partitionGroupInternalService, BrokerInternalService brokerInternalService, TransactionInternalService transactionInternalService) {
        this.messenger = messenger;
        this.topicInternalService = topicInternalService;
        this.partitionGroupInternalService = partitionGroupInternalService;
        this.brokerInternalService = brokerInternalService;
        this.transactionInternalService = transactionInternalService;
    }

    /** Delegates to {@link TopicInternalService#getById} with the argument unchanged. */
    @Override
    public Topic getById(String str) {
        return this.topicInternalService.getById(str);
    }

    /**
     * Delegates to {@link TopicInternalService#getTopicByCode}.
     *
     * @param str  first lookup key (other callers in this class pass the topic namespace)
     * @param str2 second lookup key (other callers in this class pass the topic code)
     */
    @Override
    public Topic getTopicByCode(String str, String str2) {
        return this.topicInternalService.getTopicByCode(str, str2);
    }

    /** Delegates to {@link TopicInternalService#search} with the query unchanged. */
    @Override
    public PageResult<Topic> search(QPageQuery<TopicQuery> qPageQuery) {
        return this.topicInternalService.search(qPageQuery);
    }

    /** Delegates to {@link TopicInternalService#findUnsubscribedByQuery} with the query unchanged. */
    @Override
    public PageResult<Topic> findUnsubscribedByQuery(QPageQuery<TopicQuery> qPageQuery) {
        return this.topicInternalService.findUnsubscribedByQuery(qPageQuery);
    }

    /** Delegates to {@link TopicInternalService#getAll}. */
    @Override
    public List<Topic> getAll() {
        return this.topicInternalService.getAll();
    }

    /**
     * Creates a topic together with its partition groups and publishes an
     * {@link AddTopicEvent} to every broker referenced by those groups.
     *
     * <p>For groups whose elect type is {@code fix}, the leader is set to the first replica
     * returned by the replica collection's iterator.
     *
     * @param topic topic to create
     * @param list  partition groups of the new topic; their broker map is filled in place
     * @throws NsrException if the topic already exists, a referenced broker is unknown, or the
     *                      transaction fails
     */
    @Override
    public void addTopic(Topic topic, List<PartitionGroup> list) {
        if (this.topicInternalService.getTopicByCode(topic.getName().getNamespace(), topic.getName().getCode()) != null) {
            throw new NsrException(String.format("topic: %s is already exist", topic.getName()));
        }
        logger.info("addTopic, topic: {}, partitionGroups: {}", topic.getName(), list);

        fillBroker(list);
        List<Broker> replicas = getReplicas(list);
        for (PartitionGroup partitionGroup : list) {
            if (PartitionGroup.ElectType.fix.equals(partitionGroup.getElectType())) {
                partitionGroup.setLeader((Integer) partitionGroup.getReplicas().iterator().next());
            }
        }

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroups: {}", topic, list, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.addTopic(topic, list);
            this.transactionInternalService.commit();
            this.messenger.publish(new AddTopicEvent(topic, list), replicas);
        } catch (Exception e) {
            logger.error("addTopic exception, topic: {}, partitionGroups: {}", topic, list, e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Removes a topic and publishes a {@link RemoveTopicEvent} to every broker referenced by the
     * topic's stored partition groups.
     *
     * @param topic topic to remove; only its name is used to look up the stored topic
     * @throws NsrException if the topic does not exist, a referenced broker is unknown, or the
     *                      transaction fails
     */
    @Override
    public void removeTopic(Topic topic) {
        Topic existing = this.topicInternalService.getTopicByCode(topic.getName().getNamespace(), topic.getName().getCode());
        if (existing == null) {
            // Report the requested name: the looked-up topic is null in this branch.
            throw new NsrException(String.format("topic: %s is not exist", topic.getName()));
        }
        logger.info("removeTopic, topic: {}", existing);

        List<PartitionGroup> partitionGroups = this.partitionGroupInternalService.getByTopic(existing.getName());
        fillBroker(partitionGroups);
        List<Broker> replicas = getReplicas(partitionGroups);

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}", existing, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.removeTopic(existing);
            this.transactionInternalService.commit();
            this.messenger.publish(new RemoveTopicEvent(existing, partitionGroups), replicas);
        } catch (Exception e) {
            logger.error("removeTopic exception, topic: {}, partitionGroups: {}", existing, partitionGroups, e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Adds a partition group to an existing topic and publishes an
     * {@link AddPartitionGroupEvent} to the group's brokers.
     *
     * @param partitionGroup group to add; its broker map is filled in place and, for elect type
     *                       {@code fix}, its leader is set to the first replica
     * @throws NsrException if the topic does not exist, the group already exists, a referenced
     *                      broker is unknown, or the transaction fails
     */
    @Override
    public void addPartitionGroup(PartitionGroup partitionGroup) {
        if (this.topicInternalService.getTopicByCode(partitionGroup.getTopic().getNamespace(), partitionGroup.getTopic().getCode()) == null) {
            throw new NsrException(String.format("topic: %s is not exist", partitionGroup.getTopic()));
        }
        if (this.partitionGroupInternalService.getByTopicAndGroup(partitionGroup.getTopic(), partitionGroup.getGroup()) != null) {
            throw new NsrException(String.format("topic: %s, group: %s is already exist", partitionGroup.getTopic(), partitionGroup.getGroup()));
        }
        logger.info("addPartitionGroup, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup);

        fillBroker(partitionGroup);
        List<Broker> replicas = getReplicas(partitionGroup);
        if (PartitionGroup.ElectType.fix.equals(partitionGroup.getElectType())) {
            partitionGroup.setLeader((Integer) partitionGroup.getReplicas().iterator().next());
        }

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.addPartitionGroup(partitionGroup);
            this.transactionInternalService.commit();
            this.messenger.publish(new AddPartitionGroupEvent(partitionGroup.getTopic(), partitionGroup), replicas);
        } catch (Exception e) {
            logger.error("addPartitionGroup exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup.getGroup(), e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Removes a partition group and publishes a {@link RemovePartitionGroupEvent} to the brokers
     * of the stored group.
     *
     * @param partitionGroup identifies the group to remove by topic and group number
     * @throws NsrException if the topic or group does not exist, a referenced broker is unknown,
     *                      or the transaction fails
     */
    @Override
    public void removePartitionGroup(PartitionGroup partitionGroup) {
        if (this.topicInternalService.getTopicByCode(partitionGroup.getTopic().getNamespace(), partitionGroup.getTopic().getCode()) == null) {
            throw new NsrException(String.format("topic: %s is not exist", partitionGroup.getTopic()));
        }
        PartitionGroup stored = this.partitionGroupInternalService.getByTopicAndGroup(partitionGroup.getTopic(), partitionGroup.getGroup());
        if (stored == null) {
            throw new NsrException(String.format("topic: %s, group: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup()));
        }
        logger.info("removePartitionGroup, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup);

        // Brokers are resolved from the stored group, not from the caller's argument.
        fillBroker(stored);
        List<Broker> replicas = getReplicas(stored);

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.removePartitionGroup(partitionGroup);
            this.transactionInternalService.commit();
            this.messenger.publish(new RemovePartitionGroupEvent(partitionGroup.getTopic(), stored), replicas);
        } catch (Exception e) {
            logger.error("removePartitionGroup exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup.getGroup(), e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Updates a partition group and publishes an {@link UpdatePartitionGroupEvent} to the union
     * of the old and new groups' brokers.
     *
     * <p>Leader, ISRs and term are copied from the stored group onto the argument; if the
     * argument has no replicas, leader is reset to {@code -1} and term to {@code 0}.
     *
     * @param partitionGroup new state of the group; mutated in place as described above
     * @return always an empty collection
     * @throws NsrException if the topic or group does not exist, a referenced broker is unknown,
     *                      or the transaction fails
     */
    @Override
    public Collection<Integer> updatePartitionGroup(PartitionGroup partitionGroup) {
        if (this.topicInternalService.getTopicByCode(partitionGroup.getTopic().getNamespace(), partitionGroup.getTopic().getCode()) == null) {
            throw new NsrException(String.format("topic: %s is not exist", partitionGroup.getTopic()));
        }
        PartitionGroup stored = this.partitionGroupInternalService.getByTopicAndGroup(partitionGroup.getTopic(), partitionGroup.getGroup());
        if (stored == null) {
            throw new NsrException(String.format("topic: %s, group: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup()));
        }
        logger.info("updatePartitionGroup, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup);

        fillBroker(partitionGroup);
        fillBroker(stored);
        // Brokers of both the old and the new replica set receive the event.
        HashSet<Broker> affectedBrokers = Sets.newHashSet();
        affectedBrokers.addAll(getReplicas(stored));
        affectedBrokers.addAll(getReplicas(partitionGroup));

        partitionGroup.setLeader(stored.getLeader());
        partitionGroup.setIsrs(stored.getIsrs());
        partitionGroup.setTerm(stored.getTerm());
        if (CollectionUtils.isEmpty(partitionGroup.getReplicas())) {
            partitionGroup.setLeader(-1);
            partitionGroup.setTerm(0);
        }

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.updatePartitionGroup(partitionGroup);
            this.transactionInternalService.commit();
            List<Broker> targets = Lists.newArrayList(affectedBrokers);
            this.messenger.publish(new UpdatePartitionGroupEvent(partitionGroup.getTopic(), stored, partitionGroup), targets);
            return Collections.emptyList();
        } catch (Exception e) {
            logger.error("updatePartitionGroup exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup.getGroup(), e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Records a leader/term/ISR report for a partition group and publishes an
     * {@link UpdatePartitionGroupEvent} to the group's brokers.
     *
     * <p>Returns without doing anything when the stored leader and term already equal the
     * reported ones.
     *
     * @param partitionGroup reported state; looked up by {@code getId()}
     * @throws NsrException if the group does not exist, a referenced broker is unknown, or the
     *                      transaction fails
     */
    @Override
    public void leaderReport(PartitionGroup partitionGroup) {
        PartitionGroup stored = this.partitionGroupInternalService.getById(partitionGroup.getId());
        if (stored == null) {
            // Report the requested group: the looked-up group is null in this branch.
            throw new NsrException(String.format("topic: %s, group: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup()));
        }
        if (stored.getLeader().equals(partitionGroup.getLeader()) && stored.getTerm().equals(partitionGroup.getTerm())) {
            return;
        }

        // Apply the report to a copy so the stored instance can be sent as the "old" state.
        PartitionGroup updated = stored.clone();
        updated.setIsrs(partitionGroup.getIsrs());
        updated.setLeader(partitionGroup.getLeader());
        updated.setTerm(partitionGroup.getTerm());
        fillBroker(updated);
        List<Broker> replicas = getReplicas(updated);
        logger.info("leader report, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup.getGroup());

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.leaderReport(updated);
            this.transactionInternalService.commit();
            this.messenger.publish(new UpdatePartitionGroupEvent(stored.getTopic(), stored, updated), replicas);
        } catch (Exception e) {
            logger.error("leader report exception, topic: {}, partitionGroup: {}", updated.getTopic(), updated.getGroup(), e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /**
     * Changes the leader of a partition group and publishes a {@link LeaderChangeEvent} to the
     * broker that is the group's current (old) leader.
     *
     * @param partitionGroup group carrying the requested new leader
     * @throws NsrException if the topic, the group, the new leader broker or the current leader
     *                      broker does not exist, or the transaction fails
     */
    @Override
    public void leaderChange(PartitionGroup partitionGroup) {
        if (this.topicInternalService.getTopicByCode(partitionGroup.getTopic().getNamespace(), partitionGroup.getTopic().getCode()) == null) {
            throw new NsrException(String.format("topic: %s is not exist", partitionGroup.getTopic()));
        }
        PartitionGroup stored = this.partitionGroupInternalService.getByTopicAndGroup(partitionGroup.getTopic(), partitionGroup.getGroup());
        if (stored == null) {
            throw new NsrException(String.format("topic: %s, group: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup()));
        }
        if (this.brokerInternalService.getById(partitionGroup.getLeader()) == null) {
            throw new NsrException(String.format("topic: %s, group: %s, broker: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup(), partitionGroup.getLeader()));
        }
        logger.info("leaderChange, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup);

        Broker currentLeader = this.brokerInternalService.getById(stored.getLeader());
        if (currentLeader == null) {
            // The broker that failed to resolve here is the stored (old) leader.
            throw new NsrException(String.format("topic: %s, group: %s, broker: %s is not exist", partitionGroup.getTopic(), partitionGroup.getGroup(), stored.getLeader()));
        }

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.leaderChange(partitionGroup);
            this.transactionInternalService.commit();
            this.messenger.publish(new LeaderChangeEvent(partitionGroup.getTopic(), stored, partitionGroup), currentLeader);
        } catch (Exception e) {
            logger.error("leaderChange exception, topic: {}, partitionGroup: {}", partitionGroup.getTopic(), partitionGroup.getGroup(), e);
            this.transactionInternalService.rollback();
            throw new NsrException(e);
        }
    }

    /** Delegates to {@link TopicInternalService#getPartitionGroup} with the arguments unchanged. */
    @Override
    public List<PartitionGroup> getPartitionGroup(String str, String str2, Object[] objArr) {
        return this.topicInternalService.getPartitionGroup(str, str2, objArr);
    }

    /**
     * Updates a topic and publishes an {@link UpdateTopicEvent} (old to new) to every broker
     * referenced by the topic's stored partition groups.
     *
     * <p>If the update, commit or publish fails, a reverse event (new to old) is published and
     * the transaction is rolled back.
     *
     * @param topic new state of the topic
     * @return the {@code topic} argument
     * @throws NsrException if the topic does not exist, a referenced broker is unknown, or the
     *                      transaction fails
     */
    @Override
    public Topic update(Topic topic) {
        Topic previous = this.topicInternalService.getTopicByCode(topic.getName().getNamespace(), topic.getName().getCode());
        if (previous == null) {
            throw new NsrException(String.format("topic: %s is not exist", topic.getName()));
        }
        logger.info("update, topic: {}", topic);

        List<PartitionGroup> partitionGroups = this.partitionGroupInternalService.getByTopic(topic.getName());
        // Resolve broker ids first, as removeTopic does, so getReplicas has a broker map to read.
        fillBroker(partitionGroups);
        List<Broker> replicas = getReplicas(partitionGroups);

        try {
            this.transactionInternalService.begin();
        } catch (Exception e) {
            logger.error("beginTransaction exception, topic: {}", topic, e);
            throw new NsrException(e);
        }

        try {
            this.topicInternalService.update(topic);
            this.transactionInternalService.commit();
            this.messenger.publish(new UpdateTopicEvent(previous, topic), replicas);
            return topic;
        } catch (Exception e) {
            logger.error("update exception, topic: {}, partitionGroups: {}", topic, partitionGroups, e);
            try {
                this.messenger.publish(new UpdateTopicEvent(topic, previous), replicas);
            } finally {
                // Roll back even if the compensating publish itself fails.
                this.transactionInternalService.rollback();
            }
            throw new NsrException(e);
        }
    }

    /**
     * Fills the broker map of a single partition group.
     *
     * @see #fillBroker(List)
     */
    protected void fillBroker(PartitionGroup partitionGroup) {
        fillBroker(Lists.newArrayList(partitionGroup));
    }

    /**
     * Resolves the replica and learner ids of each group to {@link Broker} instances and stores
     * them on the group through {@code setBrokers}. Does nothing for a null or empty list.
     *
     * @param list groups to fill in place
     * @throws NsrException if a replica or learner id does not resolve to a broker
     */
    protected void fillBroker(List<PartitionGroup> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        // Collect every id once so the brokers can be fetched with a single lookup.
        HashSet<Integer> brokerIds = Sets.newHashSet();
        for (PartitionGroup partitionGroup : list) {
            if (partitionGroup.getReplicas() != null) {
                brokerIds.addAll(partitionGroup.getReplicas());
            }
            if (partitionGroup.getLearners() != null) {
                brokerIds.addAll(partitionGroup.getLearners());
            }
        }

        List<Broker> brokers = this.brokerInternalService.getByIds(Lists.newArrayList(brokerIds));
        HashMap<Integer, Broker> brokersById = Maps.newHashMap();
        for (Broker broker : brokers) {
            brokersById.put(broker.getId(), broker);
        }

        for (PartitionGroup partitionGroup : list) {
            HashMap<Integer, Broker> groupBrokers = Maps.newHashMap();
            if (partitionGroup.getReplicas() != null) {
                for (Integer replicaId : partitionGroup.getReplicas()) {
                    Broker broker = brokersById.get(replicaId);
                    if (broker == null) {
                        throw new NsrException(String.format("broker %d not exist", replicaId));
                    }
                    groupBrokers.put(replicaId, broker);
                }
            }
            if (partitionGroup.getLearners() != null) {
                for (Integer learnerId : partitionGroup.getLearners()) {
                    Broker broker = brokersById.get(learnerId);
                    if (broker == null) {
                        throw new NsrException(String.format("broker %d not exist", learnerId));
                    }
                    groupBrokers.put(learnerId, broker);
                }
            }
            partitionGroup.setBrokers(groupBrokers);
        }
    }

    /**
     * Returns the values of the group's broker map as a new list.
     *
     * @param partitionGroup group whose broker map has already been set
     */
    protected List<Broker> getReplicas(PartitionGroup partitionGroup) {
        return Lists.newArrayList(partitionGroup.getBrokers().values());
    }

    /**
     * Returns the distinct brokers found in the broker maps of all given groups.
     *
     * @param list groups whose broker maps have already been set; null yields an empty list
     */
    protected List<Broker> getReplicas(List<PartitionGroup> list) {
        HashSet<Broker> distinctBrokers = Sets.newHashSet();
        if (list != null) {
            for (PartitionGroup partitionGroup : list) {
                distinctBrokers.addAll(partitionGroup.getBrokers().values());
            }
        }
        return Lists.newArrayList(distinctBrokers);
    }
}
