package org.joyqueue.broker.consumer.position;

import com.google.common.base.Preconditions;
import com.jd.laf.extension.ExtensionManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.config.BrokerStoreConfig;
import org.joyqueue.broker.consumer.ConsumeConfig;
import org.joyqueue.broker.consumer.model.ConsumePartition;
import org.joyqueue.broker.consumer.position.model.Position;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.event.EventType;
import org.joyqueue.event.MetaEvent;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.nsr.event.AddConsumerEvent;
import org.joyqueue.nsr.event.AddPartitionGroupEvent;
import org.joyqueue.nsr.event.RemoveConsumerEvent;
import org.joyqueue.nsr.event.RemovePartitionGroupEvent;
import org.joyqueue.nsr.event.UpdatePartitionGroupEvent;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager.class */
public class PositionManager extends Service {
    /** Class-wide logger; also used by the nested meta-event listeners. */
    private static final Logger logger = LoggerFactory.getLogger(PositionManager.class);

    /** Used to look up the partition-group store when reading left/right message indexes. */
    private final StoreService storeService;

    /** Source of topic/partition/consumer metadata and of the meta events the listeners react to. */
    private final ClusterManager clusterManager;

    /** Consume configuration (position file path, index flush interval). */
    private final ConsumeConfig config;

    /** Backing store for consume positions; resolved lazily in {@code validate()}. */
    private PositionStore<ConsumePartition, Position> positionStore;

    /** Background loop that runs {@code compensationPosition()}; created in {@code doStart()}. */
    private LoopThread thread;

    /** Single-thread executor used for asynchronous position flushes; created in {@code validate()}. */
    private ExecutorService flushIndexThread;

    /** Timestamp of the most recent ack per consume partition, written by {@code markLastAckTime}. */
    private final Map<ConsumePartition, AtomicLong> lastAckTimeTrace = new ConcurrentHashMap<>();

    /** Time at which {@code tryForceFlush()} last scheduled an asynchronous flush. */
    private final AtomicLong lastFlushIndexTimestamp = new AtomicLong();

