package org.joyqueue.broker.replication;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.config.BrokerConfig;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.election.DefaultElectionNode;
import org.joyqueue.broker.election.ElectionConfig;
import org.joyqueue.broker.election.ElectionException;
import org.joyqueue.broker.election.TopicPartitionGroup;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.store.StoreService;
import org.joyqueue.store.replication.ReplicableStore;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/replication/ReplicationManager.class */
public class ReplicationManager extends Service {
    private static Logger logger = LoggerFactory.getLogger(ReplicationManager.class);
    private ConcurrentHashMap<TopicPartitionGroup, ReplicaGroup> replicaGroups;
    private ElectionConfig electionConfig;
    private BrokerConfig brokerConfig;
    private final ConcurrentHashMap<String, Transport> sessions = new ConcurrentHashMap<>();
    private StoreService storeService;
    private Consume consume;
    private ExecutorService replicateExecutor;
    private ScheduledExecutorService replicateTimerExecutor;
    private BlockingDeque replicateQueue;

    public ReplicationManager(ElectionConfig electionConfig, BrokerConfig brokerConfig, StoreService storeService, Consume consume, BrokerMonitor brokerMonitor) {
        Preconditions.checkArgument(electionConfig != null, "election config is null");
        Preconditions.checkArgument(storeService != null, "store service is null");
        Preconditions.checkArgument(consume != null, "consume is null");
        this.electionConfig = electionConfig;
        this.brokerConfig = brokerConfig;
        this.storeService = storeService;
        this.consume = consume;
    }

    public void doStart() throws Exception {
        super.doStart();
        this.replicaGroups = new ConcurrentHashMap<>();
        this.replicateQueue = new LinkedBlockingDeque(this.electionConfig.getCommandQueueSize());
        this.replicateExecutor = new ThreadPoolExecutor(this.electionConfig.getReplicateThreadNumMin(), this.electionConfig.getReplicateThreadNumMax(), 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) this.replicateQueue, (ThreadFactory) new NamedThreadFactory("Replicate-sendCommand"));
        this.replicateTimerExecutor = Executors.newScheduledThreadPool(this.electionConfig.getTimerScheduleThreadNum());
        this.replicateTimerExecutor.scheduleWithFixedDelay(new Runnable() { // from class: org.joyqueue.broker.replication.ReplicationManager.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    int i = 0;
                    int i2 = 0;
                    Iterator it = ReplicationManager.this.replicaGroups.values().iterator();
                    while (it.hasNext()) {
                        i++;
                        if (((ReplicaGroup) it.next()).isLeader()) {
                            i2++;
                        }
                    }
                    ReplicationManager.logger.info("ReplicationManager, managed replica group count {} ,leader count {} , replicate queue capacity is {}, current size is {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(ReplicationManager.this.electionConfig.getCommandQueueSize()), Integer.valueOf(ReplicationManager.this.replicateQueue.size())});
                } catch (Throwable th) {
                    ReplicationManager.logger.warn("ReplicateManger schedule error.", th);
                }
            }
        }, 30L, 30L, TimeUnit.SECONDS);
    }

    public void doStop() {
        Close.close(this.replicateExecutor);
        super.doStop();
    }

    public synchronized ReplicaGroup createReplicaGroup(String str, int i, List<DefaultElectionNode> list, Set<Integer> set, int i2, int i3, BrokerMonitor brokerMonitor) throws ElectionException {
        TopicPartitionGroup topicPartitionGroup = new TopicPartitionGroup(str, i);
        if (this.replicaGroups.get(topicPartitionGroup) != null) {
            logger.warn("Create replica group for topic {} partition group {} failed, replication group is not null", str, Integer.valueOf(i));
            removeReplicaGroup(str, i);
        }
        ReplicableStore replicableStore = this.storeService.getReplicableStore(str, i);
        if (replicableStore == null) {
            logger.info("Create replica group for topic {} partition group {} failed, replicable store is null", str, Integer.valueOf(i));
            throw new ElectionException(String.format("Create Replica group for topic %s partition group %d failed, replicable store is null", str, Integer.valueOf(i)));
        }
        ReplicaGroup replicaGroup = new ReplicaGroup(topicPartitionGroup, this, replicableStore, this.electionConfig, this.brokerConfig, this.consume, this.replicateExecutor, brokerMonitor, list, set, i2, i3);
        try {
            replicaGroup.start();
            this.replicaGroups.put(topicPartitionGroup, replicaGroup);
            return replicaGroup;
        } catch (Exception e) {
            throw new ElectionException("Create replica group fail" + e);
        }
    }

    public synchronized void removeReplicaGroup(String str, int i) {
        TopicPartitionGroup topicPartitionGroup = new TopicPartitionGroup(str, i);
        ReplicaGroup replicaGroup = this.replicaGroups.get(topicPartitionGroup);
        if (replicaGroup == null) {
            logger.info("Remove replica group of topic {} partition group {}, replication group is null", str, Integer.valueOf(i));
        } else {
            replicaGroup.stop();
            this.replicaGroups.remove(topicPartitionGroup);
        }
    }

    public ReplicaGroup getReplicaGroup(String str, int i) {
        ReplicaGroup replicaGroup = this.replicaGroups.get(new TopicPartitionGroup(str, i));
        if (replicaGroup == null) {
            logger.info("Get replica group of topic {} partition group {}, replication group is null", str, Integer.valueOf(i));
        }
        return replicaGroup;
    }
}
