package org.joyqueue.broker.election;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.config.BrokerConfig;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.broker.network.support.BrokerTransportClientFactory;
import org.joyqueue.broker.replication.ReplicaGroup;
import org.joyqueue.broker.replication.ReplicationManager;
import org.joyqueue.broker.replication.TransportSession;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.network.transport.TransportClient;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.config.ClientConfig;
import org.joyqueue.network.transport.exception.TransportException;
import org.joyqueue.store.StoreService;
import org.joyqueue.store.replication.ReplicableStore;
import org.joyqueue.toolkit.concurrent.EventBus;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/election/ElectionManager.class */
public class ElectionManager extends Service implements ElectionService, BrokerContextAware {
    private static Logger logger = LoggerFactory.getLogger(ElectionManager.class);
    private Map<TopicPartitionGroup, LeaderElection> leaderElections;
    private TransportClient transportClient;
    protected ElectionConfig electionConfig;
    private ClusterManager clusterManager;
    private final Map<String, TransportSession> sessions;
    private ScheduledExecutorService electionTimerExecutor;
    private ExecutorService electionExecutor;
    private EventBus<ElectionEvent> electionEventManager;
    private ElectionMetadataManager electionMetadataManager;
    private ReplicationManager replicationManager;
    private StoreService storeService;
    private Consume consume;
    private BrokerMonitor brokerMonitor;
    private BrokerContext brokerContext;
    private BrokerConfig brokerConfig;

    /**
     * Creates an election manager with no collaborators wired in.
     * <p>
     * Only the transport session registry is initialised here. Every other
     * dependency stays {@code null} until {@link #validate()} resolves it from
     * the broker context, provided one was supplied through
     * {@link #setBrokerContext(BrokerContext)}.
     */
    public ElectionManager() {
        sessions = new ConcurrentHashMap<>();
    }

    /**
     * Creates an election manager with all collaborators supplied explicitly.
     *
     * @param brokerConfig   broker configuration; may be {@code null}, in which case
     *                       {@link #validate()} tries to read it from the broker context
     * @param electionConfig election configuration
     * @param storeService   store service used to look up replicable stores
     * @param consume        consume service handed on to the replication manager
     * @param clusterManager cluster manager handed on to each leader election
     * @param brokerMonitor  broker monitor; {@link #validate()} only warns when it is absent
     */
    public ElectionManager(BrokerConfig brokerConfig, ElectionConfig electionConfig, StoreService storeService, Consume consume, ClusterManager clusterManager, BrokerMonitor brokerMonitor) {
        // Configuration first, then the services, then the session registry.
        this.brokerConfig = brokerConfig;
        this.electionConfig = electionConfig;
        this.storeService = storeService;
        this.consume = consume;
        this.clusterManager = clusterManager;
        this.brokerMonitor = brokerMonitor;
        this.sessions = new ConcurrentHashMap<>();
    }

    /**
     * Convenience constructor for callers that have no {@link BrokerConfig} at hand.
     * <p>
     * Delegates to the six-argument constructor with a {@code null} broker
     * configuration.
     *
     * @param electionConfig election configuration
     * @param storeService   store service used to look up replicable stores
     * @param consume        consume service
     * @param clusterManager cluster manager
     * @param brokerMonitor  broker monitor
     */
    public ElectionManager(ElectionConfig electionConfig, StoreService storeService, Consume consume, ClusterManager clusterManager, BrokerMonitor brokerMonitor) {
        this((BrokerConfig) null, electionConfig, storeService, consume, clusterManager, brokerMonitor);
    }

    /**
     * Resolves any collaborator that was not passed to a constructor and checks
     * that the mandatory ones are present.
     * <p>
     * Missing collaborators are taken from the broker context when one has been
     * set. A missing broker monitor only produces a warning; a missing election
     * config, cluster manager, store service or consume service fails the
     * Guava {@code Preconditions.checkArgument} checks at the end.
     *
     * @throws Exception if the superclass validation fails or a mandatory
     *                   collaborator is still {@code null}
     */
    protected void validate() throws Exception {
        super.validate();

        final BrokerContext context = this.brokerContext;

        if (brokerConfig == null) {
            brokerConfig = context != null ? context.getBrokerConfig() : null;
        }
        if (electionConfig == null) {
            // Without a context the config is built from a null property supplier.
            electionConfig = new ElectionConfig(context != null ? context.getPropertySupplier() : null);
        }

        if (context != null) {
            if (storeService == null) {
                storeService = context.getStoreService();
            }
            if (clusterManager == null) {
                clusterManager = context.getClusterManager();
            }
            if (consume == null) {
                consume = context.getConsume();
            }
            if (brokerMonitor == null) {
                brokerMonitor = context.getBrokerMonitor();
            }
        }

        if (brokerMonitor == null) {
            logger.warn("broker monitor is null.");
        }

        Preconditions.checkArgument(electionConfig != null, "election config is null");
        Preconditions.checkArgument(clusterManager != null, "cluster manager is null");
        Preconditions.checkArgument(storeService != null, "store service is null");
        Preconditions.checkArgument(consume != null, "consume is null");
    }

