package org.joyqueue.client.internal.consumer.support;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BrokerLoadBalance;
import org.joyqueue.client.internal.consumer.MessageFetcher;
import org.joyqueue.client.internal.consumer.callback.FetchListener;
import org.joyqueue.client.internal.consumer.callback.PartitionFetchListener;
import org.joyqueue.client.internal.consumer.callback.PollerListener;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.converter.BrokerAssignmentConverter;
import org.joyqueue.client.internal.consumer.coordinator.domain.BrokerAssignment;
import org.joyqueue.client.internal.consumer.coordinator.domain.BrokerAssignments;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.consumer.domain.FetchMessageData;
import org.joyqueue.client.internal.consumer.exception.ConsumerException;
import org.joyqueue.client.internal.consumer.transport.ConsumerClient;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
import org.joyqueue.client.internal.trace.TraceBuilder;
import org.joyqueue.client.internal.trace.TraceCaller;
import org.joyqueue.client.internal.trace.TraceType;
import org.joyqueue.client.internal.transport.ClientState;
import org.joyqueue.domain.ConsumerPolicy;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.domain.BrokerNode;
import org.joyqueue.shaded.com.google.common.base.Preconditions;
import org.joyqueue.shaded.com.google.common.collect.Lists;
import org.joyqueue.shaded.org.apache.commons.collections.CollectionUtils;
import org.joyqueue.shaded.org.apache.commons.lang3.StringUtils;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/MessagePollerInner.class */
/**
 * Helper that performs topic-level and partition-level message fetches against a broker
 * on behalf of a consumer.
 *
 * <p>Every fetch method works in one of two modes, selected by the {@code pollerListener}
 * argument:
 * <ul>
 *   <li>{@code pollerListener == null}: the fetch is synchronous and the messages are returned;</li>
 *   <li>{@code pollerListener != null}: the fetch is handed to the asynchronous API of the
 *       {@link MessageFetcher}, the result (or failure) is delivered to the listener, and the
 *       method itself returns {@code null}.</li>
 * </ul>
 *
 * <p>{@link #getBrokerLoadBalance(String)} relies on state created in {@link #validate()}.
 */
public class MessagePollerInner extends Service {
    /** Sentinel index meaning "no explicit index requested" for a partition fetch. */
    public static final long FETCH_PARTITION_NONE_INDEX = -1;
    protected static final Logger logger = LoggerFactory.getLogger(MessagePollerInner.class);

    private final ConsumerConfig config;
    private final NameServerConfig nameServerConfig;
    private final ClusterManager clusterManager;
    private final ConsumerClientManager consumerClientManager;
    private final MessageFetcher messageFetcher;
    /** Created in {@link #validate()}; {@code null} until then. */
    private BrokerLoadBalanceManager brokerLoadBalanceManager;

    public MessagePollerInner(ConsumerConfig consumerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ConsumerClientManager consumerClientManager, MessageFetcher messageFetcher) {
        this.config = consumerConfig;
        this.nameServerConfig = nameServerConfig;
        this.clusterManager = clusterManager;
        this.consumerClientManager = consumerClientManager;
        this.messageFetcher = messageFetcher;
    }

    /**
     * Creates the {@link BrokerLoadBalanceManager} used by {@link #getBrokerLoadBalance(String)}.
     */
    @Override
    public void validate() throws Exception {
        this.brokerLoadBalanceManager = new BrokerLoadBalanceManager();
    }

    /**
     * Resolves the metadata of the given topic and fetches messages for it.
     *
     * @param brokerNode     broker to fetch from
     * @param str            topic name; must not be blank
     * @param i              passed through to the fetcher (presumably the maximum number of
     *                       messages to fetch; confirm against {@link MessageFetcher})
     * @param j              timeout, expressed in {@code timeUnit}
     * @param timeUnit       unit of {@code j}; must not be {@code null}
     * @param pollerListener {@code null} for a synchronous fetch, otherwise the asynchronous receiver
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     * @throws IllegalArgumentException if a precondition fails
     * @throws ConsumerException        if the topic or consumer does not exist, or the fetch fails
     */
    public List<ConsumeMessage> fetchTopic(BrokerNode brokerNode, String str, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        TopicMetadata topicMetadata = getAndCheckTopicMetadata(str);
        return fetchTopic(brokerNode, topicMetadata, i, j, timeUnit, pollerListener);
    }

