package org.joyqueue.broker.manage.service.support;

import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.MessageConvertSupport;
import org.joyqueue.broker.manage.exception.ManageException;
import org.joyqueue.broker.manage.service.MessageManageService;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;
import org.joyqueue.monitor.BrokerMessageInfo;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.store.StoreManagementService;

/* loaded from: input_file:org/joyqueue/broker/manage/service/support/DefaultMessageManageService.class */
/**
 * Default {@link MessageManageService} implementation.
 *
 * <p>Every query follows the same pipeline: raw message bytes are read through the
 * {@link StoreManagementService}, deserialized with {@link Serializer}, passed through
 * {@link MessageConvertSupport} with {@link SourceType#INTERNAL} as the target, and finally
 * wrapped into {@link BrokerMessageInfo} objects. Any exception raised along the way is
 * rethrown wrapped in a {@link ManageException}.
 */
public class DefaultMessageManageService implements MessageManageService {
    private final Consume consume;
    private final StoreManagementService storeManagementService;
    private final MessageConvertSupport messageConvertSupport;

    /**
     * Creates the service from its three collaborators.
     *
     * @param consume used to look up the acknowledged index of a consumer on a partition
     * @param storeManagementService used to read raw messages and topic metrics
     * @param messageConvertSupport used to convert deserialized messages before they are exposed
     */
    public DefaultMessageManageService(Consume consume, StoreManagementService storeManagementService, MessageConvertSupport messageConvertSupport) {
        this.consume = consume;
        this.storeManagementService = storeManagementService;
        this.messageConvertSupport = messageConvertSupport;
    }

    /**
     * Reads messages of a single partition.
     *
     * @param str topic, forwarded as the first argument of the store read
     * @param str2 not used by this implementation (presumably the app — confirm against the interface)
     * @param s partition, forwarded to the store read
     * @param j index forwarded to the store read (presumably the first message index — confirm)
     * @param i count forwarded to the store read; also the initial capacity of the result list
     * @return one {@link BrokerMessageInfo} per converted message, in the order returned by the converter
     * @throws ManageException wrapping any exception raised while reading, deserializing or converting
     */
    @Override // org.joyqueue.broker.manage.service.MessageManageService
    public List<BrokerMessageInfo> getPartitionMessage(String str, String str2, short s, long j, int i) {
        try {
            List<BrokerMessageInfo> result = Lists.newArrayListWithCapacity(i);
            for (BrokerMessage message : readAndConvert(str, s, j, i)) {
                result.add(new BrokerMessageInfo(message));
            }
            return result;
        } catch (Exception e) {
            throw new ManageException(e);
        }
    }

    /**
     * Collects messages starting at the acknowledged index of the given consumer, walking the
     * partitions of the topic in the order reported by the topic metric.
     *
     * <p>Partitions whose (non-negative) acknowledged index has already reached the partition's
     * right index are skipped.
     *
     * @param str topic
     * @param str2 app of the consumer whose acknowledged index is used
     * @param i target number of messages; no further partition is read once the result holds at
     *          least this many entries
     * @return the collected messages; empty when no partition has anything past the acknowledged index
     * @throws ManageException wrapping any exception raised while reading, deserializing or converting
     */
    @Override // org.joyqueue.broker.manage.service.MessageManageService
    public List<BrokerMessageInfo> getPendingMessage(String str, String str2, int i) {
        try {
            Consumer consumer = newConsumer(str, str2);
            List<BrokerMessageInfo> result = Lists.newArrayListWithCapacity(i);
            for (StoreManagementService.PartitionGroupMetric groupMetric : this.storeManagementService.topicMetric(str).getPartitionGroupMetrics()) {
                for (StoreManagementService.PartitionMetric partitionMetric : groupMetric.getPartitionMetrics()) {
                    int remaining = i - result.size();
                    if (remaining <= 0) {
                        // Limit reached: stop entirely instead of visiting the remaining groups.
                        return result;
                    }
                    // A negative acknowledged index is treated as "start from index 0".
                    long ackIndex = Math.max(this.consume.getAckIndex(consumer, partitionMetric.getPartition()), 0L);
                    if (ackIndex >= partitionMetric.getRightIndex()) {
                        continue;
                    }
                    for (BrokerMessage message : readAndConvert(str, partitionMetric.getPartition(), ackIndex, remaining)) {
                        result.add(new BrokerMessageInfo(message, ackIndex > message.getMsgIndexNo()));
                    }
                }
            }
            return result;
        } catch (Exception e) {
            throw new ManageException(e);
        }
    }

