package org.joyqueue.broker.consumer;

import com.google.common.collect.Lists;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.archive.ArchiveManager;
import org.joyqueue.broker.archive.ConsumeArchiveService;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.filter.FilterCallback;
import org.joyqueue.broker.consumer.model.ConsumePartition;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.broker.consumer.position.model.Position;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.MessageLocation;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.store.PositionOverflowException;
import org.joyqueue.store.PositionUnderflowException;
import org.joyqueue.store.ReadResult;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/consumer/ConcurrentConsumption.class */
class ConcurrentConsumption extends Service {
    // Collaborators; all of them are assigned in the constructor.
    private PartitionManager partitionManager;
    private MessageRetry messageRetry;
    private PositionManager positionManager;
    private StoreService storeService;
    private ClusterManager clusterManager;
    private FilterMessageSupport filterMessageSupport;
    // Background loop built in doStart(); its work item is moveSegment2ExpireQueue().
    private LoopThread moveExpireThread;
    // Background loop built in doStart(); its work item is cleanExpireQueue().
    private LoopThread cleanExpireThread;
    // May be null: archiveIfnecessary() checks for null before using it.
    private ArchiveManager archiveManager;
    private SessionManager sessionManager;
    private final Logger logger = LoggerFactory.getLogger(ConcurrentConsumption.class);
    // Set to TRUE by getPullIndex() once a partition's pull index has been seeded from its ack index.
    private ConcurrentMap<ConsumePartition, Boolean> resetPullPositionFlag = new ConcurrentHashMap();
    // Segments handed out and not yet acknowledged. The value is written by trackConsumeDetail()
    // as (its timeout argument + SystemClock.now()); entries are removed in acknowledge().
    private ConcurrentMap<PartitionSegment, Long> segmentConsumeMap = new ConcurrentHashMap();
    // Per-partition queues of segments waiting to be handed out again (see pollPartitionSegment()).
    private ConcurrentMap<ConsumePartition, ConcurrentLinkedQueue<PartitionSegment>> expireQueueMap = new ConcurrentHashMap(1000);
    // Per-partition counter: incremented via increaseConcurrentCounter(), decremented via releaseConcurrentCounter().
    private ConcurrentMap<ConsumePartition, AtomicInteger> consumerSegmentNumMap = new ConcurrentHashMap();
    // Per-partition list of handed-out ranges; appended in trackConsumeDetail(), pruned in tryUpdateAckPosition().
    private ConcurrentMap<ConsumePartition, List<Position>> concurrentConsumeCache = new ConcurrentHashMap();
    // Supplies the monitor objects used by the synchronized blocks in this class.
    private PartitionLockInstance lockInstance = new PartitionLockInstance();
    // Applied to the filtered message list in readMessages().
    private DelayHandler delayHandler = new DelayHandler();
    // Not referenced by name in the code visible here; archiveIfnecessary() uses the same literal inline.
    private final String innerAppPrefix = "innerFilter@";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/consumer/ConcurrentConsumption$FilterCallbackImpl.class */
    /**
     * Filter callback bound to a single consumer.
     *
     * <p>Every buffer list handed to {@link #callback(List)} is forwarded, together with
     * that consumer, to the enclosing instance's {@code innerAcknowledge}.
     */
    public class FilterCallbackImpl implements FilterCallback {
        /** Consumer on whose behalf the forwarded buffers are acknowledged. */
        private final Consumer consumer;

        /**
         * @param consumer the consumer passed along with every callback
         */
        FilterCallbackImpl(Consumer consumer) {
            this.consumer = consumer;
        }