    /**
     * Fetches messages for the topic described by {@code topicMetadata}, wrapped in a
     * {@link TraceType#CONSUMER_FETCH} trace.
     *
     * <p>Any exception that is not already a {@link ConsumerException} is wrapped in one.
     *
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     * @throws IllegalArgumentException if {@code topicMetadata} or {@code timeUnit} is {@code null}
     * @throws ConsumerException        if the fetch fails
     */
    public List<ConsumeMessage> fetchTopic(BrokerNode brokerNode, TopicMetadata topicMetadata, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        // Same guard as fetchPartition: without it a null metadata fails with a raw
        // NullPointerException while building the trace, outside the try block below.
        Preconditions.checkArgument(topicMetadata != null, "topicMetadata not null");
        Preconditions.checkArgument(timeUnit != null, "timeoutUnit not null");
        TraceCaller trace = buildTraceCaller(topicMetadata);
        try {
            List<ConsumeMessage> messages = doFetchTopic(brokerNode, topicMetadata, i, j, timeUnit, pollerListener);
            trace.end();
            return messages;
        } catch (Exception e) {
            trace.error();
            throw toConsumerException(e);
        }
    }

    /**
     * Performs the topic fetch without tracing or exception translation.
     *
     * <p>The ack timeout is taken from the consumer config unless that value is {@code -1},
     * in which case the value from the topic's consumer policy is used.
     *
     * @return the handled fetch result in synchronous mode, {@code null} in asynchronous mode
     */
    protected List<ConsumeMessage> doFetchTopic(BrokerNode brokerNode, TopicMetadata topicMetadata, int i, long j, TimeUnit timeUnit, final PollerListener pollerListener) {
        ConsumerPolicy consumerPolicy = topicMetadata.getConsumerPolicy();
        long timeoutMillis = timeUnit.toMillis(j);
        final String topic = topicMetadata.getTopic();
        final String app = this.config.getAppFullName();
        long ackTimeout = this.config.getAckTimeout() == -1 ? consumerPolicy.getAckTimeout().intValue() : this.config.getAckTimeout();

        if (pollerListener == null) {
            FetchMessageData data = this.messageFetcher.fetch(brokerNode, topic, app, i, timeoutMillis, ackTimeout, this.config.getLongPollTimeout());
            return handleFetchMessageData(topic, app, data);
        }

        this.messageFetcher.asyncFetch(brokerNode, topic, app, i, timeoutMillis, ackTimeout, this.config.getLongPollTimeout(), new FetchListener() {
            @Override
            public void onMessage(FetchMessageData fetchMessageData) {
                dispatchToListener(pollerListener, topic, app, fetchMessageData);
            }

            @Override
            public void onException(Throwable th) {
                pollerListener.onException(th);
            }
        });
        return null;
    }

    /**
     * Fetches from a partition of the named topic without an explicit index
     * ({@link #FETCH_PARTITION_NONE_INDEX}).
     *
     * @param s partition to fetch from
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     */
    public List<ConsumeMessage> fetchPartition(BrokerNode brokerNode, String str, short s, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        return fetchPartition(brokerNode, str, s, FETCH_PARTITION_NONE_INDEX, i, j, timeUnit, pollerListener);
    }

    /**
     * Fetches from a partition of the given topic without an explicit index
     * ({@link #FETCH_PARTITION_NONE_INDEX}).
     *
     * @param s partition to fetch from
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     */
    public List<ConsumeMessage> fetchPartition(BrokerNode brokerNode, TopicMetadata topicMetadata, short s, int i, long j, TimeUnit timeUnit, PollerListener pollerListener) {
        return fetchPartition(brokerNode, topicMetadata, s, FETCH_PARTITION_NONE_INDEX, i, j, timeUnit, pollerListener);
    }

