package org.joyqueue.broker.consumer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.archive.ArchiveManager;
import org.joyqueue.broker.archive.ConsumeArchiveService;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.filter.FilterCallback;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.MessageLocation;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.PositionOverflowException;
import org.joyqueue.store.PositionUnderflowException;
import org.joyqueue.store.ReadResult;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/consumer/PartitionConsumption.class */
class PartitionConsumption extends Service {
    // Partition bookkeeping: priority partitions, partition selection, occupy/release and retry probability.
    private PartitionManager partitionManager;
    // Provides the per-partition-group stores that messages are read from.
    private StoreService storeService;
    // Used to look up local partitions, partition group ids and consumer configuration.
    private ClusterManager clusterManager;
    // Reads and advances the per-app, per-partition ack/pull indexes.
    private PositionManager positionManager;
    // Applies the consumer's filters to buffers that were read, before they are returned.
    private FilterMessageSupport filterMessageSupport;
    // Retry-message service; partition id Short.MAX_VALUE is served from and acknowledged to it.
    private MessageRetry messageRetry;
    // May be null; when null (or when it has no consume-archive service) nothing is archived.
    private ArchiveManager archiveManager;
    // Consumption settings read by this class: partition-select retry max/interval and per-app detail logging.
    private ConsumeConfig config;
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumption.class);
    // Applied to filtered buffers together with the consumer policy; exact semantics live in DelayHandler.
    private DelayHandler delayHandler = new DelayHandler();
    // NOTE(review): not read anywhere in this class as shown; presumably a monitoring key — confirm or remove.
    private String monitorKey = "Read-Message";
    // Prefix of the app name placed on the Connection that is used when archiving internally acknowledged messages.
    private final String innerAppPrefix = "innerFilter@";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/consumer/PartitionConsumption$FilterCallbackImpl.class */
    public class FilterCallbackImpl implements FilterCallback {
        /** Consumer on whose behalf the buffers handed to {@link #callback(List)} are acknowledged. */
        private final Consumer consumer;

        /**
         * @param consumer consumer passed along to the enclosing instance's acknowledge path
         */
        FilterCallbackImpl(Consumer consumer) {
            this.consumer = consumer;
        }

        /**
         * Hands the given buffers to the enclosing {@code PartitionConsumption#innerAcknowledge}
         * together with the consumer captured at construction time.
         *
         * @param list buffers passed back by the filter; forwarded unchanged
         * @throws JoyQueueException propagated from the acknowledge/archive path
         */
        @Override // org.joyqueue.broker.consumer.filter.FilterCallback
        public void callback(List<ByteBuffer> list) throws JoyQueueException {
            innerAcknowledge(consumer, list);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators this component works with. No validation is performed and
     * nothing is started here.
     *
     * @param clusterManager       lookup of local partitions, partition groups and consumer configuration
     * @param storeService         access to partition-group stores
     * @param partitionManager     partition selection and occupy/release bookkeeping
     * @param positionManager      ack/pull index storage
     * @param messageRetry         retry-message service
     * @param filterMessageSupport message filtering
     * @param archiveManager       archive access; the archive step checks it for null before use
     * @param consumeConfig        consumption settings
     */
    public PartitionConsumption(ClusterManager clusterManager, StoreService storeService, PartitionManager partitionManager, PositionManager positionManager, MessageRetry messageRetry, FilterMessageSupport filterMessageSupport, ArchiveManager archiveManager, ConsumeConfig consumeConfig) {
        // Settings first, then the services in alphabetical order; the assignments are independent.
        this.config = consumeConfig;
        this.archiveManager = archiveManager;
        this.clusterManager = clusterManager;
        this.filterMessageSupport = filterMessageSupport;
        this.messageRetry = messageRetry;
        this.partitionManager = partitionManager;
        this.positionManager = positionManager;
        this.storeService = storeService;
    }

    /**
     * Lifecycle hook: delegates to the superclass start logic and then logs that this
     * component has started.
     *
     * @throws Exception propagated from {@code super.doStart()}
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.logger.info("PartitionConsumption is started.");
    }

    /**
     * Lifecycle hook: delegates to the superclass stop logic and then logs that this
     * component has stopped.
     */
    @Override
    protected void doStop() {
        super.doStop();
        this.logger.info("PartitionConsumption is stopped.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Pulls messages for a consumer, trying the topic's priority partitions first and falling
     * back to the broker-local partitions when the priority attempt yields nothing.
     * <p>
     * When the partition manager reports the consumer as eligible for retry, the partition id
     * {@code Short.MAX_VALUE} is appended to a copy of the local partition list before the
     * fallback attempt.
     *
     * @param consumer consumer the pull is made for
     * @param i        number of messages requested (logged as "count")
     * @param j        ack timeout (logged as "ackTimeout")
     * @param j2       passed through to partition selection unchanged
     * @return the first non-empty pull result, otherwise the result of the fallback attempt
     * @throws JoyQueueException propagated from the partition pull
     */
    public PullResult getMessage(Consumer consumer, int i, long j, long j2) throws JoyQueueException {
        this.logger.debug("getMessage by topic:[{}], app:[{}], count:[{}], ackTimeout:[{}]", new Object[]{consumer.getTopic(), consumer.getApp(), Integer.valueOf(i), Long.valueOf(j)});

        // Start from an empty placeholder so the count check below also covers "no priority partitions".
        PullResult fromPriority = new PullResult(consumer, (short) -1, new ArrayList<ByteBuffer>(0));
        List<Short> priorityPartitions = this.partitionManager.getPriorityPartition(TopicName.parse(consumer.getTopic()));
        if (!priorityPartitions.isEmpty()) {
            fromPriority = getFromPartition(consumer, priorityPartitions, i, j, j2);
        }
        if (fromPriority.count() >= 1) {
            return fromPriority;
        }

        List<Short> candidates = this.clusterManager.getLocalPartitions(TopicName.parse(consumer.getTopic()));
        if (this.partitionManager.isRetry(consumer)) {
            // Copy before appending so the list owned by the cluster manager is not modified.
            candidates = new ArrayList<Short>(candidates);
            candidates.add(Short.MAX_VALUE);
        }
        return getFromPartition(consumer, candidates, i, j, j2);
    }

    /**
     * Wraps each retry message's raw bytes in a buffer, stamps it with partition
     * {@code Short.MAX_VALUE} and the retry message's index, and collects the buffers
     * into a pull result for that partition.
     *
     * @param consumer consumer the result is built for
     * @param list     retry messages to convert; must not be null
     * @return pull result on partition {@code Short.MAX_VALUE} with one buffer per retry message
     * @throws JoyQueueException declared for callers; not thrown by the visible code itself
     */
    private PullResult brokerMessage2PullResult(Consumer consumer, List<RetryMessageModel> list) throws JoyQueueException {
        final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(list.size());
        for (RetryMessageModel retryMessage : list) {
            final ByteBuffer buffer = ByteBuffer.wrap(retryMessage.getBrokerMessage());
            Serializer.setPartition(buffer, Short.MAX_VALUE);
            Serializer.setIndex(buffer, retryMessage.getIndex());
            buffers.add(buffer);
        }
        return new PullResult(consumer, Short.MAX_VALUE, buffers);
    }

    /**
     * Tries the given partitions one after another until one of them yields messages.
     * <p>
     * At most {@code list.size()} attempts are made. The partition for each attempt is chosen by
     * {@code partitionManager.selectPartitionIndex}; the index after the previously selected one
     * is passed in for the next attempt. After every {@code partitionSelectRetryMax} unsuccessful
     * attempts (other than the first) the thread pauses for {@code partitionSelectRetryInterval}
     * milliseconds. A non-positive {@code partitionSelectRetryMax} disables the pause instead of
     * failing on a division by zero.
     * <p>
     * If the thread is interrupted during the pause, the interrupt flag is restored and the scan
     * stops with an empty result, so the interruption stays visible to the caller.
     *
     * @param consumer consumer the pull is made for
     * @param list     candidate partitions
     * @param i        number of messages requested
     * @param j        ack timeout handed to the per-partition pull
     * @param j2       handed unchanged to partition selection
     * @return the first result containing at least one buffer, or an empty result on partition -1
     * @throws JoyQueueException propagated from the per-partition pull
     */
    private PullResult getFromPartition(Consumer consumer, List<Short> list, int i, long j, long j2) throws JoyQueueException {
        final int partitionCount = list.size();
        final int retryMax = this.config.getPartitionSelectRetryMax();
        final int retryInterval = this.config.getPartitionSelectRetryInterval();

        int nextIndexHint = -1;
        for (int attempt = 0; attempt < partitionCount; attempt++) {
            int selectedIndex = this.partitionManager.selectPartitionIndex(partitionCount, nextIndexHint, j2);
            short partition = list.get(selectedIndex).shortValue();
            PullResult result = getMessage4Sequence(consumer, partition, i, j);
            int pulled = result.getBuffers().size();
            if (pulled > 0) {
                if (this.config.getLogDetail(consumer.getApp())) {
                    this.logger.info("getFromPartition, topic: {}, app: {}, count: {}, partition: {}, partitions: {}, result: {}", new Object[]{consumer.getTopic(), consumer.getApp(), Integer.valueOf(i), Short.valueOf(partition), list, Integer.valueOf(pulled)});
                }
                return result;
            }
            // Guard retryMax so a zero/negative setting cannot raise ArithmeticException.
            if (attempt != 0 && retryMax > 0 && attempt % retryMax == 0) {
                try {
                    Thread.sleep(retryInterval);
                } catch (InterruptedException e) {
                    // Restore the flag and stop scanning rather than hiding the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            nextIndexHint = selectedIndex + 1;
        }
        return new PullResult(consumer, (short) -1, new ArrayList<ByteBuffer>(0));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads messages from an explicit partition group / partition / index and, for ordinary
     * consumers, runs them through the consumer's filter and the delay handler.
     * <p>
     * Position errors from the store are translated into a result code on an empty result
     * (partition -1) and returned immediately:
     * <ul>
     *   <li>overflow: {@code SE_INDEX_OVERFLOW}, but only when the requested position differs
     *       from the store's right boundary; a read exactly at the right boundary returns the
     *       empty result without an error code;</li>
     *   <li>underflow: {@code SE_INDEX_UNDERFLOW}.</li>
     * </ul>
     * Filtering is skipped when the consumer has no app, is of type {@code INTERNAL} or
     * {@code KAFKA}, or has no consumer configuration in the cluster manager.
     *
     * @param consumer consumer the read is made for
     * @param i        partition group id
     * @param s        partition id
     * @param j        index to start reading from
     * @param i2       number of messages requested
     * @return the (possibly filtered) messages on partition {@code s}, or an empty result on
     *         partition -1 when nothing could be read
     * @throws JoyQueueException propagated from the read or the filter
     * @throws IOException       propagated from the store read
     */
    public PullResult getMsgByPartitionAndIndex(Consumer consumer, int i, short s, long j, int i2) throws JoyQueueException, IOException {
        PullResult pullResult = new PullResult(consumer, (short) -1, new ArrayList<ByteBuffer>(0));

        PullResult readResult;
        try {
            readResult = getMsgByPartitionAndIndex(consumer.getTopic(), i, s, j, i2);
        } catch (PositionOverflowException e) {
            this.logger.debug("PositionOverflow,topic:{},partition:{},index:{}", new Object[]{consumer.getTopic(), Short.valueOf(s), Long.valueOf(j)});
            if (e.getPosition() != e.getRight()) {
                pullResult.setCode(JoyQueueCode.SE_INDEX_OVERFLOW);
            }
            // Return here: there is no read result to process after a position error.
            return pullResult;
        } catch (PositionUnderflowException e2) {
            this.logger.debug("PositionUnderflow,topic:{},partition:{},index:{}", new Object[]{consumer.getTopic(), Short.valueOf(s), Long.valueOf(j)});
            pullResult.setCode(JoyQueueCode.SE_INDEX_UNDERFLOW);
            return pullResult;
        }

        List<ByteBuffer> buffers = readResult.getBuffers();
        if (buffers == null) {
            // NOTE(review): a non-success code carried by readResult is not copied onto the returned result.
            return pullResult;
        }

        boolean ordinaryConsumer = StringUtils.isNotEmpty(consumer.getApp())
                && !Consumer.ConsumeType.INTERNAL.equals(consumer.getType())
                && !Consumer.ConsumeType.KAFKA.equals(consumer.getType());
        if (ordinaryConsumer) {
            org.joyqueue.domain.Consumer consumerConfig = this.clusterManager.tryGetConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
            if (consumerConfig != null) {
                buffers = this.delayHandler.handle(consumerConfig.getConsumerPolicy(), this.filterMessageSupport.filter(consumerConfig, buffers, new FilterCallbackImpl(consumer)));
            }
        }
        return new PullResult(consumer, s, buffers);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads messages straight from the store of the given topic and partition group, without
     * any consumer-specific filtering.
     *
     * @param str topic name
     * @param i   partition group id used to look up the store
     * @param s   partition id
     * @param j   index to start reading from
     * @param i2  number of messages requested
     * @return a result for partition {@code s}; on success its buffers hold the messages read,
     *         otherwise its buffers are left as constructed (null) and its code is the store's
     *         result code
     * @throws JoyQueueException declared for callers; not thrown by the visible code itself
     * @throws IOException       propagated from the store read
     */
    public PullResult getMsgByPartitionAndIndex(String str, int i, short s, long j, int i2) throws JoyQueueException, IOException {
        PullResult pullResult = new PullResult(str, null, s, null);
        ReadResult read = this.storeService.getStore(str, i).read(s, j, i2, Long.MAX_VALUE);
        if (read.getCode() != JoyQueueCode.SUCCESS) {
            this.logger.error("read message error, error code[{}]", read.getCode());
            pullResult.setCode(read.getCode());
            return pullResult;
        }
        pullResult.setBuffers(Lists.newArrayList(read.getMessages()));
        return pullResult;
    }

    /**
     * Resolves the partition group of the given partition and delegates to the overload that
     * takes an explicit group.
     *
     * @param consumer consumer the read is made for
     * @param s        partition id
     * @param j        index to start reading from
     * @param i        number of messages requested
     * @return result of the delegated read
     * @throws IllegalArgumentException when the resolved group id is negative
     * @throws IOException              propagated from the delegated read
     * @throws JoyQueueException        propagated from the delegated read
     */
    protected PullResult getMsgByPartitionAndIndex(Consumer consumer, short s, long j, int i) throws IOException, JoyQueueException {
        final int group = this.partitionManager.getGroupByPartition(TopicName.parse(consumer.getTopic()), s);
        // Message (Chinese): "cannot find the partition group for topic[..], partition[..]".
        final String groupNotFound = "找不到主题[" + consumer.getTopic() + "],分区[" + ((int) s) + "]的分区组";
        Preconditions.checkArgument(group >= 0, groupNotFound);
        return getMsgByPartitionAndIndex(consumer, group, s, j, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Pulls messages from a single partition, starting at the consumer's last acknowledged index.
     * <p>
     * The partition is first occupied through the partition manager; when that fails an empty
     * result (partition -1) is returned. Partition id {@code Short.MAX_VALUE} is served from the
     * retry service instead of the store. For any other partition the messages are read from
     * the store, filtered and passed through the delay handler.
     * <p>
     * The occupation is released again when the retry pull yields nothing, when nothing can be
     * read, when filtering leaves an empty (non-null) list, or when any exception occurs.
     * Exceptions from the read/filter path are not rethrown: position overflow (only when the
     * store's right boundary is below the ack index) and position underflow set the matching
     * code on the result; all of them are logged.
     *
     * @param consumer consumer the pull is made for
     * @param s        partition id
     * @param i        number of messages requested
     * @param j        ack timeout used when occupying the partition
     * @return the messages pulled, or an empty result (optionally carrying an error code)
     * @throws JoyQueueException propagated from the retry pull
     */
    public PullResult getMessage4Sequence(Consumer consumer, short s, int i, long j) throws JoyQueueException {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("try getMessage4Sequence by topic:[{}], app:[{}], partition:[{}], count:[{}], ackTimeout:[{}]", new Object[]{consumer.getTopic(), consumer.getApp(), Short.valueOf(s), Integer.valueOf(i), Long.valueOf(j)});
        }
        PullResult pullResult = new PullResult(consumer, (short) -1, new ArrayList<ByteBuffer>(0));
        if (!this.partitionManager.tryOccupyPartition(consumer, s, j)) {
            return pullResult;
        }

        if (s == Short.MAX_VALUE) {
            PullResult retryMessage = getRetryMessage(consumer, pullResult);
            if (retryMessage.count() < 1) {
                this.partitionManager.releasePartition(consumer, s);
            }
            return retryMessage;
        }

        int group = this.clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), s).intValue();
        long lastMsgAckIndex = this.positionManager.getLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), s);
        try {
            ByteBuffer[] readMessages = readMessages(consumer, group, s, lastMsgAckIndex, i);
            if (readMessages == null) {
                this.partitionManager.releasePartition(consumer, s);
                return pullResult;
            }
            List<ByteBuffer> unfiltered = Lists.newArrayList(readMessages);
            org.joyqueue.domain.Consumer consumerConfig = this.clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
            List<ByteBuffer> handled = this.delayHandler.handle(consumerConfig.getConsumerPolicy(), this.filterMessageSupport.filter(consumerConfig, unfiltered, new FilterCallbackImpl(consumer)));
            if (handled != null && handled.size() == 0) {
                // Nothing is handed to the consumer, so the partition does not need to stay occupied.
                this.partitionManager.releasePartition(consumer, s);
            }
            pullResult = new PullResult(consumer, s, handled);
            if (this.config.getLogDetail(consumer.getApp())) {
                this.logger.info("getMessage4Sequence, topic: {}, app: {}, count: {}, partition: {}, index: {}, result: {}", new Object[]{consumer.getTopic(), consumer.getApp(), Integer.valueOf(i), Short.valueOf(s), Long.valueOf(lastMsgAckIndex), Integer.valueOf(pullResult.getBuffers().size())});
            }
        } catch (Exception e) {
            this.partitionManager.releasePartition(consumer, s);
            if (e instanceof PositionOverflowException) {
                // getRight() is declared on PositionOverflowException, not on Exception, so cast first.
                PositionOverflowException overflow = (PositionOverflowException) e;
                if (overflow.getRight() < lastMsgAckIndex) {
                    pullResult.setCode(JoyQueueCode.SE_INDEX_OVERFLOW);
                    this.logger.error(e.getMessage(), e);
                }
            } else if (e instanceof PositionUnderflowException) {
                pullResult.setCode(JoyQueueCode.SE_INDEX_UNDERFLOW);
                this.logger.error(e.getMessage(), e);
            } else {
                this.logger.error("get message error, consumer: {}, partition: {}", new Object[]{consumer, Short.valueOf(s), e});
            }
        }
        return pullResult;
    }

    /**
     * Asks the retry service for retry messages of the consumer's topic/app and adjusts the
     * consumer's retry probability according to the outcome.
     *
     * @param consumer   consumer the retry messages are fetched for
     * @param pullResult result returned unchanged when no retry message is available
     * @return a pull result built from the retry messages (retry probability increased), or the
     *         given {@code pullResult} when none were returned (retry probability decreased)
     * @throws JoyQueueException propagated from the retry service or the conversion
     */
    private PullResult getRetryMessage(Consumer consumer, PullResult pullResult) throws JoyQueueException {
        List<RetryMessageModel> retryMessages = this.messageRetry.getRetry(consumer.getTopic(), consumer.getApp(), (short) 1, 0L);
        if (CollectionUtils.isNotEmpty(retryMessages)) {
            PullResult converted = brokerMessage2PullResult(consumer, retryMessages);
            this.partitionManager.increaseRetryProbability(consumer);
            return converted;
        }
        this.partitionManager.decreaseRetryProbability(consumer);
        return pullResult;
    }

    /**
     * Reads up to {@code i2} messages of a partition from the store, starting at index {@code j}.
     *
     * @param consumer consumer the read is made for (topic lookup and logging)
     * @param i        partition group id used to look up the store
     * @param s        partition id
     * @param j        index to start reading from
     * @param i2       number of messages requested
     * @return the messages read, or {@code null} when {@code j} lies outside the store's
     *         [left index, right index) range or the store reports a non-success code
     * @throws IOException propagated from the store; position underflow/overflow exceptions
     *                     are logged at debug level and rethrown unchanged
     */
    private ByteBuffer[] readMessages(Consumer consumer, int i, short s, long j, int i2) throws IOException {
        final PartitionGroupStore store = this.storeService.getStore(consumer.getTopic(), i);
        final boolean withinStoredRange = j >= store.getLeftIndex(s) && j < store.getRightIndex(s);
        if (!withinStoredRange) {
            return null;
        }
        try {
            final ReadResult outcome = store.read(s, j, i2, Long.MAX_VALUE);
            if (outcome.getCode() == JoyQueueCode.SUCCESS) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("readMessage by topic:[{}], app:[{}], partition:[{}], consumer: [{}], count:[{}], result: {}", new Object[]{consumer.getTopic(), consumer.getApp(), Short.valueOf(s), consumer, Integer.valueOf(i2), Integer.valueOf(ArrayUtils.getLength(outcome.getMessages()))});
                }
                return outcome.getMessages();
            }
            this.logger.error("read message error, error code[{}]", outcome.getCode());
            return null;
        } catch (PositionUnderflowException underflow) {
            this.logger.debug("PositionUnderflow,topic:{},app:{},partition:{},index:{}", new Object[]{consumer.getTopic(), consumer.getApp(), Short.valueOf(s), Long.valueOf(j)});
            throw underflow;
        } catch (PositionOverflowException overflow) {
            this.logger.debug("PositionOverflow,topic:{},app:{},partition:{},index:{}", new Object[]{consumer.getTopic(), consumer.getApp(), Short.valueOf(s), Long.valueOf(j)});
            throw overflow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Acknowledges, on behalf of the consumer, the messages contained in the given buffers and
     * then records them in the consume archive when archiving is available.
     *
     * @param consumer consumer the messages are acknowledged for
     * @param list     message buffers to acknowledge; {@code null} is ignored
     * @throws JoyQueueException propagated from the acknowledge or archive step
     */
    public void innerAcknowledge(Consumer consumer, List<ByteBuffer> list) throws JoyQueueException {
        if (list == null) {
            return;
        }
        MessageLocation[] locations = convertMessageLocation(consumer.getTopic(), list);
        acknowledge(locations, consumer, true);
        archiveIfNecessary(consumer, locations);
    }

    /**
     * Appends a consume log for the given locations when an archive manager with a
     * consume-archive service is present; otherwise does nothing.
     * <p>
     * The log is written with a synthetic connection: its address is the local IP with port
     * 50088 (an empty byte array when that cannot be resolved) and its app is the
     * {@code innerFilter@} prefix followed by the consumer's app. The app has to come from the
     * consumer: the connection is created in this method and has no app of its own to read back.
     *
     * @param consumer           consumer whose app is recorded on the archive entry
     * @param messageLocationArr locations to archive
     * @throws JoyQueueException propagated from the archive service
     */
    private void archiveIfNecessary(Consumer consumer, MessageLocation[] messageLocationArr) throws JoyQueueException {
        if (this.archiveManager == null) {
            return;
        }
        ConsumeArchiveService consumeArchiveService = this.archiveManager.getConsumeArchiveService();
        if (consumeArchiveService == null) {
            return;
        }
        Connection connection = new Connection();
        try {
            connection.setAddress(IpUtil.toByte(new InetSocketAddress(IpUtil.getLocalIp(), 50088)));
        } catch (Exception e) {
            // Best effort: archiving still proceeds with an empty address, but leave a trace of why.
            this.logger.debug("resolve local address for archive connection failed, using empty address", e);
            connection.setAddress(new byte[0]);
        }
        connection.setApp(this.innerAppPrefix + consumer.getApp());
        consumeArchiveService.appendConsumeLog(connection, messageLocationArr);
    }

    /**
     * Builds one {@code MessageLocation} per buffer, using the partition and index that
     * {@code Serializer} reads from the buffer.
     *
     * @param str  topic name stored on every location
     * @param list message buffers; must not be null
     * @return locations in the same order as the buffers
     */
    private MessageLocation[] convertMessageLocation(String str, List<ByteBuffer> list) {
        final MessageLocation[] locations = new MessageLocation[list.size()];
        int slot = 0;
        for (ByteBuffer buffer : list) {
            locations[slot] = new MessageLocation(str, Serializer.readPartition(buffer), Serializer.readIndex(buffer));
            slot++;
        }
        return locations;
    }

    /**
     * Acknowledges a batch of message locations for a consumer.
     * <p>
     * The partition is taken from the first location. Locations on partition
     * {@code Short.MAX_VALUE} are acknowledged through the retry service. For any other
     * partition the locations are reduced to an index range by
     * {@code AcknowledgeSupport.sortMsgLocation}; the ack and pull indexes advance to
     * {@code range end + 1} only when the range starts exactly at the current ack index,
     * otherwise the mismatch is logged and nothing is updated.
     *
     * @param messageLocationArr locations to acknowledge; {@code null} or empty yields {@code false}
     * @param consumer           consumer the acknowledgement is made for
     * @param z                  for retry-partition locations: {@code true} reports success,
     *                           {@code false} reports an error to the retry service
     * @return {@code true} when the retry ack succeeded or the ack index was updated
     * @throws JoyQueueException with {@code FW_CONSUMER_ACK_FAIL} when no index range can be
     *                           derived from the locations; also propagated from the position manager
     */
    public boolean acknowledge(MessageLocation[] messageLocationArr, Consumer consumer, boolean z) throws JoyQueueException {
        // Treat a missing array like an empty one instead of failing with a NullPointerException.
        if (messageLocationArr == null || messageLocationArr.length < 1) {
            return false;
        }
        String topic = consumer.getTopic();
        String app = consumer.getApp();
        short partition = messageLocationArr[0].getPartition();
        if (partition == Short.MAX_VALUE) {
            // Only render the array when the message will actually be logged.
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("retry ack by topic:[{}], app:[{}], locations:[{}]", new Object[]{topic, app, Arrays.toString(messageLocationArr)});
            }
            return retryAck(topic, app, messageLocationArr, z);
        }

        long[] indexRange = AcknowledgeSupport.sortMsgLocation(messageLocationArr);
        if (indexRange == null) {
            throw new JoyQueueException(JoyQueueCode.FW_CONSUMER_ACK_FAIL, new Object[]{"ack index is not continue or repeatable!"});
        }

        TopicName topicName = TopicName.parse(topic);
        long lastMsgAckIndex = this.positionManager.getLastMsgAckIndex(topicName, app, partition);
        boolean updated = false;
        if (lastMsgAckIndex == indexRange[0]) {
            long nextIndex = indexRange[1] + 1;
            updated = this.positionManager.updateLastMsgAckIndex(topicName, app, partition, nextIndex);
            this.positionManager.updateLastMsgPullIndex(topicName, app, partition, nextIndex);
        } else {
            this.logger.error("ack index : [{} - {}] is not continue, partition: {}, currentIndex is : [{}], consumer info is : {}", new Object[]{Long.valueOf(indexRange[0]), Long.valueOf(indexRange[1]), Short.valueOf(partition), Long.valueOf(lastMsgAckIndex), consumer});
        }
        if (this.config.getLogDetail(app)) {
            this.logger.info("acknowledge, topic: {}, app: {}, partition: {}, startIndex: {}, endIndex: {}, isSuccess: {}", new Object[]{topic, app, Short.valueOf(partition), Long.valueOf(indexRange[0]), Long.valueOf(indexRange[1]), Boolean.valueOf(updated)});
        }
        return updated;
    }

    /**
     * Reports the outcome of retry messages to the retry service.
     *
     * @param str                topic name
     * @param str2               app name
     * @param messageLocationArr locations whose indexes identify the retry messages
     * @param z                  {@code true} to report success, {@code false} to report an error
     * @return {@code true} when the retry service call completed, {@code false} when it threw a
     *         {@code JoyQueueException} (which is logged, not rethrown)
     */
    private boolean retryAck(String str, String str2, MessageLocation[] messageLocationArr, boolean z) {
        final Long[] retryIndexes = new Long[messageLocationArr.length];
        int slot = 0;
        for (MessageLocation location : messageLocationArr) {
            retryIndexes[slot++] = Long.valueOf(location.getIndex());
        }

        boolean acknowledged;
        try {
            if (z) {
                this.messageRetry.retrySuccess(str, str2, retryIndexes);
            } else {
                this.messageRetry.retryError(str, str2, retryIndexes);
            }
            acknowledged = true;
        } catch (JoyQueueException e) {
            this.logger.error("RetryAck error.", e);
            acknowledged = false;
        }
        return acknowledged;
    }
}