    /**
     * Starts the election subsystem.
     * <p>
     * In order: starts the election event bus and registers the broker monitor's
     * election listener on it, creates the election registry, starts the
     * transport client, builds the timer and command executors, starts the
     * replication manager and finally recovers persisted election metadata
     * (which is handed this manager as its argument).
     *
     * @throws NullPointerException if no broker monitor is available; the
     *                              listener is an inner class of the monitor and
     *                              cannot be created without it
     * @throws Exception            if any of the started components fails to start
     */
    public void doStart() throws Exception {
        super.doStart();

        this.electionEventManager = new EventBus<>("LeaderElectionEvent");
        this.electionEventManager.start();
        // The listener must be created through its enclosing BrokerMonitor instance.
        // The decompiled form (a bare getClass() null check followed by an
        // unqualified "new BrokerMonitor.ElectionListener()") lost that outer instance.
        addListener(this.brokerMonitor.new ElectionListener());

        this.leaderElections = new ConcurrentHashMap<>();

        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setIoThreadName("joyqueue-election-io-eventLoop");
        clientConfig.setConnectionTimeout(300);
        clientConfig.getRetryPolicy().setRetryDelay(60000);
        this.transportClient = new BrokerTransportClientFactory().create(clientConfig);
        this.transportClient.start();

        this.electionTimerExecutor = Executors.newScheduledThreadPool(
                this.electionConfig.getTimerScheduleThreadNum(),
                new NamedThreadFactory("Election-Timer"));

        // Typed locals replace the raw-typed queue and the casts of the decompiled code.
        BlockingQueue<Runnable> commandQueue = new LinkedBlockingDeque<>(this.electionConfig.getCommandQueueSize());
        ThreadFactory commandThreadFactory = new NamedThreadFactory("Election-sendCommand");
        this.electionExecutor = new ThreadPoolExecutor(
                this.electionConfig.getExecutorThreadNumMin(),
                this.electionConfig.getExecutorThreadNumMax(),
                60L, TimeUnit.SECONDS,
                commandQueue,
                commandThreadFactory);

        this.replicationManager = new ReplicationManager(this.electionConfig, this.brokerConfig, this.storeService, this.consume, this.brokerMonitor);
        this.replicationManager.start();

        // NOTE(review): fixed one-second pause between starting replication and
        // recovering metadata; presumably gives the replication manager time to
        // come up — confirm whether it is still needed.
        Thread.sleep(1000L);

        this.electionMetadataManager = new ElectionMetadataManager(this.electionConfig.getMetadataPath());
        this.electionMetadataManager.recover(this);
        logger.info("Election manager started.");
    }

    /**
     * Shuts the election subsystem down.
     * <p>
     * Stops every registered leader election and empties the registry, stops all
     * transport sessions, then closes the executors, the event bus, the transport
     * client and the replication manager before delegating to the superclass.
     */
    public void doStop() {
        logger.info("Election manager stop");

        for (TopicPartitionGroup key : leaderElections.keySet()) {
            LeaderElection election = getLeaderElection(key.getTopic(), key.getPartitionGroupId());
            if (election == null) {
                // Entry disappeared between listing the keys and the lookup.
                continue;
            }
            election.stop();
        }
        leaderElections.clear();

        for (Map.Entry<String, TransportSession> entry : sessions.entrySet()) {
            entry.getValue().stop();
        }

        Close.close(electionTimerExecutor);
        Close.close(electionExecutor);
        Close.close(electionEventManager);
        Close.close(transportClient);
        Close.close(replicationManager);
        super.doStop();
    }

