package org.joyqueue.client.internal.consumer.support;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.joyqueue.client.internal.cluster.ClusterClientManager;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BaseMessageListener;
import org.joyqueue.client.internal.consumer.BatchMessageListener;
import org.joyqueue.client.internal.consumer.MessageConsumer;
import org.joyqueue.client.internal.consumer.MessageListener;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.consumer.domain.FetchIndexData;
import org.joyqueue.client.internal.consumer.exception.ConsumerException;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptor;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptorManager;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.exception.ClientException;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.shaded.com.google.common.base.Preconditions;
import org.joyqueue.shaded.org.apache.commons.collections.CollectionUtils;
import org.joyqueue.shaded.org.apache.commons.lang3.StringUtils;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/DefaultMessageConsumer.class */
public class DefaultMessageConsumer extends Service implements MessageConsumer {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultMessageConsumer.class);
    private ConsumerConfig config;
    private NameServerConfig nameServerConfig;
    private ClusterManager clusterManager;
    private ClusterClientManager clusterClientManager;
    private ConsumerClientManager consumerClientManager;
    private String subscribeTopic;
    private TopicMessageConsumer topicMessageConsumer;
    private ConsumerInterceptorManager consumerInterceptorManager = new ConsumerInterceptorManager();

    public DefaultMessageConsumer(ConsumerConfig consumerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ClusterClientManager clusterClientManager, ConsumerClientManager consumerClientManager) {
        Preconditions.checkArgument(consumerConfig != null, "config not null");
        Preconditions.checkArgument(nameServerConfig != null, "nameserver not null");
        Preconditions.checkArgument(clusterManager != null, "clusterManager not null");
        Preconditions.checkArgument(clusterClientManager != null, "clusterClientManager not null");
        Preconditions.checkArgument(consumerClientManager != null, "consumerClientManager not null");
        this.config = consumerConfig;
        this.nameServerConfig = nameServerConfig;
        this.clusterManager = clusterManager;
        this.clusterClientManager = clusterClientManager;
        this.consumerClientManager = consumerClientManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStart() throws Exception {
        if (this.topicMessageConsumer != null) {
            this.topicMessageConsumer.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStop() {
        if (this.topicMessageConsumer != null) {
            this.topicMessageConsumer.stop();
        }
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public synchronized void subscribe(String str) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic can not be null");
        checkUnsubscribe();
        this.topicMessageConsumer = newTopicMessageConsumer(str);
        this.subscribeTopic = str;
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public void unsubscribe() {
        checkSubscribe();
        this.subscribeTopic = null;
        if (isStarted()) {
            this.topicMessageConsumer.stop();
        }
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public String subscription() {
        return this.subscribeTopic;
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public boolean isSubscribed() {
        return StringUtils.isNotBlank(this.subscribeTopic);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public synchronized void subscribe(String str, MessageListener messageListener) {
        doSubscribeListener(str, messageListener);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public synchronized void subscribeBatch(String str, BatchMessageListener batchMessageListener) {
        doSubscribeListener(str, batchMessageListener);
    }

    protected void doSubscribeListener(String str, BaseMessageListener baseMessageListener) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic can not be null");
        Preconditions.checkArgument(baseMessageListener != null, "listener can not be null");
        checkUnsubscribe();
        TopicMessageConsumer newTopicMessageConsumer = newTopicMessageConsumer(str);
        newTopicMessageConsumer.addListener(baseMessageListener);
        this.topicMessageConsumer = newTopicMessageConsumer;
        this.subscribeTopic = str;
    }

    protected TopicMessageConsumer newTopicMessageConsumer(String str) {
        TopicMessageConsumer topicMessageConsumer = new TopicMessageConsumer(str, this.config, this.nameServerConfig, this.clusterManager, this.clusterClientManager, this.consumerClientManager, this.consumerInterceptorManager);
        try {
            if (isStarted()) {
                topicMessageConsumer.start();
            }
            return topicMessageConsumer;
        } catch (IllegalArgumentException e) {
            logger.debug("newTopicMessageConsumer exception, topic: {}", str, e);
            throw e;
        } catch (ClientException e2) {
            logger.debug("newTopicMessageConsumer exception, topic: {}", str, e2);
            throw new ConsumerException(e2.getMessage(), e2.getCode(), e2);
        } catch (Exception e3) {
            logger.debug("newTopicMessageConsumer exception, topic: {}", str, e3);
            throw new ConsumerException(JoyQueueCode.CN_UNKNOWN_ERROR.getMessage(new Object[0]), JoyQueueCode.CN_UNKNOWN_ERROR.getCode(), e3);
        }
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public void resumeListen() {
        checkState();
        checkSubscribe();
        this.topicMessageConsumer.resume();
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public void suspendListen() {
        checkState();
        checkSubscribe();
        this.topicMessageConsumer.suspend();
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public boolean isListenSuspended() {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.isSuspend();
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public synchronized void addInterceptor(ConsumerInterceptor consumerInterceptor) {
        Preconditions.checkArgument(consumerInterceptor != null, "interceptor can not be null");
        this.consumerInterceptorManager.addInterceptor(consumerInterceptor);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public synchronized void removeInterceptor(ConsumerInterceptor consumerInterceptor) {
        Preconditions.checkArgument(consumerInterceptor != null, "interceptor can not be null");
        this.consumerInterceptorManager.removeInterceptor(consumerInterceptor);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollOnce() {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollOnce(this.subscribeTopic);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollOnce(long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollOnce(this.subscribeTopic, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> poll() {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().poll(this.subscribeTopic);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> poll(long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().poll(this.subscribeTopic, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollAsync() {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollAsync(this.subscribeTopic);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollAsync(long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollAsync(this.subscribeTopic, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollPartitionOnce(short s) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionOnce(this.subscribeTopic, s);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollPartitionOnce(short s, long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionOnce(this.subscribeTopic, s, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollPartitionOnce(short s, long j) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionOnce(this.subscribeTopic, s, j);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public ConsumeMessage pollPartitionOnce(short s, long j, long j2, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionOnce(this.subscribeTopic, s, j, j2, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> pollPartition(short s) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartition(this.subscribeTopic, s);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> pollPartition(short s, long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartition(this.subscribeTopic, s, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(short s) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionAsync(this.subscribeTopic, s);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(short s, long j, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionAsync(this.subscribeTopic, s, j, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> pollPartition(short s, long j) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartition(this.subscribeTopic, s, j);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public List<ConsumeMessage> pollPartition(short s, long j, long j2, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartition(this.subscribeTopic, s, j, j2, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(short s, long j) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionAsync(this.subscribeTopic, s, j);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public CompletableFuture<List<ConsumeMessage>> pollPartitionAsync(short s, long j, long j2, TimeUnit timeUnit) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().pollPartitionAsync(this.subscribeTopic, s, j, j2, timeUnit);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public JoyQueueCode reply(List<ConsumeReply> list) {
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(list), "replyList can not be null");
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().reply(this.subscribeTopic, list);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public JoyQueueCode replyOnce(ConsumeReply consumeReply) {
        Preconditions.checkArgument(consumeReply != null, "replyList can not be null");
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().replyOnce(this.subscribeTopic, consumeReply);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public FetchIndexData fetchIndex(short s) {
        checkState();
        checkSubscribe();
        return this.topicMessageConsumer.getMessagePoller().fetchIndex(this.subscribeTopic, s);
    }

    @Override // org.joyqueue.client.internal.consumer.MessageConsumer
    public TopicMetadata getTopicMetadata(String str) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        return this.clusterManager.fetchTopicMetadata(NameServerHelper.getTopicFullName(str, this.nameServerConfig), this.config.getAppFullName());
    }

    protected void checkState() {
        if (!isStarted()) {
            throw new ConsumerException("consumer is not started", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }

    protected void checkSubscribe() {
        if (StringUtils.isBlank(this.subscribeTopic)) {
            throw new ConsumerException("consumer not subscribe topic", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }

    protected void checkUnsubscribe() {
        if (StringUtils.isNotBlank(this.subscribeTopic)) {
            throw new ConsumerException("consumer is subscribed topic", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }
}
