package org.joyqueue.broker.consumer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.archive.ArchiveManager;
import org.joyqueue.broker.archive.ConsumeArchiveService;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.model.ConsumePartition;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.broker.consumer.position.model.Position;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.domain.Consumer;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.domain.TopicType;
import org.joyqueue.event.EventType;
import org.joyqueue.event.MetaEvent;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.MessageLocation;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Joint;
import org.joyqueue.nsr.event.RemoveConsumerEvent;
import org.joyqueue.nsr.event.UpdateConsumerEvent;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/consumer/ConsumeManager.class */
public class ConsumeManager extends Service implements Consume, BrokerContextAware {
    private ConcurrentConsumer concurrentConsumption;
    private PartitionConsumption partitionConsumption;
    private MessageRetry messageRetry;
    private PartitionManager partitionManager;
    private ClusterManager clusterManager;
    private StoreService storeService;
    private PositionManager positionManager;
    private FilterMessageSupport filterMessageSupport;
    private BrokerMonitor brokerMonitor;
    private ArchiveManager archiveManager;
    private BrokerContext brokerContext;
    private ConsumeConfig consumeConfig;
    private SessionManager sessionManager;
    private Timer resetBroadcastIndexTimer;
    private final Logger logger = LoggerFactory.getLogger(ConsumeManager.class);
    private ConcurrentMap<Joint, AtomicLong> consumeCounter = new ConcurrentHashMap();
    private PartitionLockInstance lockInstance = new PartitionLockInstance();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/broker/consumer/ConsumeManager$ConsumeStrategy.class */
    public enum ConsumeStrategy {
        SEQUENCE,
        CONCURRENT,
        DEFAULT
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/ConsumeManager$SubscriptionListener.class */
    class SubscriptionListener implements EventListener<MetaEvent> {
        SubscriptionListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            if (metaEvent.getEventType() == EventType.REMOVE_CONSUMER) {
                RemoveConsumerEvent removeConsumerEvent = (RemoveConsumerEvent) metaEvent;
                ConsumeManager.this.logger.info("Listen clusterManger. RemoveConsumer, Event:[{}]", removeConsumerEvent);
                ConsumeManager.this.consumeCounter.remove(new Joint(removeConsumerEvent.getTopic().getCode(), removeConsumerEvent.getConsumer().getApp()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/consumer/ConsumeManager$UpdateConsumeListener.class */
    public class UpdateConsumeListener implements EventListener<MetaEvent> {
        UpdateConsumeListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            if (metaEvent.getEventType() == EventType.UPDATE_CONSUMER) {
                UpdateConsumerEvent updateConsumerEvent = (UpdateConsumerEvent) metaEvent;
                ConsumeManager.this.logger.info("listen update consume event.");
                try {
                    ConsumeManager.this.partitionManager.resetRetryProbability(ConsumeManager.this.clusterManager.getConsumerPolicy(updateConsumerEvent.getTopic(), updateConsumerEvent.getNewConsumer().getApp()).getReadRetryProbability());
                } catch (JoyQueueException e) {
                    ConsumeManager.this.logger.error("listen update consume event error.", e);
                }
            }
        }
    }

    public ConsumeManager() {
    }

    public ConsumeManager(ClusterManager clusterManager, StoreService storeService, MessageRetry messageRetry, BrokerMonitor brokerMonitor, ArchiveManager archiveManager, PositionManager positionManager) {
        this.clusterManager = clusterManager;
        this.storeService = storeService;
        this.messageRetry = messageRetry;
        this.brokerMonitor = brokerMonitor;
        this.positionManager = positionManager;
        this.archiveManager = archiveManager;
    }

    protected void validate() throws Exception {
        super.validate();
        if (this.consumeConfig == null) {
            this.consumeConfig = new ConsumeConfig(this.brokerContext != null ? this.brokerContext.getPropertySupplier() : null);
        }
        if (this.clusterManager == null && this.brokerContext != null) {
            this.clusterManager = this.brokerContext.getClusterManager();
        }
        if (this.brokerMonitor == null && this.brokerContext != null) {
            this.brokerMonitor = this.brokerContext.getBrokerMonitor();
        }
        if (this.messageRetry == null && this.brokerContext != null) {
            this.messageRetry = this.brokerContext.getRetryManager();
        }
        if (this.archiveManager == null && this.brokerContext != null) {
            this.archiveManager = this.brokerContext.getArchiveManager();
        }
        if (this.storeService == null && this.brokerContext != null) {
            this.storeService = this.brokerContext.getStoreService();
        }
        if (this.sessionManager == null && this.brokerContext != null) {
            this.sessionManager = this.brokerContext.getSessionManager();
        }
        Preconditions.checkArgument(this.clusterManager != null, "cluster manager can not be null.");
        Preconditions.checkArgument(this.storeService != null, "cluster manager can not be null.");
        if (this.brokerMonitor == null) {
            this.logger.warn("broker monitor is null.");
        }
        if (this.messageRetry == null) {
            this.logger.warn("message retry is null.");
        }
        if (this.archiveManager == null) {
            this.logger.warn("archive manager is null.");
        }
        this.filterMessageSupport = new FilterMessageSupport(this.clusterManager);
        this.partitionManager = this.consumeConfig.useLegacyPartitionManager() ? new LegacyPartitionManager(this.clusterManager, this.sessionManager) : new CasPartitionManager(this.clusterManager, this.sessionManager);
        this.positionManager = new PositionManager(this.clusterManager, this.storeService, this.consumeConfig);
        this.brokerContext.positionManager(this.positionManager);
        this.partitionConsumption = new PartitionConsumption(this.clusterManager, this.storeService, this.partitionManager, this.positionManager, this.messageRetry, this.filterMessageSupport, this.archiveManager, this.consumeConfig);
        this.concurrentConsumption = this.consumeConfig.useLegacyConcurrentConsumer() ? new ConcurrentConsumption(this.clusterManager, this.storeService, this.partitionManager, this.messageRetry, this.positionManager, this.filterMessageSupport, this.archiveManager, this.sessionManager) : new SlideWindowConcurrentConsumer(this.clusterManager, this.storeService, this.partitionManager, this.messageRetry, this.positionManager, this.filterMessageSupport, this.archiveManager, this.consumeConfig);
        this.resetBroadcastIndexTimer = new Timer("joyqueuue-consume-reset-broadcast-index-timer");
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.partitionConsumption.start();
        this.concurrentConsumption.start();
        this.positionManager.start();
        this.clusterManager.addListener(new SubscriptionListener());
        registerEventListener(this.clusterManager);
        this.resetBroadcastIndexTimer.scheduleAtFixedRate(new TimerTask() { // from class: org.joyqueue.broker.consumer.ConsumeManager.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                ConsumeManager.this.doResetBroadcastIndex();
            }
        }, this.consumeConfig.getBroadcastIndexResetInterval(), this.consumeConfig.getBroadcastIndexResetInterval());
        this.logger.info("ConsumeManager is started.");
    }

    protected void doStop() {
        super.doStop();
        Close.close(this.partitionConsumption);
        Close.close(this.messageRetry);
        Close.close(this.partitionConsumption);
        Close.close(this.concurrentConsumption);
        this.resetBroadcastIndexTimer.cancel();
        this.partitionManager.close();
        this.logger.info("ConsumeManager is stopped.");
    }

    private void registerEventListener(ClusterManager clusterManager) {
        clusterManager.addListener(new UpdateConsumeListener());
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public PullResult getMessage(Consumer consumer, int i, int i2) throws JoyQueueException {
        PullResult message;
        Preconditions.checkArgument(consumer != null, "消费者信息不能为空");
        long now = SystemClock.now();
        Consumer.ConsumerPolicy consumerPolicy = this.clusterManager.getConsumerPolicy(TopicName.parse(consumer.getTopic()), consumer.getApp());
        if (i <= 0) {
            i = consumerPolicy.getBatchSize().shortValue();
        }
        if (i2 <= 0) {
            i2 = consumerPolicy.getAckTimeout().intValue();
        }
        if (this.partitionManager.needPause(consumer)) {
            PullResult pullResult = new PullResult(consumer, (short) -1, Collections.emptyList());
            pullResult.setCode(JoyQueueCode.FW_FETCH_TOPIC_MESSAGE_PAUSED);
            return pullResult;
        }
        long andIncrement = getAndIncrement(consumer);
        try {
            switch (choiceConsumeStrategy(consumerPolicy)) {
                case DEFAULT:
                    message = this.partitionConsumption.getMessage(consumer, i, i2, andIncrement);
                    break;
                case SEQUENCE:
                    message = this.partitionConsumption.getMessage4Sequence(consumer, getSequencePartition(consumer), i, i2);
                    break;
                case CONCURRENT:
                    message = this.concurrentConsumption.getMessage(consumer, i, i2, andIncrement, consumerPolicy.getConcurrent().intValue());
                    break;
                default:
                    throw new JoyQueueException(JoyQueueCode.CN_PARAM_ERROR, new Object[]{"invalid consume strategy"});
            }
            if (message.getBuffers().size() > 0) {
                PartitionGroup partitionGroup = this.clusterManager.getPartitionGroup(TopicName.parse(consumer.getTopic()), message.getPartition());
                monitor(message, now, consumer, partitionGroup == null ? -1 : partitionGroup.getGroup());
            }
            return message;
        } catch (JoyQueueException e) {
            throw e;
        }
    }

    private ConsumeStrategy choiceConsumeStrategy(Consumer.ConsumerPolicy consumerPolicy) {
        return consumerPolicy.getSeq().booleanValue() ? ConsumeStrategy.SEQUENCE : consumerPolicy.isConcurrent().booleanValue() ? ConsumeStrategy.CONCURRENT : ConsumeStrategy.DEFAULT;
    }

    @Override // org.joyqueue.broker.BrokerContextAware
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    private short getSequencePartition(org.joyqueue.network.session.Consumer consumer) {
        return (short) 0;
    }

    private long getAndIncrement(org.joyqueue.network.session.Consumer consumer) {
        Joint joint = new Joint(consumer.getTopic(), consumer.getApp());
        AtomicLong atomicLong = this.consumeCounter.get(joint);
        if (atomicLong == null) {
            atomicLong = new AtomicLong(0L);
            this.consumeCounter.put(joint, atomicLong);
        }
        return atomicLong.getAndIncrement();
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public PullResult getMessage(org.joyqueue.network.session.Consumer consumer, short s, long j, int i) throws JoyQueueException {
        Preconditions.checkArgument(consumer != null, "消费者信息不能为空");
        Preconditions.checkArgument(s >= 0, "分区不能小于0");
        Preconditions.checkArgument(j >= 0, "消费序号不能小于0");
        Preconditions.checkArgument(i > 0, "消费条数不能小于或等于0");
        Integer valueOf = Integer.valueOf(this.partitionManager.getGroupByPartition(TopicName.parse(consumer.getTopic()), s));
        Preconditions.checkArgument(valueOf != null && valueOf.intValue() >= 0, "找不到主题[" + consumer.getTopic() + "],分区[" + ((int) s) + "]的分区组");
        try {
            long now = SystemClock.now();
            PullResult msgByPartitionAndIndex = this.partitionConsumption.getMsgByPartitionAndIndex(consumer, valueOf.intValue(), s, j, i);
            monitor(msgByPartitionAndIndex, now, consumer, valueOf.intValue());
            return msgByPartitionAndIndex;
        } catch (IOException e) {
            this.logger.warn(e.getMessage(), e);
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public PullResult getMessage(String str, short s, long j, int i) throws JoyQueueException {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "主题不能为空");
        Preconditions.checkArgument(s >= 0, "分区不能小于0");
        Preconditions.checkArgument(j >= 0, "消费序号不能小于0");
        Preconditions.checkArgument(i > 0, "消费条数不能小于或等于0");
        Integer valueOf = Integer.valueOf(this.partitionManager.getGroupByPartition(TopicName.parse(str), s));
        Preconditions.checkArgument(valueOf != null && valueOf.intValue() >= 0, "找不到主题[" + str + "],分区[" + ((int) s) + "]的分区组");
        try {
            return this.partitionConsumption.getMsgByPartitionAndIndex(str, valueOf.intValue(), s, j, i);
        } catch (Exception e) {
            this.logger.debug(e.getMessage(), e);
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    private void monitor(PullResult pullResult, long j, org.joyqueue.network.session.Consumer consumer, int i) {
        if (pullResult == null || !CollectionUtils.isNotEmpty(pullResult.getBuffers())) {
            return;
        }
        long now = SystemClock.now();
        int i2 = 0;
        int i3 = 0;
        for (ByteBuffer byteBuffer : pullResult.getBuffers()) {
            i3 += byteBuffer.limit();
            BrokerMessage readBrokerMessageHeader = Serializer.readBrokerMessageHeader(byteBuffer);
            i2 = readBrokerMessageHeader.isBatch() ? i2 + readBrokerMessageHeader.getFlag() : i2 + 1;
        }
        this.brokerMonitor.onGetMessage(consumer.getTopic(), consumer.getApp(), i, pullResult.getPartition(), i2, i3, now - j);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public boolean acknowledge(MessageLocation[] messageLocationArr, org.joyqueue.network.session.Consumer consumer, Connection connection, boolean z) throws JoyQueueException {
        boolean z2 = false;
        if (messageLocationArr.length <= 0) {
            return false;
        }
        ConsumePartition consumePartition = new ConsumePartition(consumer.getTopic(), consumer.getApp(), messageLocationArr[0].getPartition());
        ConsumePartition lockInstance = this.lockInstance.getLockInstance(consumePartition);
        Consumer.ConsumerPolicy consumerPolicy = this.clusterManager.getConsumerPolicy(TopicName.parse(consumer.getTopic()), consumer.getApp());
        switch (choiceConsumeStrategy(consumerPolicy)) {
            case DEFAULT:
            case SEQUENCE:
                synchronized (lockInstance) {
                    z2 = this.partitionConsumption.acknowledge(messageLocationArr, consumer, z);
                }
                break;
            case CONCURRENT:
                synchronized (lockInstance) {
                    z2 = this.concurrentConsumption.acknowledge(messageLocationArr, consumer, z);
                }
                break;
        }
        if (z2) {
            this.partitionManager.releasePartition(consumePartition);
            if (consumePartition.getPartition() != Short.MAX_VALUE) {
                Integer partitionGroupId = this.clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), consumePartition.getPartition());
                if (partitionGroupId == null) {
                    this.logger.error("onAckMessage error, partitionGroupId is null, topic: {}, app: {}, partition: {}", new Object[]{consumer.getTopic(), consumer.getApp(), Short.valueOf(consumePartition.getPartition())});
                } else {
                    this.brokerMonitor.onAckMessage(consumer.getTopic(), consumer.getApp(), partitionGroupId.intValue(), consumePartition.getPartition());
                }
                archiveIfNecessary(consumerPolicy, connection, messageLocationArr);
            }
        }
        return z2;
    }

    public void archiveIfNecessary(Consumer.ConsumerPolicy consumerPolicy, Connection connection, MessageLocation[] messageLocationArr) {
        ConsumeArchiveService consumeArchiveService;
        try {
            if (consumerPolicy.getArchive() == null || !consumerPolicy.getArchive().booleanValue() || this.archiveManager == null || (consumeArchiveService = this.archiveManager.getConsumeArchiveService()) == null) {
                return;
            }
            consumeArchiveService.appendConsumeLog(connection, messageLocationArr);
        } catch (Throwable th) {
            this.logger.warn(String.format("archive message consume error,locations: %s ", messageLocationArr), th);
        }
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public boolean hasFreePartition(org.joyqueue.network.session.Consumer consumer) {
        return this.partitionManager.hasFreePartition(consumer);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public long getPullIndex(org.joyqueue.network.session.Consumer consumer, short s) {
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        Position position = this.positionManager.getPosition(TopicName.parse(consumer.getTopic()), consumer.getApp(), s);
        if (position == null) {
            return -1L;
        }
        return position.getPullCurIndex();
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public void setPullIndex(org.joyqueue.network.session.Consumer consumer, short s, long j) throws JoyQueueException {
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        this.positionManager.updateLastMsgPullIndex(TopicName.parse(topic), app, s, j);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public long getAckIndex(org.joyqueue.network.session.Consumer consumer, short s) {
        Position position;
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        if (this.clusterManager.getPartitionGroupId(TopicName.parse(topic), s) == null || (position = this.positionManager.getPosition(TopicName.parse(topic), app, s)) == null) {
            return -1L;
        }
        return position.getAckCurIndex();
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public long getStartIndex(org.joyqueue.network.session.Consumer consumer, short s) {
        Position position;
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        if (this.clusterManager.getPartitionGroupId(TopicName.parse(topic), s) == null || (position = this.positionManager.getPosition(TopicName.parse(topic), app, s)) == null) {
            return -1L;
        }
        return position.getAckStartIndex();
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public void setAckIndex(org.joyqueue.network.session.Consumer consumer, short s, long j) throws JoyQueueException {
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        Integer partitionGroupId = this.clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), s);
        Preconditions.checkArgument(partitionGroupId != null, "partitionGroupId can not be null.");
        this.positionManager.updateLastMsgAckIndex(TopicName.parse(topic), app, s, j);
        this.brokerMonitor.onAckMessage(consumer.getTopic(), consumer.getApp(), partitionGroupId.intValue(), s);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public void setStartAckIndex(org.joyqueue.network.session.Consumer consumer, short s, long j) throws JoyQueueException {
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        Preconditions.checkArgument(topic != null, "topic can not be null.");
        Preconditions.checkArgument(app != null, "app can not be null.");
        this.positionManager.updateStartMsgAckIndex(TopicName.parse(topic), app, s, j);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public boolean resetPullIndex(String str, String str2) throws JoyQueueException {
        List<Short> localPartitions = this.clusterManager.getLocalPartitions(TopicName.parse(str));
        int i = 0;
        Iterator<Short> it = localPartitions.iterator();
        while (it.hasNext()) {
            short shortValue = it.next().shortValue();
            if (this.positionManager.updateLastMsgPullIndex(TopicName.parse(str), str2, shortValue, this.positionManager.getLastMsgPullIndex(TopicName.parse(str), str2, shortValue))) {
                i++;
            }
        }
        return localPartitions.size() == i;
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public boolean setConsumePosition(Map<ConsumePartition, Position> map) {
        if (map == null || map.isEmpty()) {
            return false;
        }
        return this.positionManager.setConsumePosition(map);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public Map<ConsumePartition, Position> getConsumePositionByGroup(TopicName topicName, String str, int i) {
        if (CollectionUtils.isEmpty(this.clusterManager.getLocalPartitionGroups(topicName))) {
            return null;
        }
        return this.positionManager.getConsumePosition(topicName, str, i);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public Map<ConsumePartition, Position> getConsumePositionByGroup(TopicName topicName, int i) {
        if (CollectionUtils.isEmpty(this.clusterManager.getLocalPartitionGroups(topicName))) {
            return null;
        }
        return this.positionManager.getConsumePosition(topicName, i);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public long getMinIndex(org.joyqueue.network.session.Consumer consumer, short s) {
        String topic = consumer.getTopic();
        return this.storeService.getStore(topic, this.clusterManager.getPartitionGroupId(TopicName.parse(topic), s).intValue()).getLeftIndex(s);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public long getMaxIndex(org.joyqueue.network.session.Consumer consumer, short s) {
        String topic = consumer.getTopic();
        return this.storeService.getStore(topic, this.clusterManager.getPartitionGroupId(TopicName.parse(topic), s).intValue()).getRightIndex(s);
    }

    @Override // org.joyqueue.broker.consumer.Consume
    public void releasePartition(String str, String str2, short s) {
        this.partitionManager.releasePartition(new ConsumePartition(str, str2, s));
    }

    protected void doResetBroadcastIndex() {
        if (this.consumeConfig.getBroadcastIndexResetEnable()) {
            List<TopicConfig> topics = this.clusterManager.getTopics();
            if (CollectionUtils.isEmpty(topics)) {
                return;
            }
            for (TopicConfig topicConfig : topics) {
                ArrayList newArrayList = Lists.newArrayList(this.clusterManager.getNameService().getConsumerByTopic(topicConfig.getName()));
                Iterator<org.joyqueue.domain.Consumer> it = newArrayList.iterator();
                while (it.hasNext()) {
                    if (!it.next().getTopicType().equals(TopicType.BROADCAST)) {
                        it.remove();
                    }
                }
                if (!CollectionUtils.isEmpty(newArrayList)) {
                    Iterator<PartitionGroup> it2 = this.clusterManager.getLocalPartitionGroups(topicConfig).iterator();
                    while (it2.hasNext()) {
                        doResetBroadcastIndex(topicConfig, it2.next(), newArrayList);
                    }
                }
            }
        }
    }

    protected void doResetBroadcastIndex(TopicConfig topicConfig, PartitionGroup partitionGroup, List<org.joyqueue.domain.Consumer> list) {
        PartitionGroupStore store = this.storeService.getStore(topicConfig.getName().getFullName(), partitionGroup.getGroup());
        if (store == null) {
            this.logger.warn("reset broadcast index failed, store not exist, topic: {}, group: {}", topicConfig.getName(), Integer.valueOf(partitionGroup.getGroup()));
            return;
        }
        long now = SystemClock.now() - this.consumeConfig.getBroadcastIndexResetTime();
        for (Short sh : partitionGroup.getPartitions()) {
            long index = store.getIndex(sh.shortValue(), now);
            if (index >= 0) {
                for (org.joyqueue.domain.Consumer consumer : list) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("reset broadcast index, topic: {}, group: {}, partition: {}, index: {}, app: {}", new Object[]{topicConfig.getName(), Integer.valueOf(partitionGroup.getGroup()), sh, Long.valueOf(index), consumer.getApp()});
                    }
                    try {
                        setAckIndex(new org.joyqueue.network.session.Consumer(consumer.getTopic().getFullName(), consumer.getApp()), sh.shortValue(), index);
                    } catch (JoyQueueException e) {
                        this.logger.debug("reset broadcast index exception, topic: {}, group: {}, partition: {}, index: {}, app: {}", new Object[]{topicConfig.getName(), Integer.valueOf(partitionGroup.getGroup()), sh, Long.valueOf(index), consumer.getApp(), e});
                    }
                }
            }
        }
    }
}