    /**
     * Sets up replication and leader election for a newly created partition group.
     * <p>
     * Each broker is mapped to an election node addressed as
     * {@code ip:backEndPort}; a replica group is created for the nodes, a leader
     * election is created and started for it, and the two are linked.
     *
     * @param electType election type to use
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     * @param list      all brokers participating in the group
     * @param set       node-id set forwarded in the argument slot that
     *                  {@code restoreLeaderElection} fills with the learners
     * @param i2        local broker id
     * @param i3        leader id
     * @throws ElectionException if the leader election cannot be created or started
     */
    @Override
    public void onPartitionGroupCreate(PartitionGroup.ElectType electType, TopicName topicName, int i, List<Broker> list, Set<Integer> set, int i2, int i3) throws ElectionException {
        logger.info("Create election of topic {}, partition group {}, election type is {}, localBroker is {}, leader is {}, all nodes is {}",
                topicName, i, electType, i2, i3, JSON.toJSONString(list));

        final List<DefaultElectionNode> nodes = list.stream()
                .map(member -> new DefaultElectionNode(member.getIp() + ":" + member.getBackEndPort(), member.getId().intValue()))
                .collect(Collectors.toList());

        final ReplicaGroup replicaGroup = this.replicationManager.createReplicaGroup(
                topicName.getFullName(), i, nodes, set, i2, i3, this.brokerMonitor);
        final LeaderElection election = createLeaderElection(
                electType, topicName.getFullName(), i, nodes, set, i2, i3, replicaGroup);
        replicaGroup.setLeaderElection(election);
    }

    /**
     * Tears down the leader election and then the replica group of a removed
     * partition group.
     *
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     */
    @Override
    public void onPartitionGroupRemove(TopicName topicName, int i) {
        logger.info("Remove election of topic {}, partition group {}", topicName, i);
        final String topic = topicName.getFullName();
        removeLeaderElection(topic, i);
        this.replicationManager.removeReplicaGroup(topicName.getFullName(), i);
    }

    /**
     * Handles a broker joining a partition group.
     * <p>
     * If an election already exists for the group the new node is simply added
     * to it. Otherwise the election is created from scratch, but only when the
     * joining broker's id equals {@code i2}; for any other broker a missing
     * election is reported as an {@link ElectionException}.
     *
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     * @param electType election type used if the election has to be created
     * @param list      all brokers of the group, used if the election has to be created
     * @param set       node-id set forwarded to the replica group and election
     * @param broker    broker being added
     * @param i2        broker id the joining broker is compared against; forwarded
     *                  in the local-broker slot of the create calls
     * @param i3        id forwarded in the leader slot of the create calls
     * @throws ElectionException if the election is missing for a non-matching
     *                           broker, or creating the election fails
     */
    @Override
    public void onNodeAdd(TopicName topicName, int i, PartitionGroup.ElectType electType, List<Broker> list, Set<Integer> set, Broker broker, int i2, int i3) throws ElectionException {
        logger.info("Add node {} to election of topic {}, partition group {}", broker, topicName, i);

        final LeaderElection existing = getLeaderElection(topicName, i);
        if (existing == null) {
            logger.warn("Add node to election of topic {}/partition group {}, leader election is null", topicName, i);
            if (broker.getId().intValue() != i2) {
                throw new ElectionException(String.format("Add node to election of topic %s/partition group %d, leader election is null", topicName, i));
            }
            // The joining broker matches i2: build the replica group and election here.
            final List<DefaultElectionNode> nodes = list.stream()
                    .map(member -> new DefaultElectionNode(member.getIp() + ":" + member.getBackEndPort(), member.getId().intValue()))
                    .collect(Collectors.toList());
            final ReplicaGroup replicaGroup = this.replicationManager.createReplicaGroup(
                    topicName.getFullName(), i, nodes, set, i2, i3, this.brokerMonitor);
            replicaGroup.setLeaderElection(
                    createLeaderElection(electType, topicName.getFullName(), i, nodes, set, i2, i3, replicaGroup));
            return;
        }

        final String address = broker.getIp() + ":" + broker.getBackEndPort();
        existing.addNode(new DefaultElectionNode(address, broker.getId().intValue()));
    }

    /**
     * Handles a broker leaving a partition group.
     * <p>
     * When the removed node id equals {@code i3} the whole leader election of the
     * group is removed; otherwise the node is removed from the existing election,
     * or a warning is logged if there is none.
     *
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     * @param i2        id of the node being removed
     * @param i3        id compared against the removed node — presumably the local
     *                  broker id; verify against the caller
     */
    @Override
    public void onNodeRemove(TopicName topicName, int i, int i2, int i3) {
        logger.info("Remove node {} from election of topic {}, partition group {}", i2, topicName, i);

        if (i2 == i3) {
            removeLeaderElection(topicName.getFullName(), i);
            return;
        }

        final LeaderElection election = getLeaderElection(topicName, i);
        if (election != null) {
            election.removeNode(i2);
            return;
        }
        logger.warn("Remove node from election of topic {}/partition group {}, leader election is null", topicName, i);
    }