        /**
         * Forwards the given buffers to the enclosing instance for acknowledgement.
         *
         * @param buffers buffers reported by the filter
         * @throws JoyQueueException propagated from {@code innerAcknowledge}
         */
        @Override // org.joyqueue.broker.consumer.filter.FilterCallback
        public void callback(List<ByteBuffer> buffers) throws JoyQueueException {
            ConcurrentConsumption.this.innerAcknowledge(this.consumer, buffers);
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/consumer/ConcurrentConsumption$MovePartitionSegmentListener.class */
    /**
     * Session-event listener; an instance is registered with the session manager in {@code doStart()}.
     *
     * <p>NOTE(review): in this listing {@code onEvent} has an empty body and nothing calls
     * {@code movePartitionSegmentToExpire}. That looks like a decompilation gap (for example a
     * dropped switch on the event type) rather than intended behaviour -- confirm against the
     * original source before relying on this listener.
     */
    class MovePartitionSegmentListener implements EventListener<SessionManager.SessionEvent> {
        MovePartitionSegmentListener() {
        }

        /** Receives a session event; performs no work in the code shown here. */
        public void onEvent(SessionManager.SessionEvent sessionEvent) {
        }

        /**
         * Moves every tracked segment whose topic and app equal the given consumer's into the
         * matching expire queue and drops it from {@code segmentConsumeMap}.
         *
         * @param consumer consumer whose topic/app select the segments to move
         */
        private void movePartitionSegmentToExpire(Consumer consumer) {
            Iterator it = ConcurrentConsumption.this.segmentConsumeMap.keySet().iterator();
            while (it.hasNext()) {
                PartitionSegment partitionSegment = (PartitionSegment) it.next();
                if (partitionSegment != null && StringUtils.equals(partitionSegment.getTopic(), consumer.getTopic()) && StringUtils.equals(partitionSegment.getApp(), consumer.getApp())) {
                    ConcurrentConsumption.this.addToExpireQueue(new ConsumePartition(partitionSegment.getTopic(), partitionSegment.getApp(), partitionSegment.getPartition()), partitionSegment);
                    // Removing through the iterator deletes the entry from segmentConsumeMap itself.
                    it.remove();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/broker/consumer/ConcurrentConsumption$PartitionSegment.class */
    public class PartitionSegment {
        private String topic;
        private String app;
        private short partition;
        private long startIndex;
        private long endIndex;

        PartitionSegment(String str, String str2, short s, long j, long j2) {
            this.topic = str;
            this.app = str2;
            this.partition = s;
            this.startIndex = j;
            this.endIndex = j2;
        }

        public String getTopic() {
            return this.topic;
        }

        public void setTopic(String str) {
            this.topic = str;
        }

        public String getApp() {
            return this.app;
        }

        public void setApp(String str) {
            this.app = str;
        }

        public short getPartition() {
            return this.partition;
        }

        public void setPartition(short s) {
            this.partition = s;
        }

        public long getStartIndex() {
            return this.startIndex;
        }

        public void setStartIndex(long j) {
            this.startIndex = j;
        }

        public long getEndIndex() {
            return this.endIndex;
        }

        public void setEndIndex(long j) {
            this.endIndex = j;
        }

        public int hashCode() {
            return (31 * ((31 * ((31 * ((31 * this.topic.hashCode()) + this.app.hashCode())) + Short.hashCode(this.partition))) + Long.hashCode(this.startIndex))) + Long.hashCode(this.endIndex);
        }

        public boolean equals(Object obj) {
            PartitionSegment partitionSegment = (PartitionSegment) obj;
            return StringUtils.equals(this.topic, partitionSegment.topic) && StringUtils.equals(this.app, partitionSegment.app) && this.partition == partitionSegment.partition && this.startIndex == partitionSegment.startIndex && this.endIndex == partitionSegment.endIndex;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("PartitionSegment{");
            sb.append("topic='").append(this.topic).append('\'');
            sb.append(", app=").append(this.app);
            sb.append(", partition=").append((int) this.partition);
            sb.append(", startIndex=").append(this.startIndex);
            sb.append(", endIndex=").append(this.endIndex);
            sb.append('}');
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators; no other work is done here. The background threads are
     * created later, in {@code doStart()}.
     *
     * @param clusterManager       cluster metadata access
     * @param storeService         message store access
     * @param partitionManager     partition selection / retry-probability bookkeeping
     * @param messageRetry         retry message service
     * @param positionManager      pull/ack index bookkeeping
     * @param filterMessageSupport message filter applied when reading
     * @param archiveManager       archive access; may be {@code null} (checked before use)
     * @param sessionManager       session manager the segment listener is registered with
     */
    public ConcurrentConsumption(ClusterManager clusterManager, StoreService storeService, PartitionManager partitionManager, MessageRetry messageRetry, PositionManager positionManager, FilterMessageSupport filterMessageSupport, ArchiveManager archiveManager, SessionManager sessionManager) {
        // Cluster and storage access.
        this.clusterManager = clusterManager;
        this.storeService = storeService;

        // Consumption bookkeeping.
        this.partitionManager = partitionManager;
        this.positionManager = positionManager;
        this.messageRetry = messageRetry;
        this.filterMessageSupport = filterMessageSupport;

        // Archiving and session tracking.
        this.archiveManager = archiveManager;
        this.sessionManager = sessionManager;
    }

    /**
     * Starts the service: builds and starts the two background loops (each configured with
     * {@code sleepTime(5000L, 5000L)}), then registers the segment listener with the
     * session manager.
     *
     * @throws Exception propagated from {@code super.doStart()}
     */
    protected void doStart() throws Exception {
        super.doStart();

        // Loop whose work item moves tracked segments into the expire queues.
        this.moveExpireThread = LoopThread.builder()
                .sleepTime(5000L, 5000L)
                .name("JournalQ-concurrent-consumption-move-expire-Thread")
                .onException(error -> this.logger.warn("Exception:", error))
                .doWork(() -> moveSegment2ExpireQueue())
                .build();
        this.moveExpireThread.start();

        // Loop whose work item drops bookkeeping of partitions that are no longer concurrent.
        this.cleanExpireThread = LoopThread.builder()
                .sleepTime(5000L, 5000L)
                .name("JournalQ-concurrent-consumption-clean-expire-Thread")
                .onException(error -> this.logger.warn("Exception:", error))
                .doWork(() -> cleanExpireQueue())
                .build();
        this.cleanExpireThread.start();

        this.logger.info("ConcurrentConsumption is started.");
        this.sessionManager.addListener(new MovePartitionSegmentListener());
    }

    /**
     * Stops the service: delegates to the superclass first, then closes both background
     * loops via {@code Close.close}.
     */
    protected void doStop() {
        super.doStop();

        // Same order as they were started: mover first, cleaner second.
        Close.close(moveExpireThread);
        Close.close(cleanExpireThread);

        logger.info("ConcurrentConsumption is stopped.");
    }

    /**
     * Pulls messages for a consumer, trying three sources in order:
     * <ol>
     *   <li>a segment polled from the expire queues of the topic's master partitions;</li>
     *   <li>retry messages, when {@code partitionManager.isRetry(consumer)} says so;</li>
     *   <li>a regular partition read, using the priority partitions when there are any and
     *       the master partitions otherwise.</li>
     * </ol>
     *
     * <p>NOTE(review): the expired-segment path forwards {@code j2} to
     * {@code getFromExpireAckQueue}, which uses it as the expiry offset for
     * {@code trackConsumeDetail}; the recovered fragment of {@code getFromPartition} passes its
     * first long argument ({@code j} here) for the same purpose. Confirm which of the two is
     * the ack timeout.
     *
     * @param consumer consumer to pull for
     * @param i        forwarded to the retry lookup (narrowed to short) and to {@code getFromPartition}
     * @param j        forwarded to {@code getFromPartition}
     * @param j2       forwarded to {@code getFromExpireAckQueue} and {@code getFromPartition}
     * @param i2       forwarded to {@code getFromPartition}
     * @return the pull result; an empty result with partition -1 when the retry path finds nothing
     * @throws JoyQueueException propagated from the underlying reads
     */
    public PullResult getMessage(Consumer consumer, int i, long j, long j2, int i2) throws JoyQueueException {
        List<Short> masterPartitions = this.clusterManager.getMasterPartitionList(TopicName.parse(consumer.getTopic()));
        PartitionSegment expiredSegment = pollPartitionSegment(consumer, masterPartitions);
        PullResult fallback = new PullResult(consumer, (short) -1, new ArrayList<>(0));

        // 1. Re-deliver a segment waiting in an expire queue.
        if (expiredSegment != null) {
            return getFromExpireAckQueue(consumer, expiredSegment, j2);
        }

        // 2. Serve retry messages and adjust the retry probability by whether any were found.
        if (this.partitionManager.isRetry(consumer)) {
            List<ByteBuffer> retryBuffers = convert(this.messageRetry.getRetry(consumer.getTopic(), consumer.getApp(), (short) i, 0L));
            fallback.setBuffers(retryBuffers);
            if (retryBuffers.size() > 0) {
                this.partitionManager.increaseRetryProbability(consumer);
            } else {
                this.partitionManager.decreaseRetryProbability(consumer);
            }
            return fallback;
        }

        // 3. Regular read; priority partitions win over the master list when present.
        List<Short> priorityPartitions = this.partitionManager.getPriorityPartition(TopicName.parse(consumer.getTopic()));
        if (priorityPartitions.size() > 0) {
            return getFromPartition(consumer, priorityPartitions, i, j, j2, i2);
        }
        return getFromPartition(consumer, masterPartitions, i, j, j2, i2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Acknowledges the given buffers on behalf of a consumer and then hands the same
     * locations to the archive step. A {@code null} list is ignored.
     *
     * @param consumer consumer the acknowledgement is made for
     * @param list     buffers to acknowledge; may be {@code null}
     * @throws JoyQueueException propagated from {@code acknowledge} or the archive step
     */
    public void innerAcknowledge(Consumer consumer, List<ByteBuffer> list) throws JoyQueueException {
        if (list != null) {
            MessageLocation[] locations = convertMessageLocation(consumer.getTopic(), list);
            // Acknowledge as successful first, archive afterwards.
            acknowledge(locations, consumer, true);
            archiveIfnecessary(locations);
        }
    }

    /**
     * Appends a consume-log entry for the given locations when an archive service is
     * available; does nothing when there is no archive manager or no consume-archive service.
     *
     * <p>NOTE(review): the {@code Connection} is freshly constructed, so unless its constructor
     * assigns a default app the archived app name reads {@code "innerFilter@null"} -- confirm
     * that this is intended.
     *
     * @param messageLocationArr locations to archive
     * @throws JoyQueueException propagated from {@code appendConsumeLog}
     */
    private void archiveIfnecessary(MessageLocation[] messageLocationArr) throws JoyQueueException {
        if (this.archiveManager == null) {
            return;
        }
        ConsumeArchiveService archiveService = this.archiveManager.getConsumeArchiveService();
        if (archiveService == null) {
            return;
        }

        Connection innerConnection = new Connection();
        try {
            innerConnection.setAddress(IpUtil.toByte(new InetSocketAddress(IpUtil.getLocalIp(), 50088)));
        } catch (Exception ignored) {
            // Best effort: fall back to a zero-length address when the local address cannot be built.
            innerConnection.setAddress(new byte[0]);
        }
        String archivedApp = "innerFilter@" + innerConnection.getApp();
        innerConnection.setApp(archivedApp);
        archiveService.appendConsumeLog(innerConnection, messageLocationArr);
    }

    /**
     * Builds one {@code MessageLocation} per buffer, reading partition and index from the
     * buffer through {@code Serializer}.
     *
     * @param str  topic put into every location
     * @param list buffers to convert
     * @return locations in the same order as {@code list}
     */
    private MessageLocation[] convertMessageLocation(String str, List<ByteBuffer> list) {
        MessageLocation[] locations = new MessageLocation[list.size()];
        int slot = 0;
        for (ByteBuffer buffer : list) {
            // Partition is read before index, as in the original argument order.
            locations[slot++] = new MessageLocation(str, Serializer.readPartition(buffer), Serializer.readIndex(buffer));
        }
        return locations;
    }

    /**
     * Wraps retry messages into buffers, stamping each with partition
     * {@code Short.MAX_VALUE} (the value {@code acknowledge} routes to {@code retryAck})
     * and the retry model's index.
     *
     * @param list retry messages; {@code null} or empty yields an empty list
     * @return one buffer per retry message, in input order
     * @throws JoyQueueException with code {@code SE_IO_ERROR} when wrapping or stamping fails
     */
    private List<ByteBuffer> convert(List<RetryMessageModel> list) throws JoyQueueException {
        if (CollectionUtils.isEmpty(list)) {
            return new ArrayList<>(0);
        }
        List<ByteBuffer> buffers = new ArrayList<>(list.size());
        for (RetryMessageModel retryModel : list) {
            try {
                ByteBuffer buffer = ByteBuffer.wrap(retryModel.getBrokerMessage());
                Serializer.setPartition(buffer, Short.MAX_VALUE);
                Serializer.setIndex(buffer, retryModel.getIndex());
                buffers.add(buffer);
            } catch (Exception cause) {
                throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, cause, new Object[0]);
            }
        }
        return buffers;
    }

    /**
     * Re-reads the messages of a segment taken from an expire queue and, when anything was
     * read, tracks the segment again with a fresh deadline.
     *
     * <p>The whole operation runs under the partition's lock object. Compared with the earlier
     * version, the diagnostic log no longer dereferences the expire queue without a null check
     * ({@code cleanExpireQueue} can remove the queue concurrently, which made the log statement
     * throw {@code NullPointerException}), and the index list for the log is only built when
     * info logging is enabled.
     *
     * @param consumer         consumer to read for
     * @param partitionSegment segment previously polled from an expire queue
     * @param j                expiry offset handed to {@code trackConsumeDetail}
     * @return result of reading the segment's index range
     * @throws JoyQueueException propagated from {@code readMessages}
     */
    private PullResult getFromExpireAckQueue(Consumer consumer, PartitionSegment partitionSegment, long j) throws JoyQueueException {
        short partition = partitionSegment.getPartition();
        synchronized (this.lockInstance.getLockInstance(consumer.getTopic(), consumer.getApp(), partition)) {
            long startIndex = partitionSegment.getStartIndex();
            // Both ends of the segment are inclusive, hence the + 1.
            int messageCount = ((int) (partitionSegment.getEndIndex() - startIndex)) + 1;
            PullResult readMessages = readMessages(consumer, partition, startIndex, messageCount);
            int size = readMessages.getBuffers().size();

            if (this.logger.isInfoEnabled()) {
                List<Long> indexes = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    indexes.add(Long.valueOf(startIndex + i));
                }
                ConcurrentLinkedQueue<PartitionSegment> queue = this.expireQueueMap.get(
                        new ConsumePartition(partitionSegment.getTopic(), partitionSegment.getApp(), partitionSegment.getPartition()));
                // The queue may have been removed by the cleaner thread in the meantime.
                int queued = queue == null ? 0 : queue.size();
                this.logger.info("readExpireMessages, partition: {}, index: {}, size: {}", new Object[]{Short.valueOf(partition), indexes, Integer.valueOf(queued)});
            }

            if (size > 0) {
                // 'false': the concurrent counter and position cache are not touched on this path.
                trackConsumeDetail(consumer, partition, partitionSegment.getStartIndex(), partitionSegment.getEndIndex(), j, false);
            }
            return readMessages;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:21:0x00c0, code lost:
    
        r0 = new java.util.ArrayList();
        r30 = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00d0, code lost:
    
        if (r30 >= r0) goto L38;
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x00d3, code lost:
    
        r0.add(java.lang.Long.valueOf(r0 + r30));
        r30 = r30 + 1;
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x00ea, code lost:
    
        r0 = r0 + r0;
        r11.logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", new java.lang.Object[]{java.lang.Long.valueOf(r0), r12.getTopic(), r12.getApp(), java.lang.Short.valueOf(r0)});
        r11.positionManager.updateLastMsgPullIndex(org.joyqueue.domain.TopicName.parse(r12.getTopic()), r12.getApp(), r0, r0);
        trackConsumeDetail(r12, r0, r0, r0 - 1, r15, true);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Regular partition read used by {@code getMessage}.
     *
     * <p>WARNING: the body of this method was not recovered by the decompiler. As listed here
     * it always throws {@link UnsupportedOperationException}, so every {@code getMessage} call
     * that reaches the regular-read path fails. The real implementation must be restored from
     * the original source or the bytecode dump before this class is usable.
     *
     * @throws UnsupportedOperationException always, in this listing
     */
    private org.joyqueue.broker.consumer.model.PullResult getFromPartition(org.joyqueue.network.session.Consumer r12, java.util.List<java.lang.Short> r13, int r14, long r15, long r17, int r19) throws org.joyqueue.exception.JoyQueueException {
        /*
            Method dump skipped, instructions count: 359
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.joyqueue.broker.consumer.ConcurrentConsumption.getFromPartition(org.joyqueue.network.session.Consumer, java.util.List, int, long, long, int):org.joyqueue.broker.consumer.model.PullResult");
    }

    /**
     * Counts the messages in a pull result: a buffer whose header reports a batch adds the
     * header's flag value, any other buffer adds one.
     *
     * @param pullResult result whose buffers are inspected
     * @return the accumulated count
     */
    private int count(PullResult pullResult) {
        int total = 0;
        for (ByteBuffer buffer : pullResult.getBuffers()) {
            BrokerMessage header = Serializer.readBrokerMessageHeader(buffer);
            if (header.isBatch()) {
                total = total + header.getFlag();
            } else {
                total = total + 1;
            }
        }
        return total;
    }

    /**
     * Tells whether the counter kept for the consumer's partition has reached the given limit.
     *
     * @param consumer consumer whose topic/app identify the counter
     * @param s        partition
     * @param i        limit to compare against
     * @return {@code true} when a counter exists and its value is at least {@code i}
     */
    private boolean isExceedConcurrent(Consumer consumer, short s, int i) {
        ConsumePartition key = new ConsumePartition(consumer.getTopic(), consumer.getApp(), s);
        AtomicInteger inFlight = this.consumerSegmentNumMap.get(key);
        if (inFlight == null) {
            return false;
        }
        return inFlight.get() >= i;
    }

    /**
     * Decrements the counter kept for the given partition; does nothing when no counter exists.
     *
     * @param consumePartition partition whose counter is decremented
     */
    private void releaseConcurrentCounter(ConsumePartition consumePartition) {
        AtomicInteger inFlight = this.consumerSegmentNumMap.get(consumePartition);
        if (inFlight == null) {
            return;
        }
        inFlight.decrementAndGet();
    }

    /**
     * Increments the counter kept for the given partition, creating it at zero first when it
     * does not exist yet.
     *
     * @param consumePartition partition whose counter is incremented
     * @return the counter value after the increment
     */
    private int increaseConcurrentCounter(ConsumePartition consumePartition) {
        return this.consumerSegmentNumMap
                .computeIfAbsent(consumePartition, ignored -> new AtomicInteger(0))
                .incrementAndGet();
    }

    /**
     * Takes the next segment waiting in the expire queues of the given partitions, trying the
     * partitions in list order.
     *
     * <p>The earlier version checked {@code isEmpty()} and then returned {@code poll()}
     * unconditionally. When another thread drained the queue between the two calls,
     * {@code poll()} returned {@code null} and the method gave up without looking at the
     * remaining partitions. Polling directly and moving on when nothing comes back closes
     * that gap.
     *
     * @param consumer consumer whose topic/app identify the queues
     * @param list     partitions to try, in order
     * @return the first segment obtained, or {@code null} when every queue is missing or empty
     */
    private PartitionSegment pollPartitionSegment(Consumer consumer, List<Short> list) {
        for (Short partition : list) {
            ConcurrentLinkedQueue<PartitionSegment> queue = this.expireQueueMap.get(
                    new ConsumePartition(consumer.getTopic(), consumer.getApp(), partition.shortValue()));
            if (queue == null) {
                continue;
            }
            PartitionSegment segment = queue.poll();
            if (segment != null) {
                return segment;
            }
        }
        return null;
    }

    /**
     * Returns the index to pull from for the consumer's partition.
     *
     * <p>First use of a partition (no TRUE flag in {@code resetPullPositionFlag}): the pull index
     * is set to the last ack index; when that update succeeds the flag is set and the ack index
     * is returned, otherwise 0 is returned. Later uses return the stored pull index; when that
     * index is -1 the cached positions of the partition are dropped and the pull index is reset
     * to the last ack index, but -1 is still what this call returns.
     *
     * @param consumer consumer whose topic/app identify the partition state
     * @param s        partition
     * @return the pull index as described above
     * @throws JoyQueueException propagated from the position manager
     */
    private long getPullIndex(Consumer consumer, short s) throws JoyQueueException {
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        ConsumePartition key = new ConsumePartition(topic, app, s);
        TopicName topicName = TopicName.parse(topic);

        Boolean initialised = this.resetPullPositionFlag.get(key);
        if (initialised != null && initialised.booleanValue()) {
            long storedPullIndex = this.positionManager.getLastMsgPullIndex(topicName, app, s);
            if (storedPullIndex == -1) {
                this.concurrentConsumeCache.remove(key);
                this.positionManager.updateLastMsgPullIndex(topicName, app, s, this.positionManager.getLastMsgAckIndex(topicName, app, s));
            }
            return storedPullIndex;
        }

        long pullIndex = 0;
        long ackIndex = this.positionManager.getLastMsgAckIndex(topicName, app, s);
        if (this.positionManager.updateLastMsgPullIndex(topicName, app, s, ackIndex)) {
            this.resetPullPositionFlag.put(key, Boolean.TRUE);
            pullIndex = ackIndex;
        }
        this.logger.info("init concurrent pull index [{}]", Long.valueOf(pullIndex));
        return pullIndex;
    }

    /**
     * Reads up to {@code i} messages of one partition starting at index {@code j}, then runs
     * them through the message filter and the delay handler.
     *
     * <p>The handlers for {@code PositionOverflowException} and
     * {@code PositionUnderflowException} are placed before the generic {@code Exception}
     * handler. In the earlier ordering the generic handler came first, which leaves the two
     * specific handlers unreachable (and is rejected by the compiler when those types extend
     * {@code Exception}), so the overflow/underflow result codes could never be set.
     *
     * @param consumer consumer to read for
     * @param s        partition
     * @param j        index to start reading at
     * @param i        count handed to the store read
     * @return the filtered messages on success; otherwise an empty result with partition -1,
     *         carrying an overflow/underflow code when the store reported one
     * @throws JoyQueueException declared for callers; failures seen here are logged or mapped
     *                           to a result code
     */
    private PullResult readMessages(Consumer consumer, short s, long j, int i) throws JoyQueueException {
        PullResult pullResult = new PullResult(consumer, (short) -1, new ArrayList<>(0));
        try {
            ReadResult read = this.storeService.getStore(consumer.getTopic(), this.clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), s).intValue()).read(s, j, i, Long.MAX_VALUE);
            if (read.getCode() == JoyQueueCode.SUCCESS) {
                ArrayList newArrayList = Lists.newArrayList(read.getMessages());
                org.joyqueue.domain.Consumer consumer2 = this.clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
                // Without consumer metadata the empty default result is returned.
                if (consumer2 != null) {
                    pullResult = new PullResult(consumer, s, this.delayHandler.handle(consumer2.getConsumerPolicy(), this.filterMessageSupport.filter(consumer2, newArrayList, new FilterCallbackImpl(consumer))));
                }
            } else {
                this.logger.error("read message error, error code[{}]", read.getCode());
            }
        } catch (PositionOverflowException overflow) {
            // Only flagged when the requested index lies beyond the right bound the store reports.
            if (overflow.getRight() < j) {
                pullResult.setCode(JoyQueueCode.SE_INDEX_OVERFLOW);
            }
        } catch (PositionUnderflowException underflow) {
            pullResult.setCode(JoyQueueCode.SE_INDEX_UNDERFLOW);
        } catch (Exception e) {
            this.logger.error("get message error, consumer: {}, partition: {}", new Object[]{consumer, Short.valueOf(s), e});
        }
        return pullResult;
    }

    /**
     * Records that the index range {@code [j, j2]} of a partition has been handed to a consumer.
     *
     * <p>The segment is stored in {@code segmentConsumeMap} with the value
     * {@code j3 + SystemClock.now()}. When {@code z} is set, the partition's concurrent counter
     * is incremented and the range is appended to the partition's position list as well.
     *
     * @param consumer consumer the range was handed to
     * @param s        partition
     * @param j        first index of the range
     * @param j2       last index of the range
     * @param j3       offset added to the current time to form the stored value
     * @param z        whether to update the concurrent counter and the position list
     */
    private void trackConsumeDetail(Consumer consumer, short s, long j, long j2, long j3, boolean z) {
        String topic = consumer.getTopic();
        String app = consumer.getApp();

        PartitionSegment segment = new PartitionSegment(topic, app, s, j, j2);
        this.segmentConsumeMap.put(segment, Long.valueOf(j3 + SystemClock.now()));
        if (!z) {
            return;
        }

        ConsumePartition key = new ConsumePartition(topic, app, s);
        increaseConcurrentCounter(key);
        List<Position> positions = this.concurrentConsumeCache.computeIfAbsent(key, ignored -> new ArrayList<>());
        positions.add(new Position(j, j2, -1L, -1L));
    }

    /**
     * Acknowledges a batch of message locations for a consumer.
     *
     * <p>The partition of the first location decides the route: {@code Short.MAX_VALUE} sends
     * the whole batch to {@code retryAck}; any other value is treated as a regular partition.
     * For a regular partition the locations are reduced to a range by
     * {@code AcknowledgeSupport.sortMsgLocation}; under the partition lock the ack position is
     * advanced where possible, the segment is removed from the in-flight map and the expire
     * queue, and the concurrent counter is decremented.
     *
     * <p>Changes from the earlier version: a {@code null} array is treated like an empty one
     * instead of throwing {@code NullPointerException}, and the index array that exists only
     * for the debug log is no longer built when debug logging is off.
     *
     * @param messageLocationArr locations to acknowledge; {@code null} or empty yields {@code false}
     * @param consumer           consumer the acknowledgement is made for
     * @param z                  forwarded to {@code retryAck}; not used on the regular path
     * @return {@code true} when the batch was processed; {@code false} when there was nothing to
     *         acknowledge, no range could be derived, or the retry acknowledgement failed
     * @throws JoyQueueException propagated from {@code tryUpdateAckPosition}
     */
    public boolean acknowledge(MessageLocation[] messageLocationArr, Consumer consumer, boolean z) throws JoyQueueException {
        if (messageLocationArr == null || messageLocationArr.length < 1) {
            return false;
        }
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        short partition = messageLocationArr[0].getPartition();
        if (partition == Short.MAX_VALUE) {
            return retryAck(topic, app, messageLocationArr, z);
        }

        if (this.logger.isDebugEnabled()) {
            long[] indexes = new long[messageLocationArr.length];
            for (int i = 0; i < messageLocationArr.length; i++) {
                indexes[i] = messageLocationArr[i].getIndex();
            }
            this.logger.debug("pre ack, partition: {}, index: {}", Short.valueOf(partition), indexes);
        }

        long[] range = AcknowledgeSupport.sortMsgLocation(messageLocationArr);
        if (range == null) {
            return false;
        }
        PartitionSegment partitionSegment = new PartitionSegment(topic, app, partition, range[0], range[1]);
        ConsumePartition consumePartition = new ConsumePartition(topic, app, partition);
        synchronized (this.lockInstance.getLockInstance(consumePartition)) {
            tryUpdateAckPosition(consumePartition, range);
            this.segmentConsumeMap.remove(partitionSegment);
            removeFromExpireQueue(consumePartition, partitionSegment);
            releaseConcurrentCounter(consumePartition);
        }
        return true;
    }

    /**
     * Reports the outcome of retry messages to the retry service.
     *
     * @param str                topic
     * @param str2               app
     * @param messageLocationArr locations whose indexes are reported
     * @param z                  {@code true} reports success, {@code false} reports an error
     * @return {@code true} when the retry service accepted the report, {@code false} when it
     *         threw a {@code JoyQueueException} (which is logged)
     */
    private boolean retryAck(String str, String str2, MessageLocation[] messageLocationArr, boolean z) {
        Long[] indexes = new Long[messageLocationArr.length];
        int slot = 0;
        for (MessageLocation location : messageLocationArr) {
            indexes[slot++] = Long.valueOf(location.getIndex());
        }
        try {
            if (z) {
                this.messageRetry.retrySuccess(str, str2, indexes);
            } else {
                this.messageRetry.retryError(str, str2, indexes);
            }
        } catch (JoyQueueException e) {
            this.logger.error("RetryAck error.", e);
            return false;
        }
        return true;
    }

    /**
     * Tells whether the partition's expire queue currently holds the given segment.
     *
     * @param consumePartition partition whose queue is inspected
     * @param partitionSegment segment to look for; may be {@code null}
     * @return {@code false} when the queue does not exist or the segment is {@code null}
     */
    private boolean isExpireQueueContains(ConsumePartition consumePartition, PartitionSegment partitionSegment) {
        ConcurrentLinkedQueue<PartitionSegment> queue = this.expireQueueMap.get(consumePartition);
        return queue != null && partitionSegment != null && queue.contains(partitionSegment);
    }

    /**
     * Removes the segment from the partition's expire queue; does nothing when the queue does
     * not exist or the segment is {@code null}.
     *
     * @param consumePartition partition whose queue is modified
     * @param partitionSegment segment to remove; may be {@code null}
     */
    private void removeFromExpireQueue(ConsumePartition consumePartition, PartitionSegment partitionSegment) {
        ConcurrentLinkedQueue<PartitionSegment> queue = this.expireQueueMap.get(consumePartition);
        if (queue != null && partitionSegment != null) {
            queue.remove(partitionSegment);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends a segment to the partition's expire queue, creating the queue when needed and
     * skipping segments the queue already contains. The queue size is logged at debug level,
     * and at info level once it exceeds 10000.
     *
     * @param consumePartition partition whose queue receives the segment
     * @param partitionSegment segment to enqueue
     */
    public void addToExpireQueue(ConsumePartition consumePartition, PartitionSegment partitionSegment) {
        ConcurrentLinkedQueue<PartitionSegment> queue =
                this.expireQueueMap.computeIfAbsent(consumePartition, ignored -> new ConcurrentLinkedQueue<>());
        if (queue.contains(partitionSegment)) {
            return;
        }
        queue.offer(partitionSegment);

        long queuedNow = queue.size();
        this.logger.debug("add expire queue, partition: {}, size: {}, start: {}, end: {}",
                new Object[]{Short.valueOf(partitionSegment.getPartition()), Long.valueOf(queuedNow), Long.valueOf(partitionSegment.getStartIndex()), Long.valueOf(partitionSegment.getEndIndex())});
        this.logger.debug("expire queue size is:[{}], partitionInfo:[{}], ", Long.valueOf(queuedNow), consumePartition);
        // The threshold is checked against a fresh size() call; the logged value is the one sampled above.
        if (queue.size() > 10000) {
            this.logger.info("expire queue size is:[{}], partitionInfo:[{}], ", Long.valueOf(queuedNow), consumePartition);
        }
    }

    /**
     * Moves every in-flight segment whose deadline has passed into its partition's expire queue
     * so that it can be handed out again.
     *
     * <p>The value stored for a segment is a deadline: {@code trackConsumeDetail} writes
     * "timeout argument + current time". The earlier version moved a segment when
     * {@code deadline >= now}, which is inverted -- it re-queued segments that were still within
     * their timeout and never re-queued the ones that had actually expired. The check is now
     * {@code deadline <= now}.
     *
     * <p>Each entry is handled under its partition's lock, and the entry is re-checked for
     * presence first because an acknowledgement may have removed it in the meantime.
     */
    private void moveSegment2ExpireQueue() {
        Iterator<Map.Entry<PartitionSegment, Long>> it = this.segmentConsumeMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<PartitionSegment, Long> entry = it.next();
            PartitionSegment segment = entry.getKey();
            ConsumePartition consumePartition = new ConsumePartition(segment.getTopic(), segment.getApp(), segment.getPartition());
            synchronized (this.lockInstance.getLockInstance(consumePartition)) {
                if (!this.segmentConsumeMap.containsKey(segment)) {
                    // Acknowledged (or otherwise removed) since the iterator saw it.
                    continue;
                }
                long deadline = entry.getValue().longValue();
                if (deadline <= SystemClock.now()) {
                    addToExpireQueue(consumePartition, segment);
                    it.remove();
                }
            }
        }
    }

    /**
     * Drops the expire queue, the concurrent counter and the cached positions of every
     * partition whose consumer policy no longer reports concurrent consumption.
     *
     * <p>A failure while looking up one partition's policy is logged and that partition is
     * skipped. The earlier log call passed {@code e.getMessage()} as a format argument to a
     * message without a placeholder, so nothing about the failure was printed; the exception is
     * now passed as the throwable, which also keeps its stack trace.
     */
    private void cleanExpireQueue() {
        List<ConsumePartition> stale = new ArrayList<>();
        for (ConsumePartition key : this.expireQueueMap.keySet()) {
            try {
                if (!this.clusterManager.getConsumerPolicy(TopicName.parse(key.getTopic()), key.getApp()).isConcurrent().booleanValue()) {
                    stale.add(key);
                }
            } catch (Exception e) {
                this.logger.warn("clean expire error, partitionInfo: " + key, e);
            }
        }
        for (ConsumePartition key : stale) {
            this.expireQueueMap.remove(key);
            this.consumerSegmentNumMap.remove(key);
            this.concurrentConsumeCache.remove(key);
        }
    }

    /**
     * Marks the cached position matching the acknowledged range as acked and, when the acked
     * positions form an unbroken run starting at the last ack index, advances the stored ack
     * index past that run.
     *
     * <p>Callers in this class invoke it while holding the partition's lock object.
     *
     * @param consumePartition partition whose positions are examined
     * @param jArr             acknowledged range: {@code jArr[0]} is compared with a position's
     *                         ack start index, {@code jArr[1]} with its ack current index
     * @throws JoyQueueException propagated from the position manager
     */
    private void tryUpdateAckPosition(ConsumePartition consumePartition, long[] jArr) throws JoyQueueException {
        String topic = consumePartition.getTopic();
        String app = consumePartition.getApp();
        short partition = consumePartition.getPartition();
        long lastMsgAckIndex = this.positionManager.getLastMsgAckIndex(TopicName.parse(topic), app, partition);
        // sortPositions() also writes the sorted copy back into the cache.
        List<Position> sortPositions = sortPositions(consumePartition);
        Position position = null;
        if (CollectionUtils.isEmpty(sortPositions)) {
            this.logger.warn("current position is null, positions is empty, partition: {}, startIndex: {}, endIndex: {}", new Object[]{Short.valueOf(partition), Long.valueOf(jArr[0]), Long.valueOf(jArr[1])});
            return;
        }
        // Find the cached position whose bounds equal the acknowledged range exactly.
        Iterator<Position> it = sortPositions.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Position next = it.next();
            if (next.getAckStartIndex() == jArr[0] && next.getAckCurIndex() == jArr[1]) {
                position = next;
                break;
            }
        }
        if (position == null) {
            this.logger.warn("current position is null, partition: {}, startIndex: {}, endIndex: {}", new Object[]{Short.valueOf(partition), Long.valueOf(jArr[0]), Long.valueOf(jArr[1])});
            return;
        }
        // Already acknowledged earlier: nothing to do.
        if (position.isAck()) {
            return;
        }
        Position position2 = sortPositions.get(0);
        position.setAck(true);
        // The ack index can only move when the lowest position is acked and starts exactly at it.
        if (!position2.isAck() || position2.getAckStartIndex() != lastMsgAckIndex) {
            this.logger.debug("commit index failed, partition: {}, head: {}, index: {}, ack: {}", new Object[]{Short.valueOf(partition), Long.valueOf(position2.getAckStartIndex()), Long.valueOf(lastMsgAckIndex), Boolean.valueOf(position2.isAck())});
            return;
        }
        // Collect the leading run of acked positions that chain on from the current ack index,
        // advancing lastMsgAckIndex to one past the end of each collected position.
        LinkedList newLinkedList = Lists.newLinkedList();
        for (int i = 0; i < sortPositions.size(); i++) {
            Position position3 = sortPositions.get(i);
            if (!position3.isAck() || position3.getAckStartIndex() != lastMsgAckIndex) {
                break;
            }
            lastMsgAckIndex = position3.getAckCurIndex() + 1;
            newLinkedList.add(position3);
        }
        if (newLinkedList.isEmpty()) {
            this.logger.info("commit index failed, partition: {}, head: {}, index: {}", new Object[]{Short.valueOf(partition), Long.valueOf(position2.getAckStartIndex()), Long.valueOf(lastMsgAckIndex)});
            return;
        }
        // Drop the committed positions from the cache and persist the new ack index.
        sortPositions.removeAll(newLinkedList);
        this.concurrentConsumeCache.put(consumePartition, sortPositions);
        this.positionManager.updateLastMsgAckIndex(TopicName.parse(topic), app, partition, lastMsgAckIndex, false);
        this.logger.debug("commit index, partition: {}, index: {}", Short.valueOf(partition), Long.valueOf(lastMsgAckIndex));
    }

    /**
     * Adds the range {@code [j, j2]} to the partition's cached position list unless an equal
     * position is already present.
     *
     * <p>The list is installed with {@code putIfAbsent}, matching how the other maps in this
     * class are populated. The earlier {@code get} followed by {@code put} could replace a list
     * that another thread had installed in between, losing the positions already recorded in it.
     *
     * @param consumePartition partition whose position list is extended
     * @param j                first index of the range
     * @param j2               last index of the range
     */
    private void addAckSegment(ConsumePartition consumePartition, long j, long j2) {
        List<Position> positions = this.concurrentConsumeCache.get(consumePartition);
        if (positions == null) {
            positions = new ArrayList<>();
            List<Position> installedByOther = this.concurrentConsumeCache.putIfAbsent(consumePartition, positions);
            if (installedByOther != null) {
                positions = installedByOther;
            }
        }
        Position candidate = new Position(j, j2, -1L, -1L);
        if (!positions.contains(candidate)) {
            positions.add(candidate);
        }
    }

    /**
     * Returns the partition's cached positions ordered by ack start index and stores that
     * ordered copy back into the cache.
     *
     * @param consumePartition partition whose positions are sorted
     * @return the sorted copy, or {@code null} when nothing is cached for the partition
     */
    private List<Position> sortPositions(ConsumePartition consumePartition) {
        List<Position> cached = this.concurrentConsumeCache.get(consumePartition);
        if (CollectionUtils.isEmpty(cached)) {
            return null;
        }
        List<Position> ordered = sortByAckStartIndex(cached);
        // The cache entry is replaced by the sorted copy, not sorted in place.
        this.concurrentConsumeCache.put(consumePartition, ordered);
        return ordered;
    }

    /**
     * Returns a new list holding the given positions in ascending order of ack start index;
     * the input list is left untouched and positions with equal start index keep their
     * relative order.
     *
     * @param list positions to sort
     * @return a sorted copy
     */
    private List<Position> sortByAckStartIndex(List<Position> list) {
        List<Position> ordered = new ArrayList<>(list);
        ordered.sort((left, right) -> Long.compare(left.getAckStartIndex(), right.getAckStartIndex()));
        return ordered;
    }

    /**
     * Folds positions into merged ranges using {@code tryMergeTwoSegment}.
     *
     * <p>Starting from each not-yet-consumed element, every later element is offered for
     * merging; each successful merge replaces the running position and counts as one absorbed
     * element. After the running position is emitted, the scan resumes that many elements
     * further on. Lists with at most one element are returned as they are.
     *
     * @param list positions to merge
     * @return the input list itself when it has at most one element, otherwise a new list
     */
    private List<Position> mergeSequenceSegment(List<Position> list) {
        if (list.size() <= 1) {
            return list;
        }
        List<Position> merged = new ArrayList<>();
        int cursor = 0;
        while (cursor < list.size()) {
            Position running = list.get(cursor);
            int absorbed = 0;
            for (int probe = cursor + 1; probe < list.size(); probe++) {
                Position combined = tryMergeTwoSegment(running, list.get(probe));
                if (combined == null) {
                    continue;
                }
                running = combined;
                absorbed++;
            }
            merged.add(running);
            cursor += absorbed + 1;
        }
        return merged;
    }

    /**
     * Tries to combine two positions into one.
     *
     * @param position  the earlier position
     * @param position2 the later position
     * @return {@code position} itself when its current index already reaches or passes
     *         {@code position2}'s; a new position spanning both when {@code position2} starts
     *         directly after {@code position} ends; {@code null} otherwise
     */
    private Position tryMergeTwoSegment(Position position, Position position2) {
        if (position.getAckCurIndex() >= position2.getAckCurIndex()) {
            return position;
        }
        boolean adjacent = position.getAckCurIndex() + 1 == position2.getAckStartIndex();
        return adjacent
                ? new Position(position.getAckStartIndex(), position2.getAckCurIndex(), -1L, -1L)
                : null;
    }
}