    /* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager$AddConsumeListener.class */
    class AddConsumeListener implements EventListener<MetaEvent> {
        AddConsumeListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            try {
                if (metaEvent.getEventType() == EventType.ADD_CONSUMER) {
                    AddConsumerEvent addConsumerEvent = (AddConsumerEvent) metaEvent;
                    PositionManager.logger.info("listen add consume event:[{}]", addConsumerEvent.toString());
                    PositionManager.this.addConsumer(addConsumerEvent.getTopic(), addConsumerEvent.getConsumer().getApp());
                }
            } catch (Exception e) {
                PositionManager.logger.error("AddConsumeListener error.", e);
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager$AddPartitionGroupListener.class */
    class AddPartitionGroupListener implements EventListener<MetaEvent> {
        AddPartitionGroupListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            try {
                if (metaEvent.getEventType() == EventType.ADD_PARTITION_GROUP) {
                    AddPartitionGroupEvent addPartitionGroupEvent = (AddPartitionGroupEvent) metaEvent;
                    PositionManager.logger.info("listen add partition group event:[{}]", addPartitionGroupEvent.toString());
                    PositionManager.this.addPartitionGroup(addPartitionGroupEvent.getTopic(), addPartitionGroupEvent.getPartitionGroup());
                }
            } catch (Exception e) {
                PositionManager.logger.error("AddPartitionGroupListener error.", e);
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager$RemoveConsumeListener.class */
    class RemoveConsumeListener implements EventListener<MetaEvent> {
        RemoveConsumeListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            try {
                if (metaEvent.getEventType() == EventType.REMOVE_CONSUMER) {
                    RemoveConsumerEvent removeConsumerEvent = (RemoveConsumerEvent) metaEvent;
                    PositionManager.logger.info("listen remove consume event:[{}]", removeConsumerEvent.toString());
                    PositionManager.this.removeConsumer(removeConsumerEvent.getTopic(), removeConsumerEvent.getConsumer().getApp());
                }
            } catch (Exception e) {
                PositionManager.logger.error("RemoveConsumeListener error.", e);
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager$RemovePartitionGroupListener.class */
    class RemovePartitionGroupListener implements EventListener<MetaEvent> {
        RemovePartitionGroupListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            try {
                if (metaEvent.getEventType() == EventType.REMOVE_PARTITION_GROUP) {
                    RemovePartitionGroupEvent removePartitionGroupEvent = (RemovePartitionGroupEvent) metaEvent;
                    PositionManager.logger.info("listen remove partition group event:[{}]", removePartitionGroupEvent.toString());
                    PositionManager.this.removePartitionGroup(removePartitionGroupEvent.getTopic(), removePartitionGroupEvent.getPartitionGroup());
                }
            } catch (Exception e) {
                PositionManager.logger.error("RemovePartitionGroupListener error.", e);
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/position/PositionManager$UpdatePartitionGroupListener.class */
    class UpdatePartitionGroupListener implements EventListener<MetaEvent> {
        UpdatePartitionGroupListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            try {
                if (metaEvent.getEventType() == EventType.UPDATE_PARTITION_GROUP) {
                    UpdatePartitionGroupEvent updatePartitionGroupEvent = (UpdatePartitionGroupEvent) metaEvent;
                    PositionManager.logger.info("listen update partition group event:[{}]", updatePartitionGroupEvent.toString());
                    TopicName topic = updatePartitionGroupEvent.getTopic();
                    PartitionGroup newPartitionGroup = updatePartitionGroupEvent.getNewPartitionGroup();
                    Set fetchAllPartitions = PositionManager.this.clusterManager.getTopicConfig(topic).fetchAllPartitions();
                    Iterator it = PositionManager.this.positionStore.iterator();
                    while (it.hasNext()) {
                        ConsumePartition consumePartition = (ConsumePartition) it.next();
                        if (StringUtils.equals(consumePartition.getTopic(), topic.getFullName()) && !fetchAllPartitions.contains(Short.valueOf(consumePartition.getPartition()))) {
                            it.remove();
                        }
                    }
                    PositionManager.this.addPartitionGroup(topic, newPartitionGroup);
                }
            } catch (Exception e) {
                PositionManager.logger.error("UpdatePartitionGroupListener error.", e);
            }
        }
    }

    /**
     * Creates a position manager.
     *
     * @param clusterManager metadata source; checked for null later, in {@code validate()}
     * @param storeService   store lookup service
     * @param consumeConfig  consume configuration; must not be null
     * @throws IllegalArgumentException if {@code consumeConfig} is null
     */
    public PositionManager(ClusterManager clusterManager, StoreService storeService, ConsumeConfig consumeConfig) {
        Preconditions.checkArgument(consumeConfig != null, "config can not be null");
        this.config = consumeConfig;
        this.storeService = storeService;
        this.clusterManager = clusterManager;
    }

    /**
     * Verifies preconditions and prepares collaborators: checks the cluster manager,
     * loads the {@link PositionStore} extension if none is set yet (pointing a
     * {@code LocalFileStore} at the configured position path), and creates the
     * single-thread flush executor with a queue capacity of 10.
     *
     * @throws Exception if the superclass validation fails
     * @throws IllegalArgumentException if the cluster manager is null
     */
    protected void validate() throws Exception {
        super.validate();
        Preconditions.checkArgument(clusterManager != null, "cluster manager can not be null");

        if (positionStore == null) {
            positionStore = (PositionStore) ExtensionManager.getOrLoadExtension(PositionStore.class);
            if (positionStore instanceof LocalFileStore) {
                LocalFileStore fileStore = (LocalFileStore) positionStore;
                fileStore.setBasePath(config.getConsumePositionPath());
            }
        }

        BlockingQueue<Runnable> pendingFlushes = new LinkedBlockingQueue<>(10);
        ThreadFactory flushThreadFactory = new NamedThreadFactory("joyqueue-consume-flush-index-threads", true);
        flushIndexThread = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, pendingFlushes, flushThreadFactory);
    }

    /**
     * Starts the position store, launches the "Check-Subscribe-Thread" loop that runs
     * {@code compensationPosition()}, and registers the five meta-event listeners on
     * the cluster manager.
     *
     * @throws Exception if the superclass or the position store fails to start
     */
    protected void doStart() throws Exception {
        super.doStart();
        positionStore.start();

        // NOTE(review): the constant name is what the decompiler emitted for the sleep
        // interval; it is presumably a numeric match, not a semantic link to store cleaning.
        thread = LoopThread.builder()
                .sleepTime(BrokerStoreConfig.DEFAULT_STORE_CLEAN_SCHEDULE_END, BrokerStoreConfig.DEFAULT_STORE_CLEAN_SCHEDULE_END)
                .name("Check-Subscribe-Thread")
                .onException(error -> logger.error(error.getMessage(), error))
                .doWork(this::compensationPosition)
                .build();
        thread.start();

        clusterManager.addListener(new AddConsumeListener());
        clusterManager.addListener(new RemoveConsumeListener());
        clusterManager.addListener(new AddPartitionGroupListener());
        clusterManager.addListener(new RemovePartitionGroupListener());
        clusterManager.addListener(new UpdatePartitionGroupListener());

        logger.info("PositionManager is started.");
    }

    /**
     * Walks every stored consume partition and removes those for which the cluster
     * manager no longer returns a consumer policy for the (topic, app) pair.
     */
    private void compensationPosition() {
        for (Iterator<ConsumePartition> cursor = this.positionStore.iterator(); cursor.hasNext();) {
            ConsumePartition entry = cursor.next();
            boolean subscribed =
                    this.clusterManager.tryGetConsumerPolicy(TopicName.parse(entry.getTopic()), entry.getApp()) != null;
            if (subscribed) {
                continue;
            }
            cursor.remove();
            logger.info("Remove consume position by ConsumePosition:[{}]", entry.toString());
        }
    }

    /**
     * Stops the manager: halts the background check loop, shuts down the asynchronous
     * flush executor, flushes positions one last time and stops the position store.
     */
    protected void doStop() {
        super.doStop();
        // The loop thread is started in doStart(); without stopping it here it would
        // keep iterating (and mutating) the position store after the store is stopped.
        // It is null when doStart() failed before the thread was created.
        if (this.thread != null) {
            this.thread.stop();
        }
        this.flushIndexThread.shutdownNow();
        this.positionStore.forceFlush();
        this.positionStore.stop();
        logger.info("PositionManager is stopped.");
    }

    /**
     * Returns the live (not copied) map of last-ack timestamps per consume partition.
     *
     * @return the internal last-ack-time map
     */
    public Map<ConsumePartition, AtomicLong> getLastAckTimeTrace() {
        return lastAckTimeTrace;
    }

    /**
     * Collects the stored positions for every (app, partition) pair of one partition group.
     *
     * @param topicName topic to inspect
     * @param str       application name; when null, all locally subscribed apps of the topic are used
     * @param i         partition group id
     * @return a new map from consume partition to its stored position (the value is whatever
     *         the store returns, which may be null); empty when no app or no matching group is found
     */
    public Map<ConsumePartition, Position> getConsumePosition(TopicName topicName, String str, int i) {
        List<String> apps;
        if (str == null) {
            apps = this.clusterManager.getLocalSubscribeAppByTopic(topicName);
        } else {
            apps = Arrays.asList(str);
        }

        Map<ConsumePartition, Position> positions = new HashMap<>();
        if (apps == null || apps.isEmpty()) {
            return positions;
        }

        for (PartitionGroup group : this.clusterManager.getPartitionGroup(topicName)) {
            if (group.getGroup() != i) {
                continue;
            }
            for (Short partition : group.getPartitions()) {
                for (String app : apps) {
                    ConsumePartition key = new ConsumePartition(topicName.getFullName(), app, partition.shortValue());
                    key.setPartitionGroup(i);
                    positions.put(key, this.positionStore.get(key));
                }
            }
            // Only the first group with a matching id is considered.
            break;
        }
        return positions;
    }

    /**
     * Collects the stored positions of one partition group for all locally subscribed apps.
     *
     * @param topicName topic to inspect
     * @param i         partition group id
     * @return a new map from consume partition to its stored position
     */
    public Map<ConsumePartition, Position> getConsumePosition(TopicName topicName, int i) {
        final String allLocalApps = null;
        return getConsumePosition(topicName, allLocalApps, i);
    }

    /**
     * Writes the given positions into the store and then triggers a (possibly deferred) flush.
     *
     * @param map positions to store; null is rejected
     * @return true on success; false when {@code map} is null or any exception occurred
     *         (the exception is logged)
     */
    public boolean setConsumePosition(Map<ConsumePartition, Position> map) {
        if (map == null) {
            return false;
        }
        try {
            for (Map.Entry<ConsumePartition, Position> entry : map.entrySet()) {
                this.positionStore.put(entry.getKey(), entry.getValue());
            }
            tryForceFlush();
            return true;
        } catch (Exception e) {
            logger.error("set consume position error.", e);
            return false;
        }
    }

    /**
     * Returns the current ack index stored for a consume partition.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @return the stored position's current ack index
     * @throws JoyQueueException with code {@code CONSUME_POSITION_NULL} if no position is stored
     */
    public long getLastMsgAckIndex(TopicName topicName, String str, short s) throws JoyQueueException {
        ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, s);
        Position position = this.positionStore.get(key);
        if (position != null) {
            return position.getAckCurIndex();
        }
        String detail = "topic=" + topicName + ",app=" + str + ",partition=" + ((int) s);
        throw new JoyQueueException(JoyQueueCode.CONSUME_POSITION_NULL, new Object[]{detail});
    }

    /**
     * Updates the current ack index and also resets the pull index
     * (delegates to the five-argument overload with {@code true}).
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @param j         new ack index
     * @return always true when no exception is thrown
     * @throws JoyQueueException if the index check or position initialisation fails
     */
    public boolean updateLastMsgAckIndex(TopicName topicName, String str, short s, long j) throws JoyQueueException {
        final boolean resetPullIndex = true;
        return updateLastMsgAckIndex(topicName, str, s, j, resetPullIndex);
    }

    /**
     * Validates and records a new current ack index for a consume partition and marks
     * the ack time. When no position exists yet, one is created from {@code j} instead.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @param j         new ack index
     * @param z         when true, the pull index of an existing position is reset to -1
     * @return always true when no exception is thrown
     * @throws JoyQueueException if the index check or position initialisation fails
     */
    public boolean updateLastMsgAckIndex(TopicName topicName, String str, short s, long j, boolean z) throws JoyQueueException {
        logger.debug("Update last ack index, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
        checkIndex(topicName, s, j);
        markLastAckTime(topicName, str, s);

        Position position = this.positionStore.get(new ConsumePartition(topicName.getFullName(), str, s));
        if (position != null) {
            position.setAckCurIndex(j);
            if (z) {
                position.setPullCurIndex(-1L);
            }
        } else {
            logger.error("Position is null, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
            addAndUpdatePosition(topicName, str, s, j);
        }
        return true;
    }

    /**
     * Verifies that {@code j} lies within the store's [left, right] index range for the
     * partition. The value -1 is exempt from the lower-bound check.
     *
     * @param topicName topic
     * @param s         partition
     * @param j         index to validate
     * @throws JoyQueueException with {@code CONSUME_POSITION_META_DATA_NULL} when the partition
     *         has no partition group id, or {@code SE_INDEX_UNDERFLOW} when the index is out of range
     */
    private void checkIndex(TopicName topicName, short s, long j) throws JoyQueueException {
        Integer groupId = this.clusterManager.getPartitionGroupId(topicName, s);
        if (groupId == null) {
            String detail = String.format("topic:[%s], partition:[%s], index:[%s]", topicName, Short.valueOf(s), Long.valueOf(j));
            throw new JoyQueueException(JoyQueueCode.CONSUME_POSITION_META_DATA_NULL, new Object[]{detail});
        }

        PartitionGroupStore store = this.storeService.getStore(topicName.getFullName(), groupId.intValue());
        boolean belowLeft = j < store.getLeftIndex(s) && j != -1;
        if (belowLeft) {
            throw new JoyQueueException(JoyQueueCode.SE_INDEX_UNDERFLOW, new Object[]{"index less than leftIndex error."});
        }
        boolean aboveRight = j > store.getRightIndex(s);
        if (aboveRight) {
            // NOTE(review): an index above the right bound is reported with the UNDERFLOW
            // code; this looks unintended but is preserved because callers may rely on it.
            throw new JoyQueueException(JoyQueueCode.SE_INDEX_UNDERFLOW, new Object[]{"index more than rightIndex error."});
        }
    }

    /**
     * Records the current time as the last ack time of a consume partition.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     */
    private void markLastAckTime(TopicName topicName, String str, short s) {
        ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, s);
        // computeIfAbsent makes the create-if-missing step atomic; the previous
        // get-then-put could let two concurrent callers overwrite each other's entry.
        this.lastAckTimeTrace.computeIfAbsent(key, ignored -> new AtomicLong()).set(SystemClock.now());
    }

    /**
     * Creates a position for a consume partition if none is stored yet, using
     * {@code max(j, 0)} for all four constructor arguments of {@link Position}, and then
     * force-flushes the store. Does nothing when the topic is null or the app is null/empty.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @param j         requested index; negative values are clamped to 0
     * @throws JoyQueueException with {@code FW_PARTITION_BROKER_NOT_LEADER} when the cluster
     *         manager returns no partition group for the partition
     * @throws IllegalStateException if the manager is not started
     */
    private void addAndUpdatePosition(TopicName topicName, String str, short s, long j) throws JoyQueueException {
        // Guard before logging: the log statement dereferences topicName, so running it
        // first (as before) made the null check unreachable for a null topic.
        if (topicName == null || str == null || str.isEmpty()) {
            return;
        }
        logger.info("Try to init a position by topic:{}, app:{}, partition:{}, curIndex:{}", new Object[]{topicName.getFullName(), str, Short.valueOf(s), Long.valueOf(j)});
        checkState();
        PartitionGroup partitionGroup = this.clusterManager.getPartitionGroup(topicName, s);
        if (partitionGroup == null) {
            logger.error("Fail to add and update partition consume position by topic:[{}], app:[{}], partition:[{}], index:[{}]", new Object[]{topicName.getFullName(), str, Short.valueOf(s), Long.valueOf(j)});
            throw new JoyQueueException(JoyQueueCode.FW_PARTITION_BROKER_NOT_LEADER, new Object[]{IndexAndMetadata.NO_METADATA});
        }
        ConsumePartition consumePartition = new ConsumePartition(topicName.getFullName(), str, s);
        consumePartition.setPartitionGroup(partitionGroup.getGroup());
        long max = Math.max(j, 0L);
        // putIfAbsent: an existing position is left untouched.
        this.positionStore.putIfAbsent(consumePartition, new Position(max, max, max, max));
        logger.info("Success to add and update partition consume position by topic:{}, app:{}, partition:{}, curIndex:{}", new Object[]{topicName.getFullName(), str, Short.valueOf(s), Long.valueOf(max)});
        this.positionStore.forceFlush();
    }

    /**
     * Sets the ack start index of a consume partition. When no position exists yet,
     * one is created from {@code j} instead.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @param j         new ack start index
     * @return always true when no exception is thrown
     * @throws JoyQueueException if position initialisation fails
     */
    public boolean updateStartMsgAckIndex(TopicName topicName, String str, short s, long j) throws JoyQueueException {
        logger.debug("Update stater ack index, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
        Position position = this.positionStore.get(new ConsumePartition(topicName.getFullName(), str, s));
        if (position == null) {
            logger.error("Position is null, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
            addAndUpdatePosition(topicName, str, s, j);
        } else {
            position.setAckStartIndex(j);
        }
        return true;
    }

    /**
     * Returns the current pull index stored for a consume partition.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @return the stored position's current pull index
     * @throws JoyQueueException with code {@code CONSUME_POSITION_NULL} if no position is stored
     */
    public long getLastMsgPullIndex(TopicName topicName, String str, short s) throws JoyQueueException {
        ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, s);
        Position position = this.positionStore.get(key);
        if (position != null) {
            return position.getPullCurIndex();
        }
        String detail = "topic=" + topicName + ",app=" + str + ",partition=" + ((int) s);
        throw new JoyQueueException(JoyQueueCode.CONSUME_POSITION_NULL, new Object[]{detail});
    }

    /**
     * Sets the current pull index of a consume partition. When no position exists yet,
     * one is created from {@code j} instead.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @param j         new pull index
     * @return always true when no exception is thrown
     * @throws JoyQueueException if position initialisation fails
     */
    public boolean updateLastMsgPullIndex(TopicName topicName, String str, short s, long j) throws JoyQueueException {
        logger.debug("Update last pull index, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
        Position position = this.positionStore.get(new ConsumePartition(topicName.getFullName(), str, s));
        if (position == null) {
            logger.error("Position is null, topic:{}, app:{}, partition:{}, index:{}", topicName, str, s, j);
            addAndUpdatePosition(topicName, str, s, j);
        } else {
            position.setPullCurIndex(j);
        }
        return true;
    }

    /**
     * Reads the right index of a partition from the store of its partition group.
     *
     * @param topicName topic
     * @param s         partition
     * @return the store's right index for the partition
     * @throws NullPointerException if the cluster manager returns no partition group id
     */
    private long getMaxMsgIndex(TopicName topicName, short s) {
        String fullName = topicName.getFullName();
        int groupId = this.clusterManager.getPartitionGroupId(topicName, s).intValue();
        PartitionGroupStore store = this.storeService.getStore(fullName, groupId);
        return store.getRightIndex(s);
    }

    /**
     * Ensures the manager is started.
     *
     * @throws IllegalStateException if {@code isStarted()} returns false
     */
    protected void checkState() {
        if (isStarted()) {
            return;
        }
        throw new IllegalStateException("offset manager was stopped");
    }

    /**
     * Initialises a position for each master partition of the topic for the given app,
     * unless one already exists, and force-flushes the store. New positions start at the
     * partition's right index clamped to be non-negative.
     * Does nothing when the topic is null or the app is null/empty.
     *
     * @param topicName topic
     * @param str       application name
     * @throws IllegalStateException if the manager is not started
     */
    public void addConsumer(TopicName topicName, String str) {
        boolean nothingToDo = topicName == null || str == null || str.isEmpty();
        if (nothingToDo) {
            return;
        }
        checkState();

        List<Short> masterPartitions = this.clusterManager.getMasterPartitionList(topicName);
        logger.debug("add consumer partitionList:[{}]", masterPartitions.toString());

        for (Short partition : masterPartitions) {
            ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, partition.shortValue());
            key.setPartitionGroup(this.clusterManager.getPartitionGroupId(topicName, partition.shortValue()).intValue());
            long startIndex = Math.max(getMaxMsgIndex(topicName, partition.shortValue()), 0L);
            this.positionStore.putIfAbsent(key, new Position(startIndex, startIndex, startIndex, startIndex));
            logger.debug("Add ConsumePartition by topic:{}, app:{}, partition:{}, curIndex:{}", topicName.getFullName(), str, partition, startIndex);
        }
        this.positionStore.forceFlush();
    }

    /**
     * Removes the stored position of every partition of the topic for the given app and
     * force-flushes the store. Does nothing when the topic is null or the app is null/empty.
     *
     * @param topicName topic
     * @param str       application name
     * @throws IllegalStateException if the manager is not started
     */
    public void removeConsumer(TopicName topicName, String str) {
        boolean nothingToDo = topicName == null || str == null || str.isEmpty();
        if (nothingToDo) {
            return;
        }
        checkState();

        List<Short> partitions = this.clusterManager.getPartitionList(topicName);
        logger.debug("remove consumer partitionList:[{}]", partitions.toString());

        for (Short partition : partitions) {
            ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, partition.shortValue());
            // The removal happens while building the log arguments; its result is logged as curIndex.
            logger.info("Remove ConsumePartition by topic:{}, app:{}, partition:{}, curIndex:{}",
                    key.getTopic(), key.getApp(), key.getPartition(), String.valueOf(this.positionStore.remove(key)));
        }
        this.positionStore.forceFlush();
    }

    /**
     * Looks up the stored position of a consume partition.
     *
     * @param topicName topic
     * @param str       application name
     * @param s         partition
     * @return whatever the position store returns for that key
     */
    public Position getPosition(TopicName topicName, String str, short s) {
        ConsumePartition key = new ConsumePartition(topicName.getFullName(), str, s);
        return this.positionStore.get(key);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Ensures every (app, partition) pair of the partition group has a stored position.
     * Missing positions are created at the partition's right index clamped to be
     * non-negative; existing positions whose current ack index lies beyond that value are
     * pulled back to it. The store is force-flushed only if something changed.
     *
     * <p>The clamped index is now used for the stored position and the rollback comparison,
     * not only for the log output, matching {@code addConsumer} and
     * {@code addAndUpdatePosition}.
     *
     * @param topicName      topic the partition group belongs to
     * @param partitionGroup partition group whose partitions are initialised
     */
    public void addPartitionGroup(TopicName topicName, PartitionGroup partitionGroup) {
        List<String> apps = this.clusterManager.getAppByTopic(topicName);
        Set<Short> partitions = partitionGroup.getPartitions();
        logger.debug("add partitionGroup appList:[{}], partitions:[{}]", apps.toString(), partitions.toString());

        boolean changed = false;
        for (Short partition : partitions) {
            // Clamp so that a negative right index is never stored as a position.
            long startIndex = Math.max(getMaxMsgIndex(topicName, partition.shortValue()), 0L);
            for (String app : apps) {
                ConsumePartition key = new ConsumePartition(topicName.getFullName(), app, partition.shortValue());
                key.setPartitionGroup(partitionGroup.getGroup());
                Position existing = this.positionStore.putIfAbsent(key, new Position(startIndex, startIndex, startIndex, startIndex));
                if (existing == null) {
                    changed = true;
                    logger.info("Add consume partition topic:{}, app:{}, partition:{}, curIndex:{}", new Object[]{topicName.getFullName(), app, partition, Long.valueOf(startIndex)});
                    continue;
                }
                long ackCurIndex = existing.getAckCurIndex();
                if (ackCurIndex > startIndex) {
                    // Stored ack index is ahead of the store's right index: roll it back.
                    existing.setAckCurIndex(startIndex);
                    changed = true;
                    logger.warn("Update consume position topic:{}, app:{}, partition:{}, curIndex:{}, ackIndex:{}", new Object[]{topicName.getFullName(), app, partition, Long.valueOf(startIndex), Long.valueOf(ackCurIndex)});
                }
            }
        }
        if (changed) {
            this.positionStore.forceFlush();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes every stored consume partition that belongs to the given topic and
     * partition group, then force-flushes the store.
     *
     * @param topicName      topic the partition group belongs to
     * @param partitionGroup partition group being removed
     */
    public synchronized void removePartitionGroup(TopicName topicName, PartitionGroup partitionGroup) {
        logger.info("remove partitionGroup topic:[{}], partitionGroup:[{}]", topicName.getFullName(), partitionGroup);
        for (Iterator<ConsumePartition> cursor = this.positionStore.iterator(); cursor.hasNext();) {
            ConsumePartition entry = cursor.next();
            if (entry == null) {
                continue;
            }
            boolean matches = entry.getPartitionGroup() == partitionGroup.getGroup()
                    && StringUtils.equals(entry.getTopic(), topicName.getFullName());
            if (!matches) {
                continue;
            }
            cursor.remove();
            logger.info("Remove ConsumePartition by topic:{}, app:{}, partition:{}", entry.getTopic(), entry.getApp(), entry.getPartition());
        }
        this.positionStore.forceFlush();
    }

    /**
     * Flushes the position store, rate-limited by the configured index flush interval.
     * With a non-positive interval the flush runs synchronously on the caller's thread.
     * Otherwise a flush is handed to the flush executor only when at least one interval
     * has elapsed since the last scheduled flush and this caller wins the compare-and-set
     * on the timestamp.
     */
    protected void tryForceFlush() {
        if (this.config.getIndexFlushInterval() > 0) {
            long current = SystemClock.now();
            long previous = this.lastFlushIndexTimestamp.get();
            boolean due = current - previous >= this.config.getIndexFlushInterval();
            // NOTE(review): execute() can reject when the executor's bounded queue is full
            // or it has been shut down; that exception propagates to the caller.
            if (due && this.lastFlushIndexTimestamp.compareAndSet(previous, current)) {
                this.flushIndexThread.execute(() -> this.positionStore.forceFlush());
            }
        } else {
            this.positionStore.forceFlush();
        }
    }
}