    /**
     * Replaces the leader election of a partition group with one of a different
     * election type, keeping the existing replica group.
     * <p>
     * The current election is looked up to obtain its replica group, removed, and
     * a new election of the requested type is created, started and attached to
     * that replica group.
     *
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     * @param electType new election type
     * @param list      all brokers of the group
     * @param set       node-id set forwarded to the new election
     * @param i2        id forwarded in the local-broker slot of the create call
     * @param i3        id forwarded in the leader slot of the create call
     * @throws ElectionException if no election is registered for the group, or the
     *                           replacement election cannot be created
     */
    @Override
    public void onElectionTypeChange(TopicName topicName, int i, PartitionGroup.ElectType electType, List<Broker> list, Set<Integer> set, int i2, int i3) throws ElectionException {
        logger.info("Election of topic {} partition group {}'s election type change to {}", topicName, i, electType);

        LeaderElection current = getLeaderElection(topicName, i);
        if (current == null) {
            // Report a missing election the same way onNodeAdd/onLeaderChange do,
            // instead of failing with a bare NullPointerException.
            logger.warn("Election type of topic {}/partition group {} change, election is null", topicName, i);
            throw new ElectionException(String.format("Election type of topic %s/partition group %d change, election is null", topicName, i));
        }

        ReplicaGroup replicaGroup = current.getReplicaGroup();
        removeLeaderElection(topicName.getFullName(), i);
        try {
            List<DefaultElectionNode> nodes = list.stream()
                    .map(member -> new DefaultElectionNode(member.getIp() + ":" + member.getBackEndPort(), member.getId().intValue()))
                    .collect(Collectors.toList());
            replicaGroup.setLeaderElection(
                    createLeaderElection(electType, topicName.getFullName(), i, nodes, set, i2, i3, replicaGroup));
        } catch (Exception e) {
            throw new ElectionException("Create leader election failed", e);
        }
    }

    /**
     * Applies an externally decided leader change to the election of a partition
     * group.
     *
     * @param topicName topic the partition group belongs to
     * @param i         partition group id
     * @param i2        id of the new leader
     * @throws Exception an {@link ElectionException} if no election is registered
     *                   for the group, or whatever {@code setLeaderId} raises
     */
    @Override
    public void onLeaderChange(TopicName topicName, int i, int i2) throws Exception {
        logger.info("Election of topic {} partition group {}'s leader change to {}", topicName, i, i2);

        final LeaderElection election = getLeaderElection(topicName, i);
        if (election != null) {
            election.setLeaderId(i2);
            return;
        }

        logger.warn("Leader of topic {}/partition group {} change, election is null", topicName, i);
        throw new ElectionException(String.format("Leader of topic %s/partition group %d change, election is null", topicName, i));
    }

    /**
     * Looks up the leader election of a partition group by topic name object.
     *
     * @param topicName topic the partition group belongs to; its full name is used as key
     * @param i         partition group id
     * @return the registered election, or {@code null} if there is none
     */
    @Override
    public LeaderElection getLeaderElection(TopicName topicName, int i) {
        final String fullName = topicName.getFullName();
        return getLeaderElection(fullName, i);
    }

    /**
     * Returns the currently registered leader elections.
     *
     * @return a new list holding a copy of the registry's values; changes to the
     *         list do not affect the registry
     */
    @Override
    public List<LeaderElection> getLeaderElections() {
        List<LeaderElection> snapshot = new ArrayList<>(leaderElections.values());
        return snapshot;
    }

