package io.openmessaging.joyqueue.consumer.support;

import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.joyqueue.config.ExceptionConverter;
import io.openmessaging.joyqueue.consumer.ConsumerIndex;
import io.openmessaging.joyqueue.consumer.ExtensionConsumer;
import io.openmessaging.joyqueue.consumer.extension.ExtensionAdapter;
import io.openmessaging.joyqueue.consumer.message.MessageConverter;
import io.openmessaging.joyqueue.consumer.message.MessageReceiptAdapter;
import io.openmessaging.joyqueue.support.AbstractServiceLifecycle;
import io.openmessaging.message.Message;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.joyqueue.client.internal.consumer.MessageConsumer;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.consumer.domain.FetchIndexData;
import org.joyqueue.network.command.RetryType;
import org.joyqueue.shaded.com.google.common.base.Preconditions;
import org.joyqueue.shaded.com.google.common.collect.Lists;
import org.joyqueue.shaded.org.apache.commons.collections.CollectionUtils;
import org.joyqueue.shaded.org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:io/openmessaging/joyqueue/consumer/support/ConsumerImpl.class */
public class ConsumerImpl extends AbstractServiceLifecycle implements ExtensionConsumer {
    private MessageConsumer messageConsumer;
    private Optional<Extension> extension;

    public ConsumerImpl(MessageConsumer messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStart() throws Exception {
        try {
            this.messageConsumer.start();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStop() {
        try {
            this.messageConsumer.stop();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void resume() {
        try {
            this.messageConsumer.resumeListen();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void suspend() {
        try {
            this.messageConsumer.suspendListen();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void suspend(long j) {
        suspend();
    }

    @Override // io.openmessaging.consumer.Consumer
    public boolean isSuspended() {
        try {
            return this.messageConsumer.isListenSuspended();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void bindQueue(String str) {
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "queueName can not be null");
            this.messageConsumer.subscribe(str);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void bindQueue(String str, MessageListener messageListener) {
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "queueName can not be null");
            Preconditions.checkArgument(messageListener != null, "listener can not be null");
            this.messageConsumer.subscribe(str, new MessageListenerAdapter(messageListener));
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void bindQueue(String str, BatchMessageListener batchMessageListener) {
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "queueName can not be null");
            Preconditions.checkArgument(batchMessageListener != null, "listener can not be null");
            this.messageConsumer.subscribeBatch(str, new BatchMessageListenerAdapter(batchMessageListener));
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void unbindQueue(String str) {
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "queueName can not be null");
            if (str.equals(this.messageConsumer.subscription())) {
                this.messageConsumer.unsubscribe();
            }
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public boolean isBindQueue() {
        try {
            return this.messageConsumer.isSubscribed();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public String getBindQueue() {
        try {
            return this.messageConsumer.subscription();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void addInterceptor(ConsumerInterceptor consumerInterceptor) {
        try {
            Preconditions.checkArgument(consumerInterceptor != null, "interceptor can not be null");
            this.messageConsumer.addInterceptor(new ConsumerInterceptorAdapter(consumerInterceptor));
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void removeInterceptor(ConsumerInterceptor consumerInterceptor) {
        try {
            Preconditions.checkArgument(consumerInterceptor != null, "interceptor can not be null");
            this.messageConsumer.removeInterceptor(new ConsumerInterceptorAdapter(consumerInterceptor));
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public Message receive(long j) {
        try {
            ConsumeMessage pollOnce = this.messageConsumer.pollOnce(j, TimeUnit.MILLISECONDS);
            if (pollOnce == null) {
                return null;
            }
            return MessageConverter.convertMessage(pollOnce);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public List<Message> batchReceive(long j) {
        try {
            List<ConsumeMessage> poll = this.messageConsumer.poll(j, TimeUnit.MILLISECONDS);
            return CollectionUtils.isEmpty(poll) ? Collections.emptyList() : MessageConverter.convertMessages(poll);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public Message receive(short s, long j) {
        try {
            ConsumeMessage pollPartitionOnce = this.messageConsumer.pollPartitionOnce(s, j, TimeUnit.MILLISECONDS);
            if (pollPartitionOnce == null) {
                return null;
            }
            return MessageConverter.convertMessage(pollPartitionOnce);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public List<Message> batchReceive(short s, long j) {
        try {
            List<ConsumeMessage> pollPartition = this.messageConsumer.pollPartition(s, j, TimeUnit.MILLISECONDS);
            return CollectionUtils.isEmpty(pollPartition) ? Collections.emptyList() : MessageConverter.convertMessages(pollPartition);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public Message receive(short s, long j, long j2) {
        try {
            ConsumeMessage pollPartitionOnce = this.messageConsumer.pollPartitionOnce(s, j, j2, TimeUnit.MILLISECONDS);
            if (pollPartitionOnce == null) {
                return null;
            }
            return MessageConverter.convertMessage(pollPartitionOnce);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public List<Message> batchReceive(short s, long j, long j2) {
        try {
            List<ConsumeMessage> pollPartition = this.messageConsumer.pollPartition(s, j, j2, TimeUnit.MILLISECONDS);
            return CollectionUtils.isEmpty(pollPartition) ? Collections.emptyList() : MessageConverter.convertMessages(pollPartition);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.consumer.Consumer
    public void ack(MessageReceipt messageReceipt) {
        try {
            Preconditions.checkArgument(messageReceipt instanceof MessageReceiptAdapter, "receipt is not supported");
            ConsumeMessage message = ((MessageReceiptAdapter) messageReceipt).getMessage();
            this.messageConsumer.replyOnce(new ConsumeReply(message.getPartition(), message.getIndex(), RetryType.NONE));
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.Client
    public Optional<Extension> getExtension() {
        if (this.extension == null) {
            this.extension = Optional.of(new ExtensionAdapter(this.messageConsumer));
        }
        return this.extension;
    }

    @Override // io.openmessaging.extension.Extension
    public QueueMetaData getQueueMetaData(String str) {
        return getExtension().get().getQueueMetaData(str);
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public ConsumerIndex getIndex(short s) {
        FetchIndexData fetchIndex = this.messageConsumer.fetchIndex(s);
        return new ConsumerIndex(fetchIndex.getIndex(), fetchIndex.getLeftIndex(), fetchIndex.getRightIndex());
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void batchAck(List<MessageReceipt> list) {
        try {
            LinkedList newLinkedList = Lists.newLinkedList();
            for (MessageReceipt messageReceipt : list) {
                Preconditions.checkArgument(messageReceipt instanceof MessageReceiptAdapter, "receipt is not supported");
                ConsumeMessage message = ((MessageReceiptAdapter) messageReceipt).getMessage();
                newLinkedList.add(new ConsumeReply(message.getPartition(), message.getIndex(), RetryType.NONE));
            }
            this.messageConsumer.reply(newLinkedList);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void commitIndex(short s, long j) {
        try {
            this.messageConsumer.commitIndex(s, j);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void commitMaxIndex(short s) {
        try {
            this.messageConsumer.commitMaxIndex(s);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void commitMaxIndex() {
        try {
            this.messageConsumer.commitMaxIndex();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void commitMinIndex(short s) {
        try {
            this.messageConsumer.commitMinIndex(s);
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    @Override // io.openmessaging.joyqueue.consumer.ExtensionConsumer
    public void commitMinIndex() {
        try {
            this.messageConsumer.commitMinIndex();
        } catch (Throwable th) {
            throw handleConsumeException(th);
        }
    }

    protected OMSRuntimeException handleConsumeException(Throwable th) {
        throw ExceptionConverter.convertConsumeException(th);
    }
}