    /**
     * Resolves the metadata of the named topic and fetches from one of its partitions.
     *
     * @param str topic name; must not be blank
     * @param s   partition to fetch from
     * @param j   index to fetch from, or {@link #FETCH_PARTITION_NONE_INDEX} for none
     * @param i   passed through to the fetcher (presumably the maximum number of messages;
     *            confirm against {@link MessageFetcher})
     * @param j2  timeout, expressed in {@code timeUnit}
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     * @throws IllegalArgumentException if a precondition fails
     * @throws ConsumerException        if the topic or consumer does not exist, or the fetch fails
     */
    public List<ConsumeMessage> fetchPartition(BrokerNode brokerNode, String str, short s, long j, int i, long j2, TimeUnit timeUnit, PollerListener pollerListener) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        TopicMetadata topicMetadata = getAndCheckTopicMetadata(str);
        return fetchPartition(brokerNode, topicMetadata, s, j, i, j2, timeUnit, pollerListener);
    }

    /**
     * Fetches from one partition of the given topic, wrapped in a
     * {@link TraceType#CONSUMER_FETCH} trace.
     *
     * <p>Any exception that is not already a {@link ConsumerException} is wrapped in one.
     *
     * @return the fetched messages in synchronous mode, {@code null} in asynchronous mode
     * @throws IllegalArgumentException if {@code topicMetadata} or {@code timeUnit} is {@code null}
     * @throws ConsumerException        if the fetch fails
     */
    public List<ConsumeMessage> fetchPartition(BrokerNode brokerNode, TopicMetadata topicMetadata, short s, long j, int i, long j2, TimeUnit timeUnit, PollerListener pollerListener) {
        Preconditions.checkArgument(topicMetadata != null, "topicMetadata not null");
        Preconditions.checkArgument(timeUnit != null, "timeoutUnit not null");
        TraceCaller trace = buildTraceCaller(topicMetadata);
        try {
            List<ConsumeMessage> messages = doFetchPartition(brokerNode, topicMetadata, s, j, i, j2, timeUnit, pollerListener);
            trace.end();
            return messages;
        } catch (Exception e) {
            trace.error();
            throw toConsumerException(e);
        }
    }

    /**
     * Performs the partition fetch without tracing or exception translation.
     *
     * <p>When {@code j} equals {@link #FETCH_PARTITION_NONE_INDEX} the index-less overload of the
     * fetcher is used; otherwise the index is forwarded. The same rule applies to both the
     * synchronous and the asynchronous path.
     *
     * @return the handled fetch result in synchronous mode, {@code null} in asynchronous mode
     */
    protected List<ConsumeMessage> doFetchPartition(BrokerNode brokerNode, TopicMetadata topicMetadata, short s, long j, int i, long j2, TimeUnit timeUnit, final PollerListener pollerListener) {
        long timeoutMillis = timeUnit.toMillis(j2);
        final String topic = topicMetadata.getTopic();
        final String app = this.config.getAppFullName();
        boolean withoutIndex = j == FETCH_PARTITION_NONE_INDEX;

        if (pollerListener == null) {
            FetchMessageData data = withoutIndex
                    ? this.messageFetcher.fetchPartition(brokerNode, topic, app, s, i, timeoutMillis)
                    : this.messageFetcher.fetchPartition(brokerNode, topic, app, s, j, i, timeoutMillis);
            return handleFetchMessageData(topic, app, data);
        }

        PartitionFetchListener adapter = new PartitionFetchListener() {
            @Override
            public void onMessage(FetchMessageData fetchMessageData) {
                dispatchToListener(pollerListener, topic, app, fetchMessageData);
            }

            @Override
            public void onException(Throwable th) {
                pollerListener.onException(th);
            }
        };
        // Overload selection must match the synchronous branch above: the sentinel index is
        // never forwarded to the fetcher, and a real index is never dropped.
        if (withoutIndex) {
            this.messageFetcher.fetchPartitionAsync(brokerNode, topic, app, s, i, timeoutMillis, adapter);
        } else {
            this.messageFetcher.fetchPartitionAsync(brokerNode, topic, app, s, j, i, timeoutMillis, adapter);
        }
        return null;
    }

    /**
     * Translates a raw fetch result into the list of messages handed to the caller.
     *
     * <p>A successful result yields its messages. Index-out-of-range codes and
     * {@code FW_TOPIC_NOT_EXIST} are rethrown as {@link ConsumerException}. All other codes are
     * logged and produce an empty list; some of them also trigger a topic metadata update.
     *
     * @param str              topic the data was fetched for
     * @param str2             application name used for the fetch
     * @param fetchMessageData raw fetch result
     * @return the messages on success, otherwise an empty list
     * @throws ConsumerException if {@code fetchMessageData} is {@code null} or carries one of the
     *                           codes listed above
     */
    protected List<ConsumeMessage> handleFetchMessageData(String str, String str2, FetchMessageData fetchMessageData) {
        if (fetchMessageData == null) {
            throw new ConsumerException(JoyQueueCode.CN_UNKNOWN_ERROR.getMessage(new Object[0]), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
        }
        JoyQueueCode code = fetchMessageData.getCode();
        if (code.equals(JoyQueueCode.SUCCESS)) {
            return fetchMessageData.getMessages();
        }
        switch (code) {
            case CN_NO_PERMISSION:
            case CN_SERVICE_NOT_AVAILABLE:
            case FW_FETCH_TOPIC_MESSAGE_BROKER_NOT_LEADER:
                logger.warn("fetch message error, no permission, topic: {}", str);
                this.clusterManager.updateTopicMetadata(str, str2);
                break;
            case FW_GET_MESSAGE_TOPIC_NOT_READ:
            case FW_FETCH_TOPIC_MESSAGE_PAUSED:
                logger.debug("fetch message error, not read or paused, topic: {}", str);
                break;
            case FW_FETCH_MESSAGE_INDEX_OUT_OF_RANGE:
            case SE_INDEX_OVERFLOW:
            case SE_INDEX_UNDERFLOW:
                logger.warn("fetch message index out of range, reset index, topic: {}, app: {}", str, str2);
                throw new ConsumerException(code.getMessage(new Object[0]), code.getCode());
            case FW_TOPIC_NOT_EXIST:
                logger.debug("fetch message error, topic not exist, topic: {}", str);
                throw new ConsumerException(code.getMessage(new Object[0]), code.getCode());
            case FW_BROKER_NOT_READABLE:
                logger.debug("fetch message error, broker not readable, topic: {}", str);
                this.clusterManager.updateTopicMetadata(str, str2);
                break;
            default:
                logger.error("fetch message error, topic: {}, code: {}, error: {}", str, code, code.getMessage(new Object[0]));
                break;
        }
        return Collections.emptyList();
    }

    /**
     * Starts a {@link TraceType#CONSUMER_FETCH} trace for the given topic, tagged with the
     * configured application name and namespace.
     */
    protected TraceCaller buildTraceCaller(TopicMetadata topicMetadata) {
        return TraceBuilder.newInstance()
                .topic(topicMetadata.getTopic())
                .app(this.config.getAppFullName())
                .namespace(this.nameServerConfig.getNamespace())
                .type(TraceType.CONSUMER_FETCH)
                .begin();
    }

    /**
     * Drops assignments whose broker is not nearby, but only when the topic's consumer policy
     * has the nearby flag set.
     *
     * @return {@code brokerAssignments} itself when nothing was removed, otherwise a new
     *         {@link BrokerAssignments} holding the remaining assignments
     */
    public BrokerAssignments filterRegionBrokers(TopicMetadata topicMetadata, BrokerAssignments brokerAssignments) {
        if (!topicMetadata.getConsumerPolicy().getNearby().booleanValue() || CollectionUtils.isEmpty(brokerAssignments.getAssignments())) {
            return brokerAssignments;
        }
        // The copy is made lazily so the common "nothing to remove" case allocates nothing.
        List<BrokerAssignment> remaining = null;
        for (BrokerAssignment assignment : brokerAssignments.getAssignments()) {
            if (assignment.getBroker().isNearby()) {
                continue;
            }
            if (remaining == null) {
                remaining = Lists.newArrayList(brokerAssignments.getAssignments());
            }
            remaining.remove(assignment);
        }
        return remaining == null ? brokerAssignments : new BrokerAssignments(remaining);
    }

    /**
     * Drops assignments whose broker already has a client that is not in
     * {@link ClientState#CONNECTED} state. Brokers without a client are kept.
     *
     * @return {@code brokerAssignments} itself when nothing was removed, otherwise a new
     *         {@link BrokerAssignments} holding the remaining assignments
     */
    public BrokerAssignments filterNotAvailableBrokers(BrokerAssignments brokerAssignments) {
        if (CollectionUtils.isEmpty(brokerAssignments.getAssignments())) {
            return brokerAssignments;
        }
        // The copy is made lazily so the common "nothing to remove" case allocates nothing.
        List<BrokerAssignment> remaining = null;
        for (BrokerAssignment assignment : brokerAssignments.getAssignments()) {
            ConsumerClient client = this.consumerClientManager.tryGetClient(assignment.getBroker());
            if (client == null || client.getState().equals(ClientState.CONNECTED)) {
                continue;
            }
            if (remaining == null) {
                remaining = Lists.newArrayList(brokerAssignments.getAssignments());
            }
            remaining.remove(assignment);
        }
        return remaining == null ? brokerAssignments : new BrokerAssignments(remaining);
    }

    /**
     * Looks up the broker load balancer for the given topic using the configured load balance type.
     */
    public BrokerLoadBalance getBrokerLoadBalance(String str) {
        return this.brokerLoadBalanceManager.getBrokerLoadBalance(str, this.config.getLoadBalanceType());
    }

    /**
     * Produces an empty poll result in the mode selected by {@code pollerListener}.
     *
     * @return an empty list in synchronous mode; {@code null} in asynchronous mode, after the
     *         listener has been handed an empty list
     */
    public List<ConsumeMessage> buildPollEmptyResult(PollerListener pollerListener) {
        if (pollerListener == null) {
            return Collections.emptyList();
        }
        pollerListener.onMessage(Collections.emptyList());
        return null;
    }

    /**
     * Converts the topic metadata into broker assignments via {@link BrokerAssignmentConverter}.
     */
    public BrokerAssignments buildAllBrokerAssignments(TopicMetadata topicMetadata) {
        return BrokerAssignmentConverter.convertBrokerAssignments(topicMetadata);
    }

    /**
     * Fetches the metadata of the given topic for the configured application and verifies that
     * both the topic and its consumer policy exist.
     *
     * @param str topic name; it is expanded with {@link #getTopicFullName(String)} before lookup
     * @return the topic metadata, never {@code null}
     * @throws ConsumerException with code {@code FW_TOPIC_NOT_EXIST} when no metadata is found,
     *                           or {@code FW_CONSUMER_NOT_EXISTS} when it has no consumer policy
     */
    public TopicMetadata getAndCheckTopicMetadata(String str) {
        String app = this.config.getAppFullName();
        TopicMetadata topicMetadata = this.clusterManager.fetchTopicMetadata(getTopicFullName(str), app);
        if (topicMetadata == null) {
            throw new ConsumerException(String.format("topic %s is not exist", str), JoyQueueCode.FW_TOPIC_NOT_EXIST.getCode());
        }
        if (topicMetadata.getConsumerPolicy() == null) {
            throw new ConsumerException(String.format("topic %s consumer %s is not exist", str, this.config.getAppFullName()), JoyQueueCode.FW_CONSUMER_NOT_EXISTS.getCode());
        }
        return topicMetadata;
    }

    /**
     * Expands a topic name to its full name using the name server configuration.
     */
    public String getTopicFullName(String str) {
        return NameServerHelper.getTopicFullName(str, this.nameServerConfig);
    }

    /**
     * Handles an asynchronously fetched result and forwards the outcome to the listener.
     * Any exception raised while handling or delivering the result goes to
     * {@link PollerListener#onException}.
     */
    private void dispatchToListener(PollerListener pollerListener, String topic, String app, FetchMessageData fetchMessageData) {
        try {
            pollerListener.onMessage(handleFetchMessageData(topic, app, fetchMessageData));
        } catch (Exception e) {
            pollerListener.onException(e);
        }
    }

    /** Returns {@code e} itself when it already is a {@link ConsumerException}, else wraps it. */
    private static ConsumerException toConsumerException(Exception e) {
        return e instanceof ConsumerException ? (ConsumerException) e : new ConsumerException(e);
    }
}