    /**
     * Looks up the leader election of a partition group by topic full name.
     *
     * @param str topic name used to build the registry key
     * @param i   partition group id
     * @return the registered election, or {@code null} if there is none
     */
    public LeaderElection getLeaderElection(String str, int i) {
        final TopicPartitionGroup key = new TopicPartitionGroup(str, i);
        return leaderElections.get(key);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Rebuilds the replica group and leader election of a partition group from
     * persisted election metadata.
     * <p>
     * The replica group and the election each receive their own fresh
     * {@link LinkedList} copy of the node list taken from the metadata.
     *
     * @param topicPartitionGroup identifies the topic and partition group to restore
     * @param electionMetadata    persisted election state (type, nodes, learners,
     *                            local node id, leader id)
     * @throws Exception if the replica group or the election cannot be created
     */
    public void restoreLeaderElection(TopicPartitionGroup topicPartitionGroup, ElectionMetadata electionMetadata) throws Exception {
        logger.info("Restore election of topic {}, partition group {}, election type is {}, localBroker is {}, leader is {}, all nodes is {}",
                topicPartitionGroup.getTopic(),
                topicPartitionGroup.getPartitionGroupId(),
                electionMetadata.getElectType(),
                electionMetadata.getLocalNodeId(),
                electionMetadata.getLeaderId(),
                JSON.toJSONString(electionMetadata.getAllNodes()));

        final ReplicaGroup replicaGroup = this.replicationManager.createReplicaGroup(
                topicPartitionGroup.getTopic(),
                topicPartitionGroup.getPartitionGroupId(),
                new LinkedList(electionMetadata.getAllNodes()),
                electionMetadata.getLearners(),
                electionMetadata.getLocalNodeId(),
                electionMetadata.getLeaderId(),
                this.brokerMonitor);

        final LeaderElection election = createLeaderElection(
                electionMetadata.getElectType(),
                topicPartitionGroup.getTopic(),
                topicPartitionGroup.getPartitionGroupId(),
                new LinkedList(electionMetadata.getAllNodes()),
                electionMetadata.getLearners(),
                electionMetadata.getLocalNodeId(),
                electionMetadata.getLeaderId(),
                replicaGroup);

        replicaGroup.setLeaderElection(election);
    }

    /**
     * Reports how many leader elections are currently registered.
     *
     * @return size of the election registry
     */
    int getLeaderElectionCount() {
        final int count = leaderElections.size();
        return count;
    }

    /**
     * Creates, starts and registers the leader election of a partition group.
     * <p>
     * An election already registered for the same group is removed first. The
     * concrete implementation is chosen by {@code electType}: {@code fix} yields a
     * {@link FixLeaderElection}, {@code raft} a {@link RaftLeaderElection}. The
     * election is only put into the registry after it has started successfully.
     *
     * @param electType    election type
     * @param str          topic name
     * @param i            partition group id
     * @param list         all election nodes of the group
     * @param set          node-id set passed to the raft election only
     * @param i2           local node id
     * @param i3           leader id; used by the fix election only
     * @param replicaGroup replica group the election drives
     * @return the started election
     * @throws ElectionException if the group has no replicable store, the election
     *                           type is unsupported, or the election fails to
     *                           start (the original failure is kept as the cause)
     */
    private synchronized LeaderElection createLeaderElection(PartitionGroup.ElectType electType, String str, int i, List<DefaultElectionNode> list, Set<Integer> set, int i2, int i3, ReplicaGroup replicaGroup) throws ElectionException {
        TopicPartitionGroup topicPartitionGroup = new TopicPartitionGroup(str, i);
        if (this.leaderElections.get(topicPartitionGroup) != null) {
            logger.warn("Create leader election for topic {}/partition group {}, election is not null", str, i);
            removeLeaderElection(str, i);
        }

        ReplicableStore replicableStore = this.storeService.getReplicableStore(str, i);
        if (replicableStore == null) {
            throw new ElectionException(String.format("Replicable store of topic %s partition group %d is null", str, i));
        }

        final LeaderElection election;
        if (electType == PartitionGroup.ElectType.fix) {
            // Note the argument order: the fix election takes the leader id before the local node id.
            election = new FixLeaderElection(topicPartitionGroup, this.electionConfig, this, this.clusterManager, this.electionMetadataManager, replicableStore, replicaGroup, this.electionEventManager, i3, i2, list);
        } else if (electType == PartitionGroup.ElectType.raft) {
            election = new RaftLeaderElection(topicPartitionGroup, this.electionConfig, this, this.clusterManager, this.electionMetadataManager, replicableStore, replicaGroup, this.electionTimerExecutor, this.electionExecutor, this.electionEventManager, i2, list, set);
        } else {
            // Plain concatenation: the former "{}" placeholder was never substituted.
            throw new ElectionException("Incorrect election type " + electType);
        }

        try {
            election.start();
        } catch (Exception e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new ElectionException("Leader election start fail", e);
        }
        this.leaderElections.put(topicPartitionGroup, election);
        return election;
    }

    /**
     * Removes the leader election of a partition group, if one is registered.
     * <p>
     * Under this manager's monitor: the persisted election metadata is removed,
     * the election is stopped, and the registry entry is dropped, in that order.
     * A missing election is only logged.
     *
     * @param str topic name
     * @param i   partition group id
     */
    void removeLeaderElection(String str, int i) {
        synchronized (this) {
            final TopicPartitionGroup key = new TopicPartitionGroup(str, i);
            final LeaderElection election = leaderElections.get(key);
            if (election != null) {
                electionMetadataManager.removeElectionMetadata(key);
                election.stop();
                leaderElections.remove(key);
            } else {
                logger.warn("Remove leader election of partition group {}, leader election is null", key);
            }
        }
    }

    /**
     * Registers a listener on the election event bus.
     * <p>
     * The bus is created in {@link #doStart()}, so this must not be called before
     * the manager has been started.
     *
     * @param eventListener listener to register
     */
    @Override
    public void addListener(EventListener<ElectionEvent> eventListener) {
        final EventBus<ElectionEvent> bus = this.electionEventManager;
        bus.addListener(eventListener);
    }

    /**
     * Unregisters a listener from the election event bus.
     * <p>
     * The bus is created in {@link #doStart()}, so this must not be called before
     * the manager has been started.
     *
     * @param eventListener listener to unregister
     */
    @Override
    public void removeListener(EventListener<ElectionEvent> eventListener) {
        final EventBus<ElectionEvent> bus = this.electionEventManager;
        bus.removeListener(eventListener);
    }

    /**
     * Returns the election metadata manager's description for the given
     * arguments.
     *
     * @param str first lookup argument — presumably the topic name; verify against
     *            {@code ElectionMetadataManager#describe}
     * @param i   second lookup argument — presumably the partition group id
     * @return the description produced by the metadata manager
     */
    @Override
    public String describe(String str, int i) {
        final String description = electionMetadataManager.describe(str, i);
        return description;
    }

    /**
     * Returns the election metadata manager's overall description.
     *
     * @return the description produced by the metadata manager
     */
    @Override
    public String describe() {
        final String description = electionMetadataManager.describe();
        return description;
    }

    /**
     * Asks the election metadata manager to synchronise its metadata from the
     * name service, passing it this manager's cluster manager.
     */
    @Override
    public void syncElectionMetadataFromNameService() {
        final ClusterManager cluster = this.clusterManager;
        electionMetadataManager.syncElectionMetadataFromNameService(cluster);
    }

    /**
     * Forwards a term update to the election metadata manager.
     *
     * @param str first argument — presumably the topic name; verify against
     *            {@code ElectionMetadataManager#updateTerm}
     * @param i   second argument — presumably the partition group id
     * @param i2  third argument — presumably the new term
     */
    @Override
    public void updateTerm(String str, int i, int i2) {
        final ElectionMetadataManager metadata = this.electionMetadataManager;
        metadata.updateTerm(str, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends an election command to a remote address over a cached transport
     * session.
     * <p>
     * Nothing is sent when the manager is not started. Sessions are kept per
     * address and created on first use.
     *
     * @param str             remote address the session is keyed by
     * @param command         command to send
     * @param i               value passed through to {@code TransportSession#sendCommand}
     *                        — presumably a timeout; verify against that method
     * @param commandCallback callback passed through to the session
     * @throws TransportException if the session fails to send the command
     */
    public void sendCommand(String str, Command command, int i, CommandCallback commandCallback) throws TransportException {
        if (isStarted()) {
            sessionFor(str).sendCommand(command, i, commandCallback);
            return;
        }
        logger.info("Send election command but election manager is stopped");
    }

    /**
     * Returns the transport session for an address, creating and caching it if
     * absent. The lookup is repeated while holding the lock on the session map so
     * that only one session is created per address.
     */
    private TransportSession sessionFor(String address) {
        TransportSession cached = this.sessions.get(address);
        if (cached != null) {
            return cached;
        }
        synchronized (this.sessions) {
            TransportSession session = this.sessions.get(address);
            if (session == null) {
                logger.info("Send election command, create transport of address {}", address);
                session = new TransportSession(address, this.transportClient);
                this.sessions.put(address, session);
            }
            return session;
        }
    }

    /**
     * Stores the broker context from which {@link #validate()} resolves any
     * collaborator that was not supplied through a constructor.
     *
     * @param brokerContext broker context to use; may be {@code null}
     */
    @Override
    public void setBrokerContext(final BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
}