    /**
     * Collects messages from every partition whose right index is non-zero.
     *
     * <p>For each such partition the read starts at the (non-negative) acknowledged index of the
     * consumer; when that index has already reached the right index, the read instead starts
     * {@code remaining} entries before the right index (never below 0), where {@code remaining}
     * is the number of messages still missing from the result.
     *
     * @param str topic
     * @param str2 app of the consumer whose acknowledged index is used
     * @param i target number of messages; no further partition is read once the result holds at
     *          least this many entries
     * @return the collected messages; the second constructor argument of each
     *         {@link BrokerMessageInfo} is {@code true} when the message index is below the
     *         acknowledged index
     * @throws ManageException wrapping any exception raised while reading, deserializing or converting
     */
    @Override // org.joyqueue.broker.manage.service.MessageManageService
    public List<BrokerMessageInfo> getLastMessage(String str, String str2, int i) {
        try {
            Consumer consumer = newConsumer(str, str2);
            List<BrokerMessageInfo> result = Lists.newArrayListWithCapacity(i);
            for (StoreManagementService.PartitionGroupMetric groupMetric : this.storeManagementService.topicMetric(str).getPartitionGroupMetrics()) {
                for (StoreManagementService.PartitionMetric partitionMetric : groupMetric.getPartitionMetrics()) {
                    long rightIndex = partitionMetric.getRightIndex();
                    if (rightIndex == 0) {
                        continue;
                    }
                    int remaining = i - result.size();
                    if (remaining <= 0) {
                        // Limit reached: stop entirely instead of visiting the remaining groups.
                        return result;
                    }
                    // The unclamped value is kept for the flag below; only the read position is clamped.
                    long ackIndex = this.consume.getAckIndex(consumer, partitionMetric.getPartition());
                    long startIndex = Math.max(ackIndex, 0L);
                    if (startIndex >= rightIndex) {
                        // Everything is acknowledged: fall back to the tail of the partition.
                        // NOTE(review): the start is only clamped to 0, not to the partition's left
                        // index — confirm the store tolerates a start below the first retained index.
                        startIndex = Math.max(rightIndex - remaining, 0L);
                    }
                    for (BrokerMessage message : readAndConvert(str, partitionMetric.getPartition(), startIndex, remaining)) {
                        result.add(new BrokerMessageInfo(message, ackIndex > message.getMsgIndexNo()));
                    }
                }
            }
            return result;
        } catch (Exception e) {
            throw new ManageException(e);
        }
    }

    /**
     * Returns the pending messages of the consumer, or the result of
     * {@link #getLastMessage(String, String, int)} when there are no pending messages.
     *
     * @param str topic
     * @param str2 app
     * @param i target number of messages, forwarded unchanged
     * @return the non-empty pending list, otherwise the last-message list
     */
    @Override // org.joyqueue.broker.manage.service.MessageManageService
    public List<BrokerMessageInfo> viewMessage(String str, String str2, int i) {
        List<BrokerMessageInfo> pending = getPendingMessage(str, str2, i);
        return CollectionUtils.isNotEmpty(pending) ? pending : getLastMessage(str, str2, i);
    }

    /** Builds the consumer identity (topic + app) used for acknowledged-index lookups. */
    private static Consumer newConsumer(String topic, String app) {
        Consumer consumer = new Consumer();
        consumer.setTopic(topic);
        consumer.setApp(app);
        return consumer;
    }

    /**
     * Reads raw messages from the store, deserializes each one and runs the batch through the
     * message converter with {@link SourceType#INTERNAL} as the target.
     *
     * <p>The converter is invoked even when the store returned nothing, matching the behavior of
     * every caller before this helper was extracted.
     */
    private List<BrokerMessage> readAndConvert(String topic, short partition, long index, int count) throws Exception {
        byte[][] rawMessages = this.storeManagementService.readMessages(topic, partition, index, count);
        List<BrokerMessage> messages = new ArrayList<>(rawMessages == null ? 0 : rawMessages.length);
        if (ArrayUtils.isNotEmpty(rawMessages)) {
            for (byte[] rawMessage : rawMessages) {
                messages.add(Serializer.readBrokerMessage(ByteBuffer.wrap(rawMessage)));
            }
        }
        return this.messageConvertSupport.convert(messages, SourceType.INTERNAL.getValue());
    }
}
