package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.log4j.spi.Configurator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.DeadLetterProducerBuilderCustomizer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.metrics.Counter;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.3.jar:org/apache/pulsar/client/impl/ConsumerImpl.class */
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
    // NOTE(review): usage is outside this view; presumably caps the number of message ids per redeliver request — confirm.
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    // Obtained from the client (newConsumerId()) in the constructor.
    final long consumerId;
    // The constructor resets this updater to 0 for the new instance; presumably bound to availablePermits — confirm in the static initializer.
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER;
    private volatile int availablePermits;
    // Both message-id markers start at MessageId.earliest.
    protected volatile MessageId lastDequeuedMessageId;
    private volatile MessageId lastMessageIdInBroker;
    // Wall-clock deadline: construction time plus the client's configured lookup timeout (ms).
    private final long lookupDeadline;
    private static final AtomicLongFieldUpdater<ConsumerImpl> SUBSCRIBE_DEADLINE_UPDATER;
    // Starts at 0.
    private volatile long subscribeDeadline;
    private final int partitionIndex;
    private final boolean hasParentConsumer;
    private final boolean parentConsumerHasListener;
    // PersistentAcknowledgmentsGroupingTracker for persistent topics, the NonPersistentAcknowledgmentGroupingTracker otherwise.
    private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
    private final NegativeAcksTracker negativeAcksTracker;
    // ConsumerStatsDisabled.INSTANCE unless the client's stats interval is > 0.
    protected final ConsumerStatsRecorder stats;
    private final int priorityLevel;
    // Taken from the configuration after the constructor has possibly downgraded Durable to NonDurable for non-persistent topics.
    private final SubscriptionMode subscriptionMode;
    // May stay null when no start message id is supplied to the constructor.
    private volatile MessageIdAdv startMessageId;
    private volatile MessageIdAdv seekMessageId;

    // Initialised to SeekStatus.NOT_STARTED.
    @VisibleForTesting
    final AtomicReference<SeekStatus> seekStatus;
    private volatile CompletableFuture<Void> seekFuture;
    // Snapshot of startMessageId as computed in the constructor.
    private final MessageIdAdv initialStartMessageId;
    private final long startMessageRollbackDurationInSec;
    private volatile boolean hasReachedEndOfTopic;
    // Null when no crypto key reader is configured, or when MessageCryptoBc could not be instantiated.
    private final MessageCrypto msgCrypto;
    // Unmodifiable copy of the configured consumer properties (empty map when there are none).
    private final Map<String, String> metadata;
    private final boolean readCompacted;
    private final boolean resetIncludeHead;
    private final SubscriptionInitialPosition subscriptionInitialPosition;
    private final ConnectionHandler connectionHandler;
    private final TopicName topicName;
    private final String topicNameWithoutPartition;
    // Both dead-letter fields below are null when no DeadLetterPolicy is configured.
    private final Map<MessageIdAdv, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
    private final DeadLetterPolicy deadLetterPolicy;
    private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
    private volatile int deadLetterProducerFailureCount;
    private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
    private volatile int retryLetterProducerFailureCount;
    private final ReadWriteLock createProducerLock;
    // Initial value comes from the configuration's isStartPaused().
    protected volatile boolean paused;
    // Chunked-message bookkeeping; the map is a ConcurrentHashMap and the uuid queue a GrowableArrayBlockingQueue.
    protected Map<String, ChunkedMessageCtx> chunkedMessagesMap;
    private int pendingChunkedMessageCount;
    protected long expireTimeOfIncompleteChunkedMessageMillis;
    private final AtomicBoolean expireChunkMessageTaskScheduled;
    private final int maxPendingChunkedMessage;
    private final boolean autoAckOldestChunkedMessageOnQueueFull;
    private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
    private final boolean createTopicIfDoesNotExist;
    private final boolean poolMessages;
    // Metric instruments created from the client's InstrumentProvider, tagged with the topic and the "pulsar.subscription" attribute.
    private final Counter messagesReceivedCounter;
    private final Counter bytesReceivedCounter;
    private final UpDownCounter messagesPrefetchedGauge;
    private final UpDownCounter bytesPrefetchedGauge;
    private final Counter consumersOpenedCounter;
    private final Counter consumersClosedCounter;
    private final Counter consumerAcksCounter;
    private final Counter consumerNacksCounter;
    private final Counter consumerDlqMessagesCounter;
    private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration;
    private final AtomicInteger previousExceptionCount;
    private volatile boolean hasSoughtByTimestamp;
    private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND;
    private static final Logger log;
    // Compiler-generated flag backing `assert` statements (decompiler artifact).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.3.jar:org/apache/pulsar/client/impl/ConsumerImpl$ChunkedMessageCtx.class */
    /**
     * Pooled holder for the state of one chunked message while its chunks are being collected:
     * the expected chunk count, the buffer the payload is accumulated in, the id of the last
     * chunk seen, the per-chunk message ids and the time the context was obtained.
     *
     * <p>Instances come from a Netty {@link Recycler}; obtain one with {@link #get(int, ByteBuf)}
     * and hand it back with {@link #recycle()} once it is no longer referenced.
     */
    public static class ChunkedMessageCtx {
        protected int totalChunks;
        protected ByteBuf chunkedMsgBuffer;
        protected int lastChunkedMessageId;
        protected MessageIdImpl[] chunkedMessageIds;
        protected long receivedTime;
        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() { // from class: org.apache.pulsar.client.impl.ConsumerImpl.ChunkedMessageCtx.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.netty.util.Recycler
            public ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
                return new ChunkedMessageCtx(handle);
            }
        };

        /**
         * Takes a context from the pool and initialises it for a new chunked message.
         *
         * @param i       total number of chunks expected; also the length of the fresh
         *                {@code chunkedMessageIds} array
         * @param byteBuf buffer the chunk payloads are accumulated in
         * @return a context whose {@code receivedTime} is the current wall-clock time
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public static ChunkedMessageCtx get(int i, ByteBuf byteBuf) {
            ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get();
            chunkedMessageCtx.totalChunks = i;
            chunkedMessageCtx.chunkedMsgBuffer = byteBuf;
            chunkedMessageCtx.chunkedMessageIds = new MessageIdImpl[i];
            chunkedMessageCtx.receivedTime = System.currentTimeMillis();
            return chunkedMessageCtx;
        }

        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> handle) {
            this.totalChunks = -1;
            this.lastChunkedMessageId = -1;
            this.receivedTime = 0L;
            this.recyclerHandle = handle;
        }

        /**
         * Resets the per-message state and returns this instance to the pool.
         * The buffer is only dereferenced here, not released.
         */
        public void recycle() {
            this.totalChunks = -1;
            this.chunkedMsgBuffer = null;
            this.lastChunkedMessageId = -1;
            // Drop the id array as well; otherwise a pooled instance keeps the previous
            // message's MessageIdImpl objects reachable until it is reused.
            this.chunkedMessageIds = null;
            this.recyclerHandle.recycle(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.3.jar:org/apache/pulsar/client/impl/ConsumerImpl$DecryptResult.class */
    /**
     * Outcome of a decryption attempt: a success flag plus the payload to continue with.
     * A result without a payload means the message should be discarded.
     */
    public static class DecryptResult {
        private final boolean success;
        private final ByteBuf payload;

        private DecryptResult(boolean succeeded, ByteBuf buffer) {
            success = succeeded;
            payload = buffer;
        }

        /** Creates a successful result carrying the given payload. */
        public static DecryptResult success(ByteBuf byteBuf) {
            final boolean succeeded = true;
            return new DecryptResult(succeeded, byteBuf);
        }

        /** Creates an unsuccessful result that still carries the given payload. */
        public static DecryptResult failure(ByteBuf byteBuf) {
            final boolean succeeded = false;
            return new DecryptResult(succeeded, byteBuf);
        }

        /** Creates an unsuccessful result with no payload. */
        public static DecryptResult discard() {
            final ByteBuf nothing = null;
            return new DecryptResult(false, nothing);
        }

        /** @return {@code true} when this result has no payload */
        public boolean shouldDiscard() {
            return payload == null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.3.jar:org/apache/pulsar/client/impl/ConsumerImpl$GetLastMessageIdResponse.class */
    /** Immutable pair of the last message id and the mark-delete position. */
    public static final class GetLastMessageIdResponse {
        final MessageId lastMessageId;
        final MessageId markDeletePosition;

        /**
         * @param messageId  stored as {@code lastMessageId}
         * @param messageId2 stored as {@code markDeletePosition}
         */
        GetLastMessageIdResponse(MessageId messageId, MessageId messageId2) {
            markDeletePosition = messageId2;
            lastMessageId = messageId;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Progress marker held in the consumer's {@code seekStatus} reference.
     * The constructor initialises that reference to {@link #NOT_STARTED}; the transitions to the
     * other values happen outside this view.
     */
    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.3.jar:org/apache/pulsar/client/impl/ConsumerImpl$SeekStatus.class */
    public enum SeekStatus {
        // Initial value of the consumer's seekStatus.
        NOT_STARTED,
        // NOTE(review): presumably set while a seek request is outstanding — confirm at the call sites.
        IN_PROGRESS,
        // NOTE(review): presumably set once a seek has finished — confirm at the call sites.
        COMPLETED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Convenience overload of the 13-argument factory: the parent consumer is treated as having
     * no listener and the start-message rollback duration is zero.
     */
    public static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z2) {
        final boolean parentHasListener = false;
        final long rollbackDurationInSec = 0L;
        return newConsumerImpl(
                pulsarClientImpl,
                str,
                consumerConfigurationData,
                executorProvider,
                i,
                z,
                parentHasListener,
                completableFuture,
                messageId,
                schema,
                consumerInterceptors,
                z2,
                rollbackDurationInSec);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the consumer implementation matching the configured receiver queue size:
     * a {@code ZeroQueueConsumerImpl} when the size is 0, a regular {@code ConsumerImpl} otherwise.
     * The zero-queue branch does not receive {@code z2} or {@code j}.
     */
    public static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, boolean z2, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z3, long j) {
        if (consumerConfigurationData.getReceiverQueueSize() != 0) {
            return new ConsumerImpl<>(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, z2, completableFuture, messageId, j, schema, consumerInterceptors, z3);
        }
        return new ZeroQueueConsumerImpl(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, completableFuture, messageId, schema, consumerInterceptors, z3);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a consumer for a single topic and, as its last steps, calls {@code grabCnx()} and
     * increments the "consumers opened" counter.
     *
     * <p>Note that {@code this} is handed to several collaborators (negative-acks tracker, stats
     * recorder, connection handler, ack grouping tracker) before construction has finished.
     *
     * @param pulsarClientImpl          owning client; supplies the consumer id, timeouts, backoff
     *                                  settings, event loop group and instrument provider
     * @param str                       topic name
     * @param consumerConfigurationData consumer configuration; its subscription mode is
     *                                  <em>modified in place</em> to NonDurable when the topic is
     *                                  non-persistent and the mode was Durable
     * @param executorProvider          forwarded to the superclass constructor
     * @param i                         stored as {@code partitionIndex}
     * @param z                         stored as {@code hasParentConsumer}
     * @param z2                        stored as {@code parentConsumerHasListener}
     * @param completableFuture         subscribe future, forwarded to the superclass constructor
     * @param messageId                 optional start message id (may be {@code null}); must be a
     *                                  {@code MessageIdAdv} when present
     * @param j                         stored as {@code startMessageRollbackDurationInSec}
     * @param schema                    forwarded to the superclass constructor
     * @param consumerInterceptors      forwarded to the superclass constructor
     * @param z3                        stored as {@code createTopicIfDoesNotExist}
     */
    public ConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, boolean z2, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, long j, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z3) {
        super(pulsarClientImpl, str, consumerConfigurationData, consumerConfigurationData.getReceiverQueueSize(), executorProvider, completableFuture, schema, consumerInterceptors);
        MessageCryptoBc messageCryptoBc;
        // Field initialisers, inlined into the constructor by the decompiler.
        this.availablePermits = 0;
        this.lastDequeuedMessageId = MessageId.earliest;
        this.lastMessageIdInBroker = MessageId.earliest;
        this.subscribeDeadline = 0L;
        this.createProducerLock = new ReentrantReadWriteLock();
        this.chunkedMessagesMap = new ConcurrentHashMap();
        this.pendingChunkedMessageCount = 0;
        this.expireTimeOfIncompleteChunkedMessageMillis = 0L;
        this.expireChunkMessageTaskScheduled = new AtomicBoolean(false);
        this.clientCnxUsedForConsumerRegistration = new AtomicReference<>();
        this.previousExceptionCount = new AtomicInteger();
        this.hasSoughtByTimestamp = false;
        this.consumerId = pulsarClientImpl.newConsumerId();
        TopicName topicName = TopicName.get(str);
        // A durable subscription is downgraded to non-durable on a non-persistent topic; this mutates the caller's configuration object.
        if (!topicName.isPersistent() && consumerConfigurationData.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
            consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
            log.warn("[{}] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: {}", str, consumerConfigurationData.getSubscriptionName());
        }
        this.subscriptionMode = consumerConfigurationData.getSubscriptionMode();
        // Start position: when resetIncludeHead is set and the id has a first-chunk id, start from that first chunk; otherwise from the id itself.
        if (messageId != null) {
            MessageIdAdv firstChunkMessageId = ((MessageIdAdv) messageId).getFirstChunkMessageId();
            if (!consumerConfigurationData.isResetIncludeHead() || firstChunkMessageId == null) {
                this.startMessageId = (MessageIdAdv) messageId;
            } else {
                this.startMessageId = firstChunkMessageId;
            }
        }
        this.initialStartMessageId = this.startMessageId;
        this.startMessageRollbackDurationInSec = j;
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        this.lookupDeadline = System.currentTimeMillis() + pulsarClientImpl.getConfiguration().getLookupTimeoutMs();
        this.partitionIndex = i;
        this.hasParentConsumer = z;
        this.parentConsumerHasListener = z2;
        this.priorityLevel = consumerConfigurationData.getMatchingTopicConfiguration(str).getPriorityLevel();
        this.readCompacted = consumerConfigurationData.isReadCompacted();
        this.subscriptionInitialPosition = consumerConfigurationData.getSubscriptionInitialPosition();
        this.negativeAcksTracker = new NegativeAcksTracker(this, consumerConfigurationData);
        this.resetIncludeHead = consumerConfigurationData.isResetIncludeHead();
        this.createTopicIfDoesNotExist = z3;
        this.maxPendingChunkedMessage = consumerConfigurationData.getMaxPendingChunkedMessage();
        this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue();
        this.expireTimeOfIncompleteChunkedMessageMillis = consumerConfigurationData.getExpireTimeOfIncompleteChunkedMessageMillis();
        this.autoAckOldestChunkedMessageOnQueueFull = consumerConfigurationData.isAutoAckOldestChunkedMessageOnQueueFull();
        this.poolMessages = consumerConfigurationData.isPoolMessages();
        this.paused = consumerConfigurationData.isStartPaused();
        // Stats are only recorded when the client has a positive stats interval.
        if (pulsarClientImpl.getConfiguration().getStatsIntervalSeconds() > 0) {
            this.stats = new ConsumerStatsRecorderImpl(pulsarClientImpl, consumerConfigurationData, this);
        } else {
            this.stats = ConsumerStatsDisabled.INSTANCE;
        }
        this.seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);
        // Crypto: none without a key reader; otherwise the configured MessageCrypto, falling back to MessageCryptoBc.
        if (consumerConfigurationData.getCryptoKeyReader() == null) {
            this.msgCrypto = null;
        } else if (consumerConfigurationData.getMessageCrypto() != null) {
            this.msgCrypto = consumerConfigurationData.getMessageCrypto();
        } else {
            try {
                messageCryptoBc = new MessageCryptoBc(String.format("[%s] [%s]", str, this.subscription), false);
            } catch (Exception e) {
                // Best effort: the failure is logged and msgCrypto is left null.
                log.error("MessageCryptoBc may not included in the jar. e:", (Throwable) e);
                messageCryptoBc = null;
            }
            this.msgCrypto = messageCryptoBc;
        }
        // Defensive, unmodifiable copy of the configured properties.
        if (consumerConfigurationData.getProperties().isEmpty()) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = Collections.unmodifiableMap(new HashMap(consumerConfigurationData.getProperties()));
        }
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(pulsarClientImpl.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(pulsarClientImpl.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create(), this);
        this.topicName = TopicName.get(str);
        if (this.topicName.isPersistent()) {
            this.acknowledgmentsGroupingTracker = new PersistentAcknowledgmentsGroupingTracker(this, consumerConfigurationData, pulsarClientImpl.eventLoopGroup());
        } else {
            this.acknowledgmentsGroupingTracker = NonPersistentAcknowledgmentGroupingTracker.of();
        }
        // Dead-letter handling: build a private copy of the policy, filling in default DLQ / retry topic names derived from topic and subscription when none are configured.
        if (consumerConfigurationData.getDeadLetterPolicy() != null) {
            this.possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap();
            if (StringUtils.isNotBlank(consumerConfigurationData.getDeadLetterPolicy().getDeadLetterTopic())) {
                this.deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(consumerConfigurationData.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(consumerConfigurationData.getDeadLetterPolicy().getDeadLetterTopic()).build();
            } else {
                this.deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(consumerConfigurationData.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(RetryMessageUtil.getDLQTopic(str, this.subscription)).build();
            }
            if (StringUtils.isNotBlank(consumerConfigurationData.getDeadLetterPolicy().getRetryLetterTopic())) {
                this.deadLetterPolicy.setRetryLetterTopic(consumerConfigurationData.getDeadLetterPolicy().getRetryLetterTopic());
            } else {
                this.deadLetterPolicy.setRetryLetterTopic(RetryMessageUtil.getRetryTopic(str, this.subscription));
            }
            if (StringUtils.isNotBlank(consumerConfigurationData.getDeadLetterPolicy().getInitialSubscriptionName())) {
                this.deadLetterPolicy.setInitialSubscriptionName(consumerConfigurationData.getDeadLetterPolicy().getInitialSubscriptionName());
            }
            this.deadLetterPolicy.setRetryLetterProducerBuilderCustomizer(consumerConfigurationData.getDeadLetterPolicy().getRetryLetterProducerBuilderCustomizer());
            this.deadLetterPolicy.setDeadLetterProducerBuilderCustomizer(consumerConfigurationData.getDeadLetterPolicy().getDeadLetterProducerBuilderCustomizer());
        } else {
            this.deadLetterPolicy = null;
            this.possibleSendToDeadLetterTopicMessages = null;
        }
        this.topicNameWithoutPartition = topicName.getPartitionedTopicName();
        // Metric instruments, all tagged with the topic and the subscription attribute.
        InstrumentProvider instrumentProvider = pulsarClientImpl.instrumentProvider();
        Attributes build = Attributes.builder().put("pulsar.subscription", this.subscription).build();
        this.consumersOpenedCounter = instrumentProvider.newCounter("pulsar.client.consumer.opened", Unit.Sessions, "The number of consumer sessions opened", str, build);
        this.consumersClosedCounter = instrumentProvider.newCounter("pulsar.client.consumer.closed", Unit.Sessions, "The number of consumer sessions closed", str, build);
        this.messagesReceivedCounter = instrumentProvider.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages, "The number of messages explicitly received by the consumer application", str, build);
        this.bytesReceivedCounter = instrumentProvider.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes, "The number of bytes explicitly received by the consumer application", str, build);
        this.messagesPrefetchedGauge = instrumentProvider.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages, "The number of messages currently sitting in the consumer receive queue", str, build);
        this.bytesPrefetchedGauge = instrumentProvider.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes, "The total size in bytes of messages currently sitting in the consumer receive queue", str, build);
        this.consumerAcksCounter = instrumentProvider.newCounter("pulsar.client.consumer.message.ack", Unit.Messages, "The number of acknowledged messages", str, build);
        this.consumerNacksCounter = instrumentProvider.newCounter("pulsar.client.consumer.message.nack", Unit.Messages, "The number of negatively acknowledged messages", str, build);
        this.consumerDlqMessagesCounter = instrumentProvider.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, "The number of messages sent to DLQ", str, build);
        // NOTE(review): grabCnx() is defined outside this view; presumably it starts the broker connection, which is why it runs after every field is set — confirm.
        grabCnx();
        this.consumersOpenedCounter.increment();
    }

    /** @return the connection handler created for this consumer in the constructor */
    public ConnectionHandler getConnectionHandler() {
        final ConnectionHandler handler = this.connectionHandler;
        return handler;
    }

    /** @return the {@code unAckedMessageTracker} field of this consumer */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public UnAckedMessageTracker getUnAckedMessageTracker() {
        final UnAckedMessageTracker tracker = this.unAckedMessageTracker;
        return tracker;
    }

    /** @return the negative-acknowledgement tracker created in the constructor */
    @VisibleForTesting
    NegativeAcksTracker getNegativeAcksTracker() {
        final NegativeAcksTracker tracker = this.negativeAcksTracker;
        return tracker;
    }

    /**
     * Sends an unsubscribe request for this consumer's subscription.
     *
     * <p>On success the consumer tasks are closed, the consumer is deregistered from its
     * connection and from the client, and the state becomes {@code Closed}. On failure the state
     * is set back to {@code Ready} and the returned future fails with a wrapped
     * {@link PulsarClientException}.
     *
     * @param z forwarded as the third argument of {@code Commands.newUnsubscribe}
     * @return a future that completes when the unsubscribe succeeded; it fails immediately with
     *         {@code AlreadyClosedException} when the consumer is closing or closed, and with
     *         {@code NotConnectedException} when there is no broker connection
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> unsubscribeAsync(boolean z) {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (!isConnected()) {
            completableFuture.completeExceptionally(new PulsarClientException.NotConnectedException(String.format("The client is not connected to the broker when unsubscribing the subscription %s of the topic %s", this.subscription, this.topicName.toString())));
            return completableFuture;
        }
        setState(HandlerState.State.Closing);
        long newRequestId = this.client.newRequestId();
        cnx().sendRequestWithId(Commands.newUnsubscribe(this.consumerId, newRequestId, z), newRequestId).thenRun(() -> {
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            log.info("[{}][{}] Successfully unsubscribed from topic", this.topic, this.subscription);
            setState(HandlerState.State.Closed);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            // The throwable normally arrives wrapped with a cause, but fall back to the throwable
            // itself when there is none: an NPE thrown here would leave the returned future
            // uncompleted forever.
            Throwable cause = th.getCause() != null ? th.getCause() : th;
            log.error("[{}][{}] Failed to unsubscribe: {}", this.topic, this.subscription, cause.getMessage());
            setState(HandlerState.State.Ready);
            completableFuture.completeExceptionally(PulsarClientException.wrap(cause, String.format("Failed to unsubscribe the subscription %s of topic %s", this.subscription, this.topicName.toString())));
            return null;
        });
        return completableFuture;
    }

    /**
     * Computes the lower bound for the receiver queue size: at most 1 (less if the maximum queue
     * size is smaller), raised to {@code 2 * maxNumMessages - 2} when the batch-receive policy
     * has a positive message limit.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int minReceiverQueueSize() {
        final int baseline = Math.min(1, this.maxReceiverQueueSize);
        if (this.batchReceivePolicy.getMaxNumMessages() <= 0) {
            return baseline;
        }
        final int batchDriven = (2 * this.batchReceivePolicy.getMaxNumMessages()) - 2;
        return Math.max(baseline, batchDriven);
    }

    /**
     * Blocks until a message is available in the incoming queue, runs the post-dequeue
     * bookkeeping and returns the message after {@code beforeConsume}.
     *
     * @throws PulsarClientException when the waiting thread is interrupted; the receive-failure
     *                               statistic is incremented first
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message<T> internalReceive() throws PulsarClientException {
        Message<T> received;
        try {
            // Signal that more messages are wanted before blocking on an empty queue.
            if (this.incomingMessages.isEmpty()) {
                expectMoreIncomingMessages();
            }
            received = this.incomingMessages.take();
            messageProcessed(received);
            return beforeConsume(received);
        } catch (InterruptedException interrupted) {
            ExceptionHandler.handleInterruptedException(interrupted);
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(interrupted);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronous receive. On the pinned internal executor, either completes the future with an
     * already-queued message or parks the future in {@code pendingReceives}; cancelling a parked
     * future removes it from that collection again.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Message<T>> internalReceiveAsync() {
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Message<T>> receiveFuture = cancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            Message<T> queued = this.incomingMessages.poll();
            if (queued == null) {
                // Nothing buffered: register the future as a pending receive.
                expectMoreIncomingMessages();
                this.pendingReceives.add(receiveFuture);
                cancellationHandler.setCancelAction(() -> {
                    this.pendingReceives.remove(receiveFuture);
                });
                return;
            }
            messageProcessed(queued);
            receiveFuture.complete(beforeConsume(queued));
        });
        return receiveFuture;
    }

    /**
     * Waits up to the given timeout for a message.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the message (passed through {@code beforeConsume} only when no listener is set),
     *         or {@code null} on timeout or when interrupted while the consumer is closing/closed
     * @throws PulsarClientException when interrupted in any other state; the receive-failure
     *                               statistic is incremented first
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message<T> internalReceive(long j, TimeUnit timeUnit) throws PulsarClientException {
        try {
            if (this.incomingMessages.isEmpty()) {
                expectMoreIncomingMessages();
            }
            Message<T> received = this.incomingMessages.poll(j, timeUnit);
            if (received != null) {
                messageProcessed(received);
                if (this.listener == null) {
                    return beforeConsume(received);
                }
                return received;
            }
            return null;
        } catch (InterruptedException interrupted) {
            ExceptionHandler.handleInterruptedException(interrupted);
            HandlerState.State current = getState();
            boolean shuttingDown = current == HandlerState.State.Closing || current == HandlerState.State.Closed;
            if (!shuttingDown) {
                this.stats.incrementNumReceiveFailed();
                throw PulsarClientException.unwrap(interrupted);
            }
            return null;
        }
    }

    /**
     * Blocking batch receive: waits for the result of {@code internalBatchReceiveAsync()}.
     *
     * @return the received batch, or {@code null} when the wait failed while the consumer is
     *         closing or closed
     * @throws PulsarClientException when the wait is interrupted or fails in any other state; the
     *                               batch-receive-failure statistic is incremented first
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Messages<T> internalBatchReceive() throws PulsarClientException {
        try {
            CompletableFuture<Messages<T>> pending = internalBatchReceiveAsync();
            return pending.get();
        } catch (InterruptedException | ExecutionException failure) {
            ExceptionHandler.handleInterruptedException(failure);
            HandlerState.State current = getState();
            boolean shuttingDown = current == HandlerState.State.Closing || current == HandlerState.State.Closed;
            if (!shuttingDown) {
                this.stats.incrementNumBatchReceiveFailed();
                throw PulsarClientException.unwrap(failure);
            }
            return null;
        }
    }

    /**
     * Asynchronous batch receive. On the pinned internal executor, either fulfils the future
     * straight away when enough messages are buffered, or queues a pending batch-receive
     * operation and triggers the batch-receive timeout task; cancelling the future removes the
     * queued operation.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Messages<T>> batchFuture = cancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            if (!hasEnoughMessagesForBatchReceive()) {
                expectMoreIncomingMessages();
                ConsumerBase.OpBatchReceive<T> pendingOp = ConsumerBase.OpBatchReceive.of(batchFuture);
                this.pendingBatchReceives.add(pendingOp);
                triggerBatchReceiveTimeoutTask();
                cancellationHandler.setCancelAction(() -> {
                    this.pendingBatchReceives.remove(pendingOp);
                });
            } else {
                notifyPendingBatchReceivedCallBack(batchFuture);
            }
        });
        return batchFuture;
    }

    /**
     * Acknowledges a single message id. The ack counter is incremented on every call, before the
     * state check.
     *
     * <p>In state {@code Ready} or {@code Connecting} the ack goes to the transaction path when a
     * transaction is given, otherwise to the acknowledgment grouping tracker. In any other state
     * the failure statistic is incremented, the matching acknowledge callback is invoked with the
     * error, and a failed future is returned.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        this.consumerAcksCounter.increment();
        if (getState() != HandlerState.State.Ready && getState() != HandlerState.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            PulsarClientException notReady = new PulsarClientException("Consumer not ready. State: " + getState());
            if (CommandAck.AckType.Individual.equals(ackType)) {
                onAcknowledge(messageId, notReady);
            } else if (CommandAck.AckType.Cumulative.equals(ackType)) {
                onAcknowledgeCumulative(messageId, notReady);
            }
            return FutureUtil.failedFuture(notReady);
        }
        if (transactionImpl == null) {
            return this.acknowledgmentsGroupingTracker.addAcknowledgment(messageId, ackType, map);
        }
        return doTransactionAcknowledgeForResponse(messageId, ackType, null, map, new TxnID(transactionImpl.getTxnIdMostBits(), transactionImpl.getTxnIdLeastBits()));
    }

    /**
     * Acknowledges a list of message ids, optionally as part of a transaction.
     *
     * <p>The ack counter is always incremented. Acks are only accepted while the consumer state
     * is {@code Ready} or {@code Connecting}; in any other state the failure is recorded in the
     * stats, the matching {@code onAcknowledge*} hook is invoked with the error, and a failed
     * future is returned.
     *
     * @param list the message ids to acknowledge
     * @param ackType individual or cumulative acknowledgement
     * @param map properties forwarded with the acknowledgement
     * @param transactionImpl the transaction to ack within, or {@code null} for a plain ack
     * @return the future of the acknowledgement
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Void> doAcknowledge(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        this.consumerAcksCounter.increment();
        if (getState() != HandlerState.State.Ready && getState() != HandlerState.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            PulsarClientException notReady = new PulsarClientException("Consumer not ready. State: " + getState());
            if (CommandAck.AckType.Individual.equals(ackType)) {
                onAcknowledge(list, notReady);
            } else if (CommandAck.AckType.Cumulative.equals(ackType)) {
                onAcknowledgeCumulative(list, notReady);
            }
            return FutureUtil.failedFuture(notReady);
        }
        if (transactionImpl == null) {
            return this.acknowledgmentsGroupingTracker.addListAcknowledgment(list, ackType, map);
        }
        TxnID txnId = new TxnID(transactionImpl.getTxnIdMostBits(), transactionImpl.getTxnIdLeastBits());
        return doTransactionAcknowledgeForResponse(list, ackType, map, txnId);
    }

    /**
     * Copies the key and the ordering key of {@code message} onto the builder, when present.
     *
     * <p>A key flagged as base64-encoded is copied as raw bytes, any other key as a string.
     *
     * @param message the message whose keys are read
     * @param typedMessageBuilder the builder that receives the keys
     */
    private static void copyMessageKeysIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilder) {
        if (message.hasKey()) {
            if (!message.hasBase64EncodedKey()) {
                typedMessageBuilder.key(message.getKey());
            } else {
                typedMessageBuilder.keyBytes(message.getKeyBytes());
            }
        }
        if (!message.hasOrderingKey()) {
            return;
        }
        typedMessageBuilder.orderingKey(message.getOrderingKey());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Republishes {@code message} for later redelivery and acknowledges the original once the
     * republished copy has been sent.
     *
     * <p>The copy carries the original properties plus the retry bookkeeping properties (reconsume
     * count and delay). When the new reconsume count exceeds the dead letter policy's max
     * redeliver count and a dead letter topic is configured, the copy goes to the dead letter
     * producer; otherwise it goes to the retry letter producer, with {@code deliverAfter} applied
     * when {@code j} is positive.
     *
     * <p>If the returned future fails, the error is logged, the message id is removed from the
     * unacked tracker and redelivery of that id is requested.
     *
     * @param message the message to reconsume later
     * @param ackType how the original message is acknowledged after the republish
     * @param map extra properties to put on the republished message, may be {@code null}
     * @param j the redelivery delay; negative values are recorded as {@code 0}
     * @param timeUnit the unit of {@code j}
     * @return a future completed once the republish and the acknowledgement have finished
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Void> doReconsumeLater(Message<?> message, CommandAck.AckType ackType, Map<String, String> map, long j, TimeUnit timeUnit) {
        MessageId messageId = message.getMessageId();
        if (messageId == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId"));
        }
        if (getState() != HandlerState.State.Ready && getState() != HandlerState.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            PulsarClientException notReady = new PulsarClientException("Consumer not ready. State: " + getState());
            if (CommandAck.AckType.Individual.equals(ackType)) {
                onAcknowledge(messageId, notReady);
            } else if (CommandAck.AckType.Cumulative.equals(ackType)) {
                onAcknowledgeCumulative(messageId, notReady);
            }
            return FutureUtil.failedFuture(notReady);
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (initRetryLetterProducerIfNeeded() != null) {
            try {
                MessageImpl<?> retryMessage = getMessageImpl(message);
                SortedMap<String, String> propertiesMap = getPropertiesMap(message, message.getMessageId().toString(), getOriginTopicNameStr(message));
                if (map != null) {
                    propertiesMap.putAll(map);
                }
                // First reconsume counts as 1; later ones increment the stored counter.
                int reconsumeTimes = 1;
                if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                    reconsumeTimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) + 1;
                }
                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(timeUnit.toMillis(Math.max(0L, j))));
                if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(this.deadLetterPolicy.getDeadLetterTopic())) {
                    // Retry budget exhausted: route the copy to the dead letter topic instead.
                    initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> {
                        try {
                            // AUTO_PRODUCE_BYTES yields a byte[] builder, matching copyMessageEventTime's parameter.
                            TypedMessageBuilder<byte[]> dlqBuilder = dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())).value(retryMessage.getData()).properties(propertiesMap);
                            copyMessageKeysIfNeeded(message, dlqBuilder);
                            copyMessageEventTime(message, dlqBuilder);
                            dlqBuilder.sendAsync().thenAccept(sentId -> {
                                this.consumerDlqMessagesCounter.increment();
                                doAcknowledge(messageId, ackType, Collections.emptyMap(), (TransactionImpl) null).thenAccept(ignored -> {
                                    result.complete(null);
                                }).exceptionally(ackError -> {
                                    result.completeExceptionally(ackError);
                                    return null;
                                });
                            }).exceptionally(sendError -> {
                                result.completeExceptionally(sendError);
                                return null;
                            });
                        } catch (Exception e) {
                            result.completeExceptionally(e);
                        }
                    }, (Executor) this.internalPinnedExecutor).exceptionally(producerError -> {
                        result.completeExceptionally(producerError);
                        return null;
                    });
                } else {
                    // Decompiled form of an assert; kept as-is so it keeps using the class's own assertion flag.
                    if (!$assertionsDisabled && retryMessage == null) {
                        throw new AssertionError();
                    }
                    initRetryLetterProducerIfNeeded().thenAcceptAsync(retryProducer -> {
                        try {
                            TypedMessageBuilder<byte[]> retryBuilder = retryProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).value(retryMessage.getData()).properties(propertiesMap);
                            if (j > 0) {
                                retryBuilder.deliverAfter(j, timeUnit);
                            }
                            copyMessageKeysIfNeeded(message, retryBuilder);
                            copyMessageEventTime(message, retryBuilder);
                            retryBuilder.sendAsync().thenCompose(sentId -> {
                                return doAcknowledge(messageId, ackType, Collections.emptyMap(), (TransactionImpl) null);
                            }).thenAccept(ignored -> {
                                result.complete(null);
                            }).exceptionally(sendOrAckError -> {
                                result.completeExceptionally(sendOrAckError);
                                return null;
                            });
                        } catch (Exception e) {
                            result.completeExceptionally(e);
                        }
                    }, (Executor) this.internalPinnedExecutor).exceptionally(producerError -> {
                        result.completeExceptionally(producerError);
                        return null;
                    });
                }
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        } else {
            result.completeExceptionally(new PulsarClientException("Retry letter producer is null."));
        }
        // Failure hook: log, forget the id in the unacked tracker and ask for redelivery.
        result.exceptionally(failure -> {
            log.error("Send to retry letter topic exception with topic: {}, messageId: {}", this.deadLetterPolicy.getRetryLetterTopic(), messageId, failure);
            Set<MessageId> toRedeliver = Collections.singleton(messageId);
            this.unAckedMessageTracker.remove(messageId);
            redeliverUnacknowledgedMessages(toRedeliver);
            return null;
        });
        return result;
    }

    /**
     * Builds the sorted property map for a republished (retry / dead letter) message.
     *
     * <p>Starts from the message's own properties, if any, and then adds the origin bookkeeping
     * entries with {@code putIfAbsent}, so values already present on the message are kept.
     *
     * @param message the message whose properties are copied
     * @param str the origin message id, stored under both origin-message-id keys
     * @param str2 the origin topic name, stored under the real-topic key
     * @return a new mutable sorted map
     */
    private SortedMap<String, String> getPropertiesMap(Message<?> message, String str, String str2) {
        SortedMap<String, String> propertiesMap = new TreeMap<>();
        Map<String, String> messageProperties = message.getProperties();
        if (messageProperties != null) {
            propertiesMap.putAll(messageProperties);
        }
        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, str2);
        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, str);
        propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, str);
        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION, this.subscription);
        return propertiesMap;
    }

    /**
     * Resolves the origin topic name of {@code message}.
     *
     * <p>For a {@code TopicMessageId} this is the owner topic with everything from the last
     * partitioned-topic suffix onwards cut off; for any other message id type it is the
     * message's own topic name.
     *
     * @param message the message to inspect
     * @return the origin topic name
     */
    private String getOriginTopicNameStr(Message<?> message) {
        MessageId id = message.getMessageId();
        if (id instanceof TopicMessageId) {
            String ownerTopic = ((TopicMessageId) id).getOwnerTopic();
            int suffixStart = ownerTopic.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
            if (suffixStart >= 0) {
                return ownerTopic.substring(0, suffixStart);
            }
            return ownerTopic;
        }
        return message.getTopicName();
    }

    /**
     * Unwraps {@code message} to its underlying {@code MessageImpl}.
     *
     * @param message a {@code TopicMessageImpl} wrapper or a {@code MessageImpl}
     * @return the wrapped or the given {@code MessageImpl}, or {@code null} when the message is
     *         neither of the two types
     */
    private MessageImpl<?> getMessageImpl(Message<?> message) {
        if (message instanceof TopicMessageImpl) {
            // Wildcard casts instead of raw types keep the unwrapping free of unchecked conversions.
            return (MessageImpl<?>) ((TopicMessageImpl<?>) message).getMessage();
        }
        if (message instanceof MessageImpl) {
            return (MessageImpl<?>) message;
        }
        return null;
    }

    /**
     * Copies the event time of {@code message} onto the builder.
     *
     * @param message the message whose event time is read
     * @param typedMessageBuilder the builder that receives the event time
     */
    private static void copyMessageEventTime(Message<?> message, TypedMessageBuilder<byte[]> typedMessageBuilder) {
        long eventTime = message.getEventTime();
        typedMessageBuilder.eventTime(eventTime);
    }

    /**
     * Negatively acknowledges a message id.
     *
     * <p>Increments the nack counter, registers the id with the negative-acks tracker and
     * removes the id, with its batch part discarded, from the unacked-message tracker.
     *
     * @param messageId the message id to nack
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void negativeAcknowledge(MessageId messageId) {
        this.consumerNacksCounter.increment();
        this.negativeAcksTracker.add(messageId);
        MessageId entryLevelId = MessageIdAdvUtils.discardBatch(messageId);
        this.unAckedMessageTracker.remove(entryLevelId);
    }

    /**
     * Negatively acknowledges a message.
     *
     * <p>Increments the nack counter, registers the message with the negative-acks tracker and
     * removes its id, with the batch part discarded, from the unacked-message tracker.
     *
     * @param message the message to nack
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public void negativeAcknowledge(Message<?> message) {
        this.consumerNacksCounter.increment();
        this.negativeAcksTracker.add(message);
        MessageId entryLevelId = MessageIdAdvUtils.discardBatch(message.getMessageId());
        this.unAckedMessageTracker.remove(entryLevelId);
    }

    /**
     * Handles a newly opened broker connection by (re)subscribing this consumer on it.
     *
     * <p>If the consumer is already closing or closed, it is finalized as closed and a completed
     * future is returned. Otherwise a subscribe command is sent on {@code clientCnx}; the returned
     * future completes normally once the outcome has been handled, or exceptionally with the
     * failure cause on the branches below that call {@code completeExceptionally}.
     *
     * @param clientCnx the connection that was just opened
     * @return a future reporting the outcome of the subscribe attempt
     */
    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public CompletableFuture<Void> connectionOpened(ClientCnx clientCnx) {
        int size;
        // A connection was established, so the failure counter starts over.
        this.previousExceptionCount.set(0);
        getConnectionHandler().setMaxMessageSize(clientCnx.getMaxMessageSize());
        HandlerState.State state = getState();
        if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
            // Consumer was closed in the meantime: finish the close locally instead of subscribing.
            setState(HandlerState.State.Closed);
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            clearReceiverQueue(false);
            return CompletableFuture.completedFuture(null);
        }
        log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}", this.topic, this.subscription, clientCnx.ctx().channel(), Long.valueOf(this.consumerId));
        long newRequestId = this.client.newRequestId();
        if (this.seekStatus.get() != SeekStatus.NOT_STARTED) {
            // A seek has been started: flush and clean the acknowledgment grouping tracker.
            this.acknowledgmentsGroupingTracker.flushAndClean();
        }
        // Only sets the deadline when it is still 0, i.e. it is not overwritten once set.
        SUBSCRIBE_DEADLINE_UPDATER.compareAndSet(this, 0L, System.currentTimeMillis() + this.client.getConfiguration().getOperationTimeoutMs());
        synchronized (this) {
            // Remember the queue size before it is cleared; it is passed to consumerIsReconnectedToBroker.
            size = this.incomingMessages.size();
            setClientCnx(clientCnx);
            clearReceiverQueue(true);
            if (this.possibleSendToDeadLetterTopicMessages != null) {
                this.possibleSendToDeadLetterTopicMessages.clear();
            }
        }
        boolean z = this.subscriptionMode == SubscriptionMode.Durable;
        // A start message id is only sent for non-durable subscriptions that have one.
        MessageIdData batchIndex = (z || this.startMessageId == null) ? null : new MessageIdData().setLedgerId(this.startMessageId.getLedgerId()).setEntryId(this.startMessageId.getEntryId()).setBatchIndex(this.startMessageId.getBatchIndex());
        SchemaInfo schemaInfo = this.schema.getSchemaInfo();
        if (schemaInfo != null && (SchemaType.BYTES == schemaInfo.getType() || SchemaType.NONE == schemaInfo.getType())) {
            // BYTES / NONE schemas are not sent with the subscribe command.
            schemaInfo = null;
        } else if ((this.schema instanceof AutoConsumeSchema) && Commands.peerSupportsCarryAutoConsumeSchemaToBroker(clientCnx.getRemoteEndpointProtocolVersion())) {
            schemaInfo = AutoConsumeSchema.SCHEMA_INFO;
        }
        // The rollback duration is only applied while the start message id is still the initial one.
        long j = (this.startMessageRollbackDurationInSec <= 0 || this.startMessageId == null || !this.startMessageId.equals(this.initialStartMessageId)) ? 0L : this.startMessageRollbackDurationInSec;
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        synchronized (this) {
            clientCnx.sendRequestWithId(Commands.newSubscribe(this.topic, this.subscription, this.consumerId, newRequestId, getSubType(), this.priorityLevel, this.consumerName, z, batchIndex, this.metadata, this.readCompacted, this.conf.getReplicateSubscriptionState(), CommandSubscribe.InitialPosition.valueOf(this.subscriptionInitialPosition.getValue()), j, schemaInfo, this.createTopicIfDoesNotExist, this.conf.getKeySharedPolicy(), this.conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this)), newRequestId).thenRun(() -> {
                synchronized (this) {
                    if (!changeToReadyState()) {
                        // Could not move to Ready: mark closed, clean up and drop the channel.
                        setState(HandlerState.State.Closed);
                        deregisterFromClientCnx();
                        this.client.cleanupConsumer(this);
                        clientCnx.channel().close();
                        completableFuture.complete(null);
                        return;
                    }
                    consumerIsReconnectedToBroker(clientCnx, size);
                    resetBackoff();
                    // Permits are sent unless this call completed subscribeFuture for a consumer that
                    // has a parent consumer, and only when the receiver queue size is non-zero.
                    if ((!this.subscribeFuture.complete(this) || !this.hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
                        increaseAvailablePermits(clientCnx, getCurrentReceiverQueueSize());
                    }
                    completableFuture.complete(null);
                }
            }).exceptionally(th -> {
                deregisterFromClientCnx();
                if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                    // Consumer was closed while subscribing: drop the channel and report success.
                    clientCnx.channel().close();
                    completableFuture.complete(null);
                    return null;
                }
                log.warn("[{}][{}] Failed to subscribe to topic on {}", this.topic, this.subscription, clientCnx.channel().remoteAddress());
                if (th.getCause() instanceof PulsarClientException.TimeoutException) {
                    // On a timeout, a close-consumer command is sent for this consumer id.
                    long newRequestId2 = this.client.newRequestId();
                    clientCnx.sendRequestWithId(Commands.newCloseConsumer(this.consumerId, newRequestId2, null, null), newRequestId2);
                }
                if ((th.getCause() instanceof PulsarClientException) && PulsarClientException.isRetriableError(th.getCause()) && !isUnrecoverableError(th.getCause()) && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(this)) {
                    // Retriable error within the subscribe deadline: surface the cause on the returned future.
                    completableFuture.completeExceptionally(th.getCause());
                } else if (!this.subscribeFuture.isDone()) {
                    // subscribeFuture not yet done: fail it and mark the consumer as Failed.
                    setState(HandlerState.State.Failed);
                    closeConsumerTasks();
                    this.subscribeFuture.completeExceptionally(PulsarClientException.wrap(th, String.format("Failed to subscribe the topic %s with subscription name %s when connecting to the broker", this.topicName.toString(), this.subscription)));
                    this.client.cleanupConsumer(this);
                } else if (isUnrecoverableError(th.getCause())) {
                    closeWhenReceivedUnrecoverableError(th.getCause(), clientCnx);
                } else {
                    completableFuture.completeExceptionally(th.getCause());
                }
                // Branches that did not complete the future above finish it normally here.
                if (completableFuture.isDone()) {
                    return null;
                }
                completableFuture.complete(null);
                return null;
            });
        }
        return completableFuture;
    }

    /**
     * Tells whether {@code th} is an error this consumer does not retry after.
     *
     * @param th the error to classify
     * @return {@code true} for {@code TopicDoesNotExistException}, {@code IllegalStateException}
     *         and {@code NotFoundException}; {@code false} otherwise
     */
    protected boolean isUnrecoverableError(Throwable th) {
        if (th instanceof PulsarClientException.TopicDoesNotExistException) {
            return true;
        }
        if (th instanceof IllegalStateException) {
            return true;
        }
        return th instanceof PulsarClientException.NotFoundException;
    }

    /**
     * Closes the consumer after an error that is not retried.
     *
     * <p>Logs a warning, triggers {@link #closeAsync()}, and moves the consumer to the
     * {@code Failed} state once the close succeeded; a failing close is logged as an error.
     *
     * @param th the unrecoverable error
     * @param clientCnx the connection the error was received on, or {@code null}
     */
    protected void closeWhenReceivedUnrecoverableError(Throwable th, ClientCnx clientCnx) {
        final String remoteAddress;
        if (clientCnx != null) {
            remoteAddress = String.valueOf(clientCnx.channel().remoteAddress());
        } else {
            remoteAddress = Configurator.NULL;
        }
        log.warn("[{}][{}] {} Closed consumer because get an error that does not support to retry: {} {}", this.topic, this.subscription, remoteAddress, th.getClass().getName(), th.getMessage());
        closeAsync().whenComplete((ignored, closeFailure) -> {
            if (closeFailure != null) {
                log.error("[{}][{}] {} Failed to close consumer after got an error that does not support to retry: {} {}", this.topic, this.subscription, remoteAddress, th.getClass().getName(), th.getMessage());
                return;
            }
            setState(HandlerState.State.Failed);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked after the consumer has subscribed on a connection.
     *
     * <p>Logs the subscription and resets the available-permits counter to zero.
     *
     * @param clientCnx the connection the consumer subscribed on
     * @param i not used by this implementation
     */
    public void consumerIsReconnectedToBroker(ClientCnx clientCnx, int i) {
        log.info(
                "[{}][{}] Subscribed to topic on {} -- consumer: {}",
                this.topic,
                this.subscription,
                clientCnx.channel().remoteAddress(),
                Long.valueOf(this.consumerId));
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
    }

    /**
     * Drains the receiver queue and, when requested, recomputes {@code startMessageId}.
     *
     * @param z when {@code true}, {@code startMessageId} is updated by the branches below that
     *          assign it; when {@code false} it is left untouched
     */
    private void clearReceiverQueue(boolean z) {
        // Move everything currently queued into a local list and reset the size accounting.
        ArrayList arrayList = new ArrayList(this.incomingMessages.size());
        this.incomingMessages.drainTo(arrayList);
        resetIncomingMessageSize();
        // Read the seek future and seek message id before the seek status is checked.
        CompletableFuture<Void> completableFuture = this.seekFuture;
        MessageIdAdv messageIdAdv = this.seekMessageId;
        if (this.seekStatus.get() != SeekStatus.NOT_STARTED) {
            // A seek has been started: restart from the seek position.
            // NOTE(review): the drained messages are not released on this path - confirm this is intended.
            if (z) {
                this.startMessageId = messageIdAdv;
            }
            // Only a COMPLETED seek is reset to NOT_STARTED; its future is completed on the pinned executor.
            if (this.seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) {
                this.internalPinnedExecutor.execute(() -> {
                    completableFuture.complete(null);
                });
                return;
            }
            return;
        }
        if (this.subscriptionMode == SubscriptionMode.Durable) {
            // Durable subscriptions do not get a new start message id here.
            return;
        }
        if (arrayList.isEmpty()) {
            // Nothing was queued: restart from the last dequeued message, unless that is still "earliest".
            if (!z || this.lastDequeuedMessageId.equals(MessageId.earliest)) {
                return;
            }
            this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) this.lastDequeuedMessageId);
            return;
        }
        // Messages were queued: the new start position is derived from the first queued message,
        // as the preceding batch index for a batch id, or the previous message id otherwise.
        MessageIdAdv messageIdAdv2 = (MessageIdAdv) ((Message) arrayList.get(0)).getMessageId();
        MessageIdAdv batchMessageIdImpl = MessageIdAdvUtils.isBatch(messageIdAdv2) ? new BatchMessageIdImpl(messageIdAdv2.getLedgerId(), messageIdAdv2.getEntryId(), messageIdAdv2.getPartitionIndex(), messageIdAdv2.getBatchIndex() - 1) : MessageIdAdvUtils.prevMessageId(messageIdAdv2);
        // The drained messages are released before the start position is updated.
        arrayList.forEach((v0) -> {
            v0.release();
        });
        if (z) {
            this.startMessageId = batchMessageIdImpl;
        }
    }

    /**
     * Sends a flow command granting {@code i} additional permits to the broker.
     *
     * <p>Nothing is sent when there is no connection or {@code i} is not positive. With debug
     * logging enabled the outcome of the write is logged through a listener; otherwise the
     * write uses the channel's void promise.
     *
     * @param clientCnx the connection to write to, may be {@code null}
     * @param i the number of permits to grant
     */
    private void sendFlowPermitsToBroker(ClientCnx clientCnx, int i) {
        if (clientCnx == null || i <= 0) {
            return;
        }
        if (!log.isDebugEnabled()) {
            clientCnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, i), clientCnx.ctx().voidPromise());
            return;
        }
        log.debug("[{}] [{}] Adding {} additional permits", this.topic, this.subscription, Integer.valueOf(i));
        // ChannelFuture exposes addListener; "addListener2" was a decompiler-generated rename.
        clientCnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, i)).addListener(writeFuture -> {
            if (writeFuture.isSuccess()) {
                log.debug("Consumer {} sent {} permits to broker", Long.valueOf(this.consumerId), Integer.valueOf(i));
            } else {
                log.debug("Consumer {} failed to send {} permits to broker: {}", Long.valueOf(this.consumerId), Integer.valueOf(i), writeFuture.cause().getMessage());
            }
        });
    }

    /**
     * Reacts to a failed connection attempt.
     *
     * <p>A retriable error before the lookup deadline only bumps the failure counter. Otherwise
     * the subscribe future is failed; if this call is the one that failed it, the consumer is
     * moved to {@code Failed} and cleaned up. If the subscribe future had already completed, the
     * consumer is closed for unrecoverable errors.
     *
     * @param pulsarClientException the connection failure
     * @return {@code true} when the failure counter was bumped or the error is not
     *         unrecoverable; {@code false} when the consumer was failed or closed
     */
    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public boolean connectionFailed(PulsarClientException pulsarClientException) {
        boolean nonRetriable = !PulsarClientException.isRetriableError(pulsarClientException);
        boolean deadlinePassed = System.currentTimeMillis() > this.lookupDeadline;
        if (nonRetriable || deadlinePassed) {
            pulsarClientException.setPreviousExceptionCount(this.previousExceptionCount);
            if (this.subscribeFuture.completeExceptionally(pulsarClientException)) {
                setState(HandlerState.State.Failed);
                if (nonRetriable) {
                    log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", this.topic, Long.valueOf(this.consumerId), pulsarClientException.getMessage());
                } else {
                    log.info("[{}] Consumer creation failed for consumer {} after timeout", this.topic, Long.valueOf(this.consumerId));
                }
                closeConsumerTasks();
                deregisterFromClientCnx();
                this.client.cleanupConsumer(this);
                return false;
            }
            // The subscribe future was already completed earlier.
            Throwable rootCause = FutureUtil.unwrapCompletionException(pulsarClientException);
            if (isUnrecoverableError(rootCause)) {
                closeWhenReceivedUnrecoverableError(rootCause, null);
                return false;
            }
            return true;
        }
        this.previousExceptionCount.incrementAndGet();
        return true;
    }

    /**
     * Closes the consumer together with its retry letter and dead letter producers.
     *
     * <p>The returned future waits for the consumer's own close future and, when present, for
     * the close of both producers. A failing producer close is logged as a warning.
     *
     * <p>Three paths complete the consumer's own close future: already closing/closed, not
     * connected (closed locally), and connected (a close-consumer request is sent and the
     * consumer is cleaned up when the response arrives). On the connected path an error is
     * ignored when the channel is no longer active.
     *
     * @return a future completed when the consumer and its producers are closed
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public synchronized CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
        closeFutures.add(closeFuture);
        if (this.retryLetterProducer != null) {
            closeFutures.add(this.retryLetterProducer.thenCompose(producer -> {
                return producer.closeAsync();
            }).whenComplete((ignored, closeError) -> {
                if (closeError != null) {
                    log.warn("Exception ignored in closing retryLetterProducer of consumer", closeError);
                }
            }));
        }
        if (this.deadLetterProducer != null) {
            closeFutures.add(this.deadLetterProducer.thenCompose(producer -> {
                return producer.closeAsync();
            }).whenComplete((ignored, closeError) -> {
                if (closeError != null) {
                    log.warn("Exception ignored in closing deadLetterProducer of consumer", closeError);
                }
            }));
        }
        CompletableFuture<Void> allClosed = FutureUtil.waitForAll(closeFutures);
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            // Close already in progress or done: only stop the tasks and fail pending receives.
            closeConsumerTasks();
            failPendingReceive().whenComplete((ignored, receiveError) -> {
                closeFuture.complete(null);
            });
            return allClosed;
        }
        this.consumersClosedCounter.increment();
        if (!isConnected()) {
            // No broker connection: close locally without sending a close-consumer request.
            log.info("[{}] [{}] Closed Consumer (not connected)", this.topic, this.subscription);
            setState(HandlerState.State.Closed);
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            failPendingReceive().whenComplete((ignored, receiveError) -> {
                closeFuture.complete(null);
            });
            return allClosed;
        }
        this.stats.getStatTimeout().ifPresent(timeout -> {
            timeout.cancel();
        });
        setState(HandlerState.State.Closing);
        closeConsumerTasks();
        long newRequestId = this.client.newRequestId();
        ClientCnx cnx = cnx();
        if (null == cnx) {
            cleanupAtClose(closeFuture, null);
        } else {
            cnx.sendRequestWithId(Commands.newCloseConsumer(this.consumerId, newRequestId, null, null), newRequestId).handle((response, requestError) -> {
                ChannelHandlerContext ctx = cnx.ctx();
                boolean channelInactive = ctx == null || !ctx.channel().isActive();
                if (channelInactive && requestError != null) {
                    log.debug("Exception ignored in closing consumer", requestError);
                }
                // An error on a dead channel is not reported to the caller.
                cleanupAtClose(closeFuture, channelInactive ? null : requestError);
                return null;
            });
        }
        return allClosed;
    }

    /**
     * Finalizes a close: marks the consumer closed, stops its tasks, detaches it from the
     * connection and the client, fails pending receives and then completes the close future.
     *
     * @param completableFuture the close future to complete
     * @param th the error to complete the future with, or {@code null} for a normal completion
     */
    private void cleanupAtClose(CompletableFuture<Void> completableFuture, Throwable th) {
        log.info("[{}] [{}] Closed consumer", this.topic, this.subscription);
        setState(HandlerState.State.Closed);
        closeConsumerTasks();
        deregisterFromClientCnx();
        this.client.cleanupConsumer(this);
        failPendingReceive().whenComplete((ignored, receiveError) -> {
            if (th == null) {
                completableFuture.complete(null);
                return;
            }
            completableFuture.completeExceptionally(th);
        });
    }

    /**
     * Stops the consumer's background helpers: closes the unacked, ack-grouping and negative-ack
     * trackers, clears the dead-letter candidates, cancels the batch-receive and stats timeouts,
     * and releases pooled messages when message pooling is enabled.
     */
    private void closeConsumerTasks() {
        this.unAckedMessageTracker.close();
        if (this.possibleSendToDeadLetterTopicMessages != null) {
            this.possibleSendToDeadLetterTopicMessages.clear();
        }
        this.acknowledgmentsGroupingTracker.close();
        if (this.batchReceiveTimeout != null) {
            this.batchReceiveTimeout.cancel();
        }
        this.negativeAcksTracker.close();
        this.stats.getStatTimeout().ifPresent(statTimeout -> statTimeout.cancel());
        if (!this.poolMessages) {
            return;
        }
        releasePooledMessagesAndStopAcceptNew();
    }

    /**
     * Terminates the incoming-message queue, releasing every message handed to the terminate
     * callback, and then clears the incoming messages.
     */
    private void releasePooledMessagesAndStopAcceptNew() {
        this.incomingMessages.terminate(pooledMessage -> pooledMessage.release());
        clearIncomingMessages();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Notifies the configured consumer event listener, on the external pinned executor, that
     * this consumer became active or inactive. Does nothing when no listener is configured.
     *
     * @param z {@code true} when the consumer became active, {@code false} when inactive
     */
    public void activeConsumerChanged(boolean z) {
        if (this.consumerEventListener != null) {
            this.externalPinnedExecutor.execute(() -> {
                if (!z) {
                    this.consumerEventListener.becameInactive(this, this.partitionIndex);
                    return;
                }
                this.consumerEventListener.becameActive(this, this.partitionIndex);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the metadata describes a batch message.
     *
     * @param messageMetadata the metadata to inspect
     * @return {@code false} when the message is undecryptable; otherwise {@code true} when the
     *         metadata has a batch message count or that count differs from 1
     */
    public boolean isBatch(MessageMetadata messageMetadata) {
        if (isMessageUndecryptable(messageMetadata)) {
            return false;
        }
        if (messageMetadata.hasNumMessagesInBatch()) {
            return true;
        }
        return messageMetadata.getNumMessagesInBatch() != 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the message for one entry of a batch.
     *
     * <p>When {@code z} is set, the single-message payload is first deserialized from
     * {@code byteBuf}; that intermediate buffer is released on every exit path, including when
     * message creation throws.
     *
     * @param i index of the message within the batch
     * @param i2 number of messages in the batch
     * @param brokerEntryMetadata broker entry metadata attached to the created message
     * @param messageMetadata metadata of the batch
     * @param singleMessageMetadata metadata of the single message, may be {@code null}
     * @param byteBuf the batch payload
     * @param messageIdImpl id of the batch entry
     * @param schema schema of the created message
     * @param z whether the single message must be deserialized from the batch payload
     * @param bitSetRecyclable ack set consulted through {@code isSingleMessageAcked}
     * @param bitSet ack bit set stored in the created batch message id
     * @param i3 redelivery count passed to the created message
     * @param j consumer epoch passed to the created message
     * @return the created message, or {@code null} when the message is skipped because it lies
     *         before the start message id, is compacted out, or is already acked
     * @throws IllegalStateException when deserializing the single message fails
     */
    public <V> MessageImpl<V> newSingleMessage(int i, int i2, BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf byteBuf, MessageIdImpl messageIdImpl, Schema<V> schema, boolean z, BitSetRecyclable bitSetRecyclable, BitSet bitSet, int i3, long j) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] processing message num - {} in batch", this.subscription, this.consumerName, Integer.valueOf(i));
        }
        ByteBuf singleMessagePayload = null;
        try {
            if (z) {
                try {
                    singleMessagePayload = Commands.deSerializeSingleMessageInBatch(byteBuf, singleMessageMetadata, i, i2);
                } catch (IOException | IllegalStateException e) {
                    throw new IllegalStateException(e);
                }
            }
            if (this.topicName.isPersistent() && isSameEntry(messageIdImpl) && isPriorBatchIndex(i)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", this.subscription, this.consumerName, this.startMessageId);
                }
                return null;
            }
            if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
                return null;
            }
            if (isSingleMessageAcked(bitSetRecyclable, i)) {
                return null;
            }
            MessageImpl<V> created = MessageImpl.create(this.topicName.toString(), new BatchMessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), getPartitionIndex(), i, i2, bitSet), messageMetadata, singleMessageMetadata, singleMessagePayload != null ? singleMessagePayload : byteBuf, createEncryptionContext(messageMetadata), cnx(), schema, i3, this.poolMessages, j);
            created.setBrokerEntryMetadata(brokerEntryMetadata);
            return created;
        } finally {
            // Single release point: covers the skip paths, the success path and any exception.
            if (singleMessagePayload != null) {
                singleMessagePayload.release();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a non-batch message and attaches the broker entry metadata to it.
     *
     * @param messageIdImpl id of the message
     * @param brokerEntryMetadata broker entry metadata to attach
     * @param messageMetadata metadata of the message
     * @param byteBuf payload of the message
     * @param schema schema of the message
     * @param i redelivery count passed to the created message
     * @param j consumer epoch passed to the created message
     * @return the created message
     */
    public <V> MessageImpl<V> newMessage(MessageIdImpl messageIdImpl, BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, ByteBuf byteBuf, Schema<V> schema, int i, long j) {
        String topic = this.topicName.toString();
        MessageImpl<V> message = MessageImpl.create(
                topic,
                messageIdImpl,
                messageMetadata,
                byteBuf,
                createEncryptionContext(messageMetadata),
                cnx(),
                schema,
                i,
                this.poolMessages,
                j);
        message.setBrokerEntryMetadata(brokerEntryMetadata);
        return message;
    }

    /**
     * Accounts for a prefetched message and dispatches it on the internal pinned executor.
     *
     * <p>A message whose consumer epoch is not valid only returns a permit. Otherwise the
     * message goes to a pending receive when one exists; if not, it is enqueued and, when that
     * satisfies a batch and a batch receive is pending, the batch callback is notified.
     *
     * @param messageImpl the received message
     */
    private void executeNotifyCallback(MessageImpl<T> messageImpl) {
        this.messagesPrefetchedGauge.increment();
        this.bytesPrefetchedGauge.add(messageImpl.size());
        this.internalPinnedExecutor.execute(() -> {
            if (isValidConsumerEpoch(messageImpl)) {
                Message<T> arrived = onArrival(messageImpl);
                if (hasNextPendingReceive()) {
                    notifyPendingReceivedCallback(arrived, null);
                    return;
                }
                if (enqueueMessageAndCheckBatchReceive(arrived) && hasPendingBatchReceive()) {
                    notifyPendingBatchReceivedCallBack();
                }
                return;
            }
            increaseAvailablePermits(cnx());
        });
    }

    /**
     * Runs the configured payload processor over a received payload.
     *
     * <p>Every non-null message produced by the processor is dispatched through
     * {@code executeNotifyCallback}; {@code null} results are counted as skipped and their
     * permits are returned afterwards. A failure inside the processor is logged and the entry
     * is discarded as corrupted. The payload context is recycled and the payload released
     * exactly once, whatever the outcome.
     *
     * @param brokerEntryMetadata broker entry metadata of the entry
     * @param messageMetadata metadata of the entry
     * @param byteBuf the raw payload
     * @param messageIdImpl id of the entry
     * @param schema schema handed to the processor
     * @param i redelivery count
     * @param list ack set of the entry
     * @param j consumer epoch
     */
    private void processPayloadByProcessor(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, ByteBuf byteBuf, MessageIdImpl messageIdImpl, Schema<T> schema, int i, List<Long> list, long j) {
        MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
        MessagePayloadContextImpl payloadContext = MessagePayloadContextImpl.get(brokerEntryMetadata, messageMetadata, messageIdImpl, this, i, list, j);
        AtomicInteger skippedMessages = new AtomicInteger(0);
        try {
            this.conf.getPayloadProcessor().process(payload, payloadContext, schema, message -> {
                if (message != null) {
                    executeNotifyCallback((MessageImpl<T>) message);
                } else {
                    skippedMessages.incrementAndGet();
                }
            });
        } catch (Throwable th) {
            log.warn("[{}] [{}] unable to obtain message in batch", this.subscription, this.consumerName, th);
            discardCorruptedMessage(messageIdImpl, cnx(), CommandAck.ValidationError.BatchDeSerializeError);
        } finally {
            // Cleanup lives only here so a later failure cannot recycle/release a second time.
            payloadContext.recycle();
            payload.release();
        }
        if (skippedMessages.get() > 0) {
            increaseAvailablePermits(cnx(), skippedMessages.get());
        }
        tryTriggerListener();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Handles one {@code CommandMessage} delivered by the broker.
     *
     * <p>Steps, in order: copy the ack-set words, verify the checksum, parse broker-entry and
     * message metadata, drop an already-acked single message, decrypt, uncompress (skipped for
     * undecrypted or chunked payloads), then either hand the payload to the configured payload
     * processor, build a single message (assembling chunks first when needed), or split a batch via
     * {@code receiveIndividualMessagesFromBatch}.
     *
     * <p>Any {@code Throwable} raised after the checksum check is logged and the entry is discarded
     * with {@code ChecksumMismatch}.
     * NOTE(review): the catch-all also covers dispatch code that runs after parsing, so a late
     * failure is reported to the broker as a checksum mismatch — confirm this is intended.
     *
     * @param commandMessage the broker command describing the entry
     * @param byteBuf headers and payload of the entry
     * @param clientCnx connection the entry arrived on
     */
    public void messageReceived(CommandMessage commandMessage, ByteBuf byteBuf, ClientCnx clientCnx) {
        List<Long> ackSet = Collections.emptyList();
        final int ackSetsCount = commandMessage.getAckSetsCount();
        if (ackSetsCount > 0) {
            ackSet = new ArrayList<>(ackSetsCount);
            for (int idx = 0; idx < ackSetsCount; idx++) {
                ackSet.add(Long.valueOf(commandMessage.getAckSetAt(idx)));
            }
        }
        final int redeliveryCount = commandMessage.getRedeliveryCount();
        final MessageIdData messageId = commandMessage.getMessageId();
        final long consumerEpoch = commandMessage.hasConsumerEpoch() ? commandMessage.getConsumerEpoch() : -1L;
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}/{}", this.topic, this.subscription, Long.valueOf(messageId.getLedgerId()), Long.valueOf(messageId.getEntryId()));
        }
        if (!verifyChecksum(byteBuf, messageId)) {
            discardCorruptedMessage(messageId, clientCnx, CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        try {
            final BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(byteBuf);
            final MessageMetadata msgMetadata = Commands.parseMessageMetadata(byteBuf);
            final int numMessages = msgMetadata.getNumMessagesInBatch();
            final boolean isChunked = (msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0) > 1;
            MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
            // A non-batched entry: exactly one message and no explicit batch-size field.
            final boolean isSingleMessage = numMessages == 1 && !msgMetadata.hasNumMessagesInBatch();
            if (isSingleMessage && this.acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", this.topic, this.subscription, this.consumerName, msgId);
                }
                increaseAvailablePermits(clientCnx, numMessages);
                return;
            }
            final DecryptResult decryptResult = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, byteBuf, clientCnx);
            if (decryptResult.shouldDiscard()) {
                return;
            }
            final boolean notDecrypted = !decryptResult.success;
            final ByteBuf decryptedPayload = decryptResult.payload;
            // Undecrypted and chunked payloads are passed on as-is; chunked ones are uncompressed
            // after reassembly in processMessageChunk.
            ByteBuf payload = (notDecrypted || isChunked) ? decryptedPayload.retain() : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, clientCnx, true);
            decryptedPayload.release();
            if (payload == null) {
                return;
            }
            if (this.conf.getPayloadProcessor() != null) {
                processPayloadByProcessor(brokerEntryMetadata, msgMetadata, payload, msgId, this.schema, redeliveryCount, ackSet, consumerEpoch);
                return;
            }
            if (notDecrypted || isSingleMessage) {
                if (isChunked) {
                    payload = processMessageChunk(payload, msgMetadata, msgId, messageId, clientCnx);
                    if (payload == null) {
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}", Integer.valueOf(msgMetadata.getChunkId()), Integer.valueOf(msgMetadata.getNumChunksFromMsg()), msgId, Long.valueOf(msgMetadata.getSequenceId()));
                    }
                    final ChunkedMessageCtx chunkedMsgCtx = this.chunkedMessagesMap.remove(msgMetadata.getUuid());
                    if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
                        msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0], chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
                    }
                    this.unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
                    this.pendingChunkedMessageCount--;
                    chunkedMsgCtx.recycle();
                }
                if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", this.subscription, this.consumerName, this.startMessageId);
                    }
                    payload.release();
                    return;
                }
                final MessageImpl<T> message = newMessage(msgId, brokerEntryMetadata, msgMetadata, payload, this.schema, redeliveryCount, consumerEpoch);
                payload.release();
                if (this.deadLetterPolicy != null && this.possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= this.deadLetterPolicy.getMaxRedeliverCount()) {
                    this.possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), Collections.singletonList(message));
                    if (redeliveryCount > this.deadLetterPolicy.getMaxRedeliverCount()) {
                        // Over the limit: request redelivery instead of delivering to the application.
                        redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
                        increaseAvailablePermits(clientCnx);
                        return;
                    }
                }
                executeNotifyCallback(message);
            } else {
                receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, payload, messageId, clientCnx, consumerEpoch);
                payload.release();
            }
            tryTriggerListener();
        } catch (Throwable th) {
            // Keep the cause visible; previously the failure was discarded without a trace.
            log.warn("[{}][{}] Failed to process message {}:{}", this.topic, this.subscription, Long.valueOf(messageId.getLedgerId()), Long.valueOf(messageId.getEntryId()), th);
            discardCorruptedMessage(messageId, clientCnx, CommandAck.ValidationError.ChecksumMismatch);
        }
    }

    /**
     * Appends one chunk of a chunked message to the per-uuid {@code ChunkedMessageCtx} buffer.
     *
     * <p>Returns {@code null} for every chunk except an in-sequence last chunk
     * ({@code chunkId == numChunksFromMsg - 1}); for that chunk it returns the result of
     * {@code uncompressPayloadIfNeeded} applied to the accumulated buffer. The incoming
     * {@code byteBuf} is released on every path of this method.
     *
     * @param byteBuf payload of this chunk
     * @param messageMetadata metadata carrying uuid, chunk id, chunk count and total size
     * @param messageIdImpl id of this chunk, recorded in the context's {@code chunkedMessageIds}
     * @param messageIdData wire id of this chunk, used for duplicate detection and error reporting
     * @param clientCnx connection used for permits and for discarding on decompression failure
     * @return the uncompressed full payload when the message is complete, otherwise {@code null}
     */
    private ByteBuf processMessageChunk(ByteBuf byteBuf, MessageMetadata messageMetadata, MessageIdImpl messageIdImpl, MessageIdData messageIdData, ClientCnx clientCnx) {
        // Every chunk except the last one gives a permit back immediately.
        if (messageMetadata.getChunkId() != messageMetadata.getNumChunksFromMsg() - 1) {
            increaseAvailablePermits(clientCnx);
        }
        // Schedule the periodic expiry task once (guarded by the compare-and-set).
        if (this.expireTimeOfIncompleteChunkedMessageMillis > 0 && this.expireChunkMessageTaskScheduled.compareAndSet(false, true)) {
            ((ScheduledExecutorService) this.client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(() -> {
                this.internalPinnedExecutor.execute(Runnables.catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages));
            }, this.expireTimeOfIncompleteChunkedMessageMillis, this.expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS);
        }
        ChunkedMessageCtx chunkedMessageCtx = this.chunkedMessagesMap.get(messageMetadata.getUuid());
        if (messageMetadata.getChunkId() == 0) {
            // First chunk: discard any context already registered under the same uuid.
            if (chunkedMessageCtx != null) {
                // If this exact entry is not among the stored chunk ids, ack the stored chunks individually.
                if (Arrays.stream(chunkedMessageCtx.chunkedMessageIds).noneMatch(messageIdImpl2 -> {
                    return messageIdImpl2 != null && messageIdImpl2.ledgerId == messageIdData.getLedgerId() && messageIdImpl2.entryId == messageIdData.getEntryId();
                })) {
                    Arrays.stream(chunkedMessageCtx.chunkedMessageIds).forEach(messageIdImpl3 -> {
                        if (messageIdImpl3 != null) {
                            doAcknowledge(messageIdImpl3, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
                        }
                    });
                }
                if (chunkedMessageCtx.chunkedMsgBuffer != null) {
                    ReferenceCountUtil.safeRelease(chunkedMessageCtx.chunkedMsgBuffer);
                }
                chunkedMessageCtx.recycle();
                this.chunkedMessagesMap.remove(messageMetadata.getUuid());
            }
            this.pendingChunkedMessageCount++;
            if (this.maxPendingChunkedMessage > 0 && this.pendingChunkedMessageCount > this.maxPendingChunkedMessage) {
                removeOldestPendingChunkedMessage();
            }
            int numChunksFromMsg = messageMetadata.getNumChunksFromMsg();
            // Buffer is created with initial and max capacity both equal to the declared total chunked size.
            ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(messageMetadata.getTotalChunkMsgSize(), messageMetadata.getTotalChunkMsgSize());
            chunkedMessageCtx = this.chunkedMessagesMap.computeIfAbsent(messageMetadata.getUuid(), str -> {
                return ChunkedMessageCtx.get(numChunksFromMsg, buffer);
            });
            this.pendingChunkedMessageUuidQueue.add(messageMetadata.getUuid());
        }
        // In-sequence chunk: record its id and append its bytes.
        if (chunkedMessageCtx != null && chunkedMessageCtx.chunkedMsgBuffer != null && messageMetadata.getChunkId() == chunkedMessageCtx.lastChunkedMessageId + 1) {
            chunkedMessageCtx.chunkedMessageIds[messageMetadata.getChunkId()] = messageIdImpl;
            chunkedMessageCtx.chunkedMsgBuffer.writeBytes(byteBuf);
            chunkedMessageCtx.lastChunkedMessageId = messageMetadata.getChunkId();
            // Not the last chunk yet: nothing to return.
            if (messageMetadata.getChunkId() != messageMetadata.getNumChunksFromMsg() - 1) {
                byteBuf.release();
                return null;
            }
            // Last chunk: uncompress the accumulated buffer (size check disabled via the false flag).
            byteBuf.release();
            ByteBuf byteBuf2 = chunkedMessageCtx.chunkedMsgBuffer;
            ByteBuf uncompressPayloadIfNeeded = uncompressPayloadIfNeeded(messageIdData, messageMetadata, byteBuf2, clientCnx, false);
            byteBuf2.release();
            return uncompressPayloadIfNeeded;
        }
        // Chunk id not ahead of the last stored one: treat as a duplicate.
        if (chunkedMessageCtx != null && messageMetadata.getChunkId() <= chunkedMessageCtx.lastChunkedMessageId) {
            log.warn("[{}] Receive a duplicated chunk message with messageId [{}], last-chunk-Id [{}], chunkId [{}], sequenceId [{}]", messageMetadata.getProducerName(), messageIdImpl, Integer.valueOf(chunkedMessageCtx.lastChunkedMessageId), Integer.valueOf(messageMetadata.getChunkId()), Long.valueOf(messageMetadata.getSequenceId()));
            byteBuf.release();
            // Same entry already stored in the context: do not ack it.
            if (!Arrays.stream(chunkedMessageCtx.chunkedMessageIds).noneMatch(messageIdImpl4 -> {
                return messageIdImpl4 != null && messageIdImpl4.ledgerId == messageIdData.getLedgerId() && messageIdImpl4.entryId == messageIdData.getEntryId();
            })) {
                return null;
            }
            doAcknowledge(messageIdImpl, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
            return null;
        }
        // Remaining case: no usable context, or a chunk id beyond the next expected one.
        Logger logger = log;
        Object[] objArr = new Object[6];
        objArr[0] = this.topic;
        objArr[1] = this.subscription;
        objArr[2] = messageIdImpl;
        objArr[3] = chunkedMessageCtx != null ? Integer.valueOf(chunkedMessageCtx.lastChunkedMessageId) : null;
        objArr[4] = Integer.valueOf(messageMetadata.getChunkId());
        objArr[5] = messageMetadata.getUuid();
        logger.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}, uuid = {}", objArr);
        if (chunkedMessageCtx != null) {
            if (chunkedMessageCtx.chunkedMsgBuffer != null) {
                ReferenceCountUtil.safeRelease(chunkedMessageCtx.chunkedMsgBuffer);
            }
            chunkedMessageCtx.recycle();
        }
        this.chunkedMessagesMap.remove(messageMetadata.getUuid());
        byteBuf.release();
        // Expiry disabled, or publish time still within the expiry window: track the chunk; otherwise ack it.
        if (this.expireTimeOfIncompleteChunkedMessageMillis <= 0 || System.currentTimeMillis() <= messageMetadata.getPublishTime() + this.expireTimeOfIncompleteChunkedMessageMillis) {
            trackMessage(messageIdImpl);
            return null;
        }
        doAcknowledge(messageIdImpl, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Completes the next pending receive future, if any, with the given message or failure.
     *
     * <p>A non-null {@code exc}, or a {@code null} message, completes the future exceptionally on
     * {@code internalPinnedExecutor}. Otherwise the message is tracked directly when the current
     * receiver queue size is zero, or accounted via {@code increaseIncomingMessageSize} and
     * {@code messageProcessed} when it is not, before being passed to {@code interceptAndComplete}.
     *
     * @param message the message to deliver; may be {@code null}
     * @param exc failure to deliver instead of a message; may be {@code null}
     */
    public void notifyPendingReceivedCallback(Message<T> message, Exception exc) {
        if (this.pendingReceives.isEmpty()) {
            return;
        }
        final CompletableFuture<Message<T>> pendingFuture = nextPendingReceive();
        if (pendingFuture == null) {
            return;
        }
        if (exc != null) {
            this.internalPinnedExecutor.execute(() -> pendingFuture.completeExceptionally(exc));
            return;
        }
        if (message == null) {
            final IllegalStateException nullMessageError = new IllegalStateException("received message can't be null");
            this.internalPinnedExecutor.execute(() -> pendingFuture.completeExceptionally(nullMessageError));
            return;
        }
        if (getCurrentReceiverQueueSize() == 0) {
            trackMessage((Message<?>) message);
        } else {
            increaseIncomingMessageSize(message);
            messageProcessed(message);
        }
        interceptAndComplete(message, pendingFuture);
    }

    /**
     * Passes the message through {@code beforeConsume} and completes the pending receive with the result.
     *
     * @param message the message to deliver
     * @param completableFuture the pending receive to complete
     */
    private void interceptAndComplete(Message<T> message, CompletableFuture<Message<T>> completableFuture) {
        final Message<T> processed = beforeConsume(message);
        completePendingReceive(completableFuture, processed);
    }

    /**
     * Splits a batched entry into single messages and dispatches each one.
     *
     * <p>Each index is materialized with {@code newSingleMessage}. A message is skipped (one permit
     * returned, not delivered) when:
     * <ul>
     *   <li>{@code newSingleMessage} returns {@code null} and {@code isSingleMessageAcked} is false
     *       for that index;</li>
     *   <li>the redelivery count exceeds the dead-letter policy's maximum — the message is only
     *       recorded as a dead-letter candidate, matching the single-message path in
     *       {@code messageReceived};</li>
     *   <li>the acknowledgment tracker reports it as a duplicate.</li>
     * </ul>
     * After the loop, dead-letter candidates are registered under the batch's message id and, when
     * over the limit, redelivery of that id is requested. All skipped permits are returned in one call.
     *
     * @param brokerEntryMetadata broker entry metadata of the batch entry
     * @param messageMetadata batch-level metadata
     * @param i redelivery count of the entry
     * @param list ack-set words for the batch; may be {@code null} or empty
     * @param byteBuf payload containing the batched messages
     * @param messageIdData wire id of the batch entry
     * @param clientCnx connection used for permits and discards
     * @param j forwarded to {@code newSingleMessage}; {@code messageReceived} supplies the consumer epoch
     */
    void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata messageMetadata, int i, List<Long> list, ByteBuf byteBuf, MessageIdData messageIdData, ClientCnx clientCnx, long j) {
        final int batchSize = messageMetadata.getNumMessagesInBatch();
        final MessageIdImpl batchMessageId = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex());
        List<MessageImpl<T>> deadLetterCandidates = null;
        if (this.deadLetterPolicy != null && i >= this.deadLetterPolicy.getMaxRedeliverCount()) {
            deadLetterCandidates = new ArrayList<>();
        }
        final BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
        BitSetRecyclable ackBitSet = null;
        if (list != null && list.size() > 0) {
            ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(list));
        }
        final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
        int skippedMessages = 0;
        for (int index = 0; index < batchSize; index++) {
            try {
                final MessageImpl<T> message = newSingleMessage(index, batchSize, brokerEntryMetadata, messageMetadata, singleMessageMetadata, byteBuf, batchMessageId, this.schema, true, ackBitSet, ackSetInMessageId, i, j);
                if (message == null) {
                    if (!isSingleMessageAcked(ackBitSet, index)) {
                        skippedMessages++;
                    }
                    continue;
                }
                if (deadLetterCandidates != null) {
                    deadLetterCandidates.add(message);
                    if (i > this.deadLetterPolicy.getMaxRedeliverCount()) {
                        // Over the redelivery limit: count as skipped and do NOT deliver it.
                        skippedMessages++;
                        continue;
                    }
                }
                if (this.acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
                    skippedMessages++;
                    continue;
                }
                executeNotifyCallback(message);
            } catch (IllegalStateException e) {
                log.warn("[{}] [{}] unable to obtain message in batch", this.subscription, this.consumerName, e);
                discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.BatchDeSerializeError);
            }
        }
        if (ackBitSet != null) {
            ackBitSet.recycle();
        }
        if (this.deadLetterPolicy != null && this.possibleSendToDeadLetterTopicMessages != null && i >= this.deadLetterPolicy.getMaxRedeliverCount()) {
            this.possibleSendToDeadLetterTopicMessages.put(batchMessageId, deadLetterCandidates);
            if (i > this.deadLetterPolicy.getMaxRedeliverCount()) {
                redeliverUnacknowledgedMessages(Collections.singleton(batchMessageId));
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", this.subscription, this.consumerName, Integer.valueOf(this.incomingMessages.size()), Integer.valueOf(this.incomingMessages.remainingCapacity()));
        }
        if (skippedMessages > 0) {
            increaseAvailablePermits(clientCnx, skippedMessages);
        }
    }

    /**
     * Tells whether the given entry id lies before the start position.
     *
     * <p>With {@code resetIncludeHead} set the start entry itself does not count as prior
     * (strict {@code <}); without it the start entry counts as prior ({@code <=}).
     *
     * @param j entry id to compare against {@code startMessageId}'s entry id
     * @return {@code true} if the entry is prior to the start position
     */
    private boolean isPriorEntryIndex(long j) {
        final long startEntryId = this.startMessageId.getEntryId();
        if (this.resetIncludeHead) {
            return j < startEntryId;
        }
        return j <= startEntryId;
    }

    /**
     * Tells whether the given batch index lies before the start position's batch index.
     *
     * <p>With {@code resetIncludeHead} set the comparison is strict ({@code <}); otherwise the
     * start batch index itself also counts as prior ({@code <=}).
     *
     * @param j batch index to compare against {@code startMessageId}'s batch index
     * @return {@code true} if the index is prior to the start position
     */
    private boolean isPriorBatchIndex(long j) {
        final long startBatchIndex = (long) this.startMessageId.getBatchIndex();
        if (this.resetIncludeHead) {
            return j < startBatchIndex;
        }
        return j <= startBatchIndex;
    }

    /**
     * Checks whether the given id refers to the same ledger and entry as {@code startMessageId}.
     *
     * @param messageIdImpl id to compare
     * @return {@code false} when no start message id is set or ledger/entry differ, {@code true} otherwise
     */
    private boolean isSameEntry(MessageIdImpl messageIdImpl) {
        if (this.startMessageId == null) {
            return false;
        }
        if (messageIdImpl.getLedgerId() != this.startMessageId.getLedgerId()) {
            return false;
        }
        return messageIdImpl.getEntryId() == this.startMessageId.getEntryId();
    }

    /**
     * Book-keeping performed when a message leaves the prefetch queue.
     *
     * <p>Records the message id as last dequeued and moves the message from the "prefetched"
     * gauges to the "received" counters. Only when the message arrived on the connection currently
     * returned by {@code cnx()} are stats updated and the message tracked; in that case a permit is
     * also returned unless a listener (own or parent's) is configured. The incoming message size is
     * decreased in all cases.
     *
     * @param message the message being handed out
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected synchronized void messageProcessed(Message<?> message) {
        final ClientCnx currentCnx = cnx();
        final ClientCnx messageCnx = ((MessageImpl<?>) message).getCnx();
        this.lastDequeuedMessageId = message.getMessageId();
        this.messagesPrefetchedGauge.decrement();
        this.messagesReceivedCounter.increment();
        this.bytesPrefetchedGauge.subtract(message.size());
        this.bytesReceivedCounter.add(message.size());
        final boolean sameConnection = messageCnx == currentCnx;
        if (sameConnection) {
            final boolean hasAnyListener = this.listener != null || this.parentConsumerHasListener;
            if (!hasAnyListener) {
                increaseAvailablePermits(currentCnx);
            }
            this.stats.updateNumMsgsReceived(message);
            trackMessage(message);
        }
        decreaseIncomingMessageSize(message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tracks the given message using its id and redelivery count; a {@code null} message is ignored.
     *
     * @param message the message to track, or {@code null}
     */
    public void trackMessage(Message<?> message) {
        if (message == null) {
            return;
        }
        trackMessage(message.getMessageId(), message.getRedeliveryCount());
    }

    /**
     * Tracks the given message id with a redelivery count of zero.
     *
     * @param messageId id to track
     */
    protected void trackMessage(MessageId messageId) {
        trackMessage(messageId, 0);
    }

    /**
     * Tracks a message id for ack-timeout purposes.
     *
     * <p>Does nothing when the configured ack timeout is not positive or the id is not a
     * {@code MessageIdImpl}. Otherwise the batch part of the id is discarded; with a parent
     * consumer the id is removed from this consumer's un-acked tracker, without one it is handed
     * to {@code trackUnAckedMsgIfNoListener}.
     *
     * @param messageId id to track
     * @param i redelivery count forwarded to {@code trackUnAckedMsgIfNoListener}
     */
    protected void trackMessage(MessageId messageId, int i) {
        final boolean ackTimeoutEnabled = this.conf.getAckTimeoutMillis() > 0;
        if (ackTimeoutEnabled && messageId instanceof MessageIdImpl) {
            final MessageIdAdv entryLevelId = MessageIdAdvUtils.discardBatch(messageId);
            if (!this.hasParentConsumer) {
                trackUnAckedMsgIfNoListener(entryLevelId, i);
            } else {
                this.unAckedMessageTracker.remove(entryLevelId);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns one permit for the given message, but only if it arrived on the connection currently
     * returned by {@code cnx()}.
     *
     * @param messageImpl message whose permit should be returned
     */
    public void increaseAvailablePermits(MessageImpl<?> messageImpl) {
        final ClientCnx currentCnx = cnx();
        if (messageImpl.getCnx() != currentCnx) {
            return;
        }
        increaseAvailablePermits(currentCnx);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a single permit on the given connection.
     *
     * @param clientCnx connection forwarded to the two-argument overload
     */
    public void increaseAvailablePermits(ClientCnx clientCnx) {
        increaseAvailablePermits(clientCnx, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds {@code i} to the available-permit counter and, once the counter reaches half of the
     * current receiver queue size while the consumer is not paused, resets it to zero and sends
     * the accumulated permits to the broker.
     *
     * <p>The reset uses compare-and-set; when it loses against a concurrent update the counter is
     * re-read and the threshold check is repeated.
     *
     * @param clientCnx connection passed to {@code sendFlowPermitsToBroker}
     * @param i number of permits to add (callers in this file also pass 0 to re-evaluate the threshold)
     */
    public void increaseAvailablePermits(ClientCnx clientCnx, int i) {
        int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, i);
        while (available >= getCurrentReceiverQueueSize() / 2 && !this.paused) {
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sending permit-cmd to broker with available permits = {}", this.topic, Integer.valueOf(available));
                }
                sendFlowPermitsToBroker(clientCnx, available);
                return;
            }
            // Lost the race: pick up the latest value and evaluate again.
            available = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }

    /**
     * Adds {@code i} permits using the connection currently returned by {@code cnx()}.
     *
     * @param i number of permits to add
     */
    public void increaseAvailablePermits(int i) {
        final ClientCnx currentCnx = cnx();
        increaseAvailablePermits(currentCnx, i);
    }

    /**
     * Replaces the current receiver queue size and adjusts the available permits by the difference
     * between the new and the previous size.
     *
     * @param i new receiver queue size; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected void setCurrentReceiverQueueSize(int i) {
        Preconditions.checkArgument(i > 0, "receiver queue size should larger than 0");
        final int previousSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, i);
        final int permitDelta = i - previousSize;
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}", this.topic, this.subscription, Integer.valueOf(previousSize), Integer.valueOf(i), Integer.valueOf(permitDelta));
        }
        increaseAvailablePermits(permitDelta);
    }

    /**
     * Marks the consumer as paused. While the flag is set,
     * {@code increaseAvailablePermits(ClientCnx, int)} does not send permits to the broker.
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void pause() {
        this.paused = true;
    }

    /**
     * Clears the paused flag if it is set and re-evaluates the permit threshold by adding zero
     * permits, so that permits accumulated while paused can be sent.
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void resume() {
        if (!this.paused) {
            return;
        }
        this.paused = false;
        increaseAvailablePermits(cnx(), 0);
    }

    /**
     * Returns the connection handler's {@code lastConnectionClosedTimestamp} value.
     *
     * @return the timestamp stored by the connection handler
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public long getLastDisconnectedTimestamp() {
        return this.connectionHandler.lastConnectionClosedTimestamp;
    }

    /**
     * Decrypts the payload when the metadata lists encryption keys.
     *
     * <p>Without encryption keys the payload is returned retained as a success. Without a
     * configured {@code CryptoKeyReader}, or when decryption reports failure, the outcome is
     * decided by {@code handleCryptoFailure}. On success the returned result wraps a newly
     * allocated buffer holding the decrypted bytes.
     *
     * @param messageIdData wire id of the entry, used in failure handling
     * @param i redelivery count, forwarded to failure handling
     * @param messageMetadata metadata of the entry
     * @param byteBuf the (possibly encrypted) payload
     * @param clientCnx connection used by failure handling
     * @return the decryption outcome
     */
    private DecryptResult decryptPayloadIfNeeded(MessageIdData messageIdData, int i, MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx) {
        if (messageMetadata.getEncryptionKeysCount() == 0) {
            return DecryptResult.success(byteBuf.retain());
        }
        final int batchSize = messageMetadata.getNumMessagesInBatch();
        if (this.conf.getCryptoKeyReader() == null) {
            return handleCryptoFailure(byteBuf, messageIdData, clientCnx, i, batchSize, true);
        }
        final int maxDecryptedSize = this.msgCrypto.getMaxOutputSize(byteBuf.readableBytes());
        final ByteBuf decrypted = PulsarByteBufAllocator.DEFAULT.buffer(maxDecryptedSize);
        final ByteBuffer decryptedNio = decrypted.nioBuffer(0, maxDecryptedSize);
        final boolean decryptionSucceeded = this.msgCrypto.decrypt(() -> messageMetadata, byteBuf.nioBuffer(), decryptedNio, this.conf.getCryptoKeyReader());
        if (!decryptionSucceeded) {
            decrypted.release();
            return handleCryptoFailure(byteBuf, messageIdData, clientCnx, i, batchSize, false);
        }
        // Expose the bytes written through the NIO view.
        decrypted.writerIndex(decryptedNio.limit());
        return DecryptResult.success(decrypted);
    }

    /**
     * Applies the configured crypto failure action to a payload that could not be decrypted.
     *
     * <ul>
     *   <li>{@code CONSUME}: log a warning and return the payload retained as a failure result.</li>
     *   <li>{@code DISCARD}: log a warning, discard the entry with {@code DecryptionError} and
     *       return a discard result.</li>
     *   <li>{@code FAIL}: log an error, add the entry to the un-acked tracker and return a discard
     *       result.</li>
     *   <li>any other value: log a warning and return the payload retained as a failure result.</li>
     * </ul>
     *
     * @param byteBuf the still-encrypted payload
     * @param messageIdData wire id of the entry
     * @param clientCnx connection used when discarding
     * @param i redelivery count recorded with the un-acked tracker entry
     * @param i2 number of messages in the batch, used as the permit count when discarding
     * @param z {@code true} when no {@code CryptoKeyReader} is configured, {@code false} when
     *          decryption was attempted and failed
     * @return the outcome to propagate to the caller
     */
    private DecryptResult handleCryptoFailure(ByteBuf byteBuf, MessageIdData messageIdData, ClientCnx clientCnx, int i, int i2, boolean z) {
        switch (this.conf.getCryptoFailureAction()) {
            case CONSUME:
                if (!z) {
                    log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to consume.", this.topic, this.subscription, this.consumerName, messageIdData);
                } else {
                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.", this.topic, this.subscription, this.consumerName);
                }
                return DecryptResult.failure(byteBuf.retain());
            case DISCARD:
                if (!z) {
                    log.warn("[{}][{}][{}][{}-{}-{}] Discarding message since decryption failed and config is set to discard", this.topic, this.subscription, this.consumerName, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Integer.valueOf(messageIdData.getBatchIndex()));
                } else {
                    log.warn("[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and config is set to discard message with batch size {}", this.topic, this.subscription, this.consumerName, Integer.valueOf(i2));
                }
                discardMessage(messageIdData, clientCnx, CommandAck.ValidationError.DecryptionError, i2);
                return DecryptResult.discard();
            case FAIL:
                if (!z) {
                    log.error("[{}][{}][{}][{}-{}-{}] Message delivery failed since unable to decrypt incoming message", this.topic, this.subscription, this.consumerName, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Integer.valueOf(this.partitionIndex));
                } else {
                    log.error("[{}][{}][{}][{}-{}-{}] Message delivery failed since CryptoKeyReader interface is not implemented to consume encrypted message", this.topic, this.subscription, this.consumerName, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Integer.valueOf(this.partitionIndex));
                }
                final MessageIdImpl failedMessageId = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), this.partitionIndex);
                this.unAckedMessageTracker.add(failedMessageId, i);
                return DecryptResult.discard();
            default:
                log.warn("[{}][{}][{}] Invalid crypto failure state found, continue message consumption.", this.topic, this.subscription, this.consumerName);
                return DecryptResult.failure(byteBuf.retain());
        }
    }

    /**
     * Decodes the payload with the codec matching the metadata's compression type.
     *
     * <p>When {@code z} is set and the payload's readable size exceeds the connection handler's
     * maximum message size, the entry is discarded with {@code UncompressedSizeCorruption}. A
     * decoding {@link IOException} discards the entry with {@code DecompressionError}.
     *
     * @param messageIdData wire id of the entry, used for logging and discarding
     * @param messageMetadata metadata providing compression type and uncompressed size
     * @param byteBuf payload to decode
     * @param clientCnx connection used when discarding
     * @param z whether to enforce the maximum message size check
     * @return the decoded payload, or {@code null} when the entry was discarded
     */
    private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageIdData, MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx, boolean z) {
        final CompressionType compressionType = messageMetadata.getCompression();
        final CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
        final int expectedUncompressedSize = messageMetadata.getUncompressedSize();
        final int payloadSize = byteBuf.readableBytes();
        final boolean tooLarge = z && payloadSize > getConnectionHandler().getMaxMessageSize();
        if (tooLarge) {
            log.error("[{}][{}] Got corrupted payload message size {} at {}", this.topic, this.subscription, Integer.valueOf(payloadSize), messageIdData);
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.UncompressedSizeCorruption);
            return null;
        }
        ByteBuf decoded;
        try {
            decoded = codec.decode(byteBuf, expectedUncompressedSize);
        } catch (IOException e) {
            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", this.topic, this.subscription, compressionType, messageIdData, e.getMessage(), e);
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.DecompressionError);
            decoded = null;
        }
        return decoded;
    }

    /**
     * Validates the checksum carried in the buffer, if there is one.
     *
     * <p>Both checksums are 32-bit values and are logged with {@code Integer.toHexString}, so a
     * value with the high bit set is printed as 8 hex digits rather than being sign-extended to 16.
     *
     * @param byteBuf buffer positioned at the optional checksum
     * @param messageIdData wire id of the entry, used in the error log
     * @return {@code true} when no checksum is present or it matches the computed one,
     *         {@code false} on mismatch
     */
    private boolean verifyChecksum(ByteBuf byteBuf, MessageIdData messageIdData) {
        if (!Commands.hasChecksum(byteBuf)) {
            return true;
        }
        final int receivedChecksum = Commands.readChecksum(byteBuf);
        final int computedChecksum = Crc32cIntChecksum.computeChecksum(byteBuf);
        if (receivedChecksum == computedChecksum) {
            return true;
        }
        log.error("[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Integer.toHexString(receivedChecksum), Integer.toHexString(computedChecksum));
        return false;
    }

    /**
     * Discards a corrupted entry identified by a {@code MessageIdImpl}: logs an error, sends an
     * individual ack carrying the validation error, returns one permit and counts a receive failure.
     *
     * @param messageIdImpl id of the corrupted entry
     * @param clientCnx connection used to send the ack and return the permit
     * @param validationError reason reported in the ack
     */
    private void discardCorruptedMessage(MessageIdImpl messageIdImpl, ClientCnx clientCnx, CommandAck.ValidationError validationError) {
        log.error("[{}][{}] Discarding corrupted message at {}:{}",
                this.topic,
                this.subscription,
                Long.valueOf(messageIdImpl.getLedgerId()),
                Long.valueOf(messageIdImpl.getEntryId()));
        clientCnx.ctx().writeAndFlush(
                Commands.newAck(
                        this.consumerId,
                        messageIdImpl.getLedgerId(),
                        messageIdImpl.getEntryId(),
                        (BitSetRecyclable) null,
                        CommandAck.AckType.Individual,
                        validationError,
                        (Map<String, Long>) Collections.emptyMap(),
                        -1L),
                clientCnx.ctx().voidPromise());
        increaseAvailablePermits(clientCnx);
        this.stats.incrementNumReceiveFailed();
    }

    /**
     * Discards a corrupted entry identified by its wire id: logs an error and delegates to
     * {@code discardMessage} with a permit count of one.
     *
     * @param messageIdData wire id of the corrupted entry
     * @param clientCnx connection used to send the ack and return the permit
     * @param validationError reason reported in the ack
     */
    private void discardCorruptedMessage(MessageIdData messageIdData, ClientCnx clientCnx, CommandAck.ValidationError validationError) {
        log.error("[{}][{}] Discarding corrupted message at {}:{}",
                this.topic,
                this.subscription,
                Long.valueOf(messageIdData.getLedgerId()),
                Long.valueOf(messageIdData.getEntryId()));
        final int permitsToReturn = 1;
        discardMessage(messageIdData, clientCnx, validationError, permitsToReturn);
    }

    /**
     * Sends an individual ack carrying the validation error for the entry, returns {@code i}
     * permits and counts a receive failure.
     *
     * @param messageIdData wire id of the entry to discard
     * @param clientCnx connection used to send the ack and return the permits
     * @param validationError reason reported in the ack
     * @param i number of permits to return
     */
    private void discardMessage(MessageIdData messageIdData, ClientCnx clientCnx, CommandAck.ValidationError validationError, int i) {
        clientCnx.ctx().writeAndFlush(
                Commands.newAck(
                        this.consumerId,
                        messageIdData.getLedgerId(),
                        messageIdData.getEntryId(),
                        (BitSetRecyclable) null,
                        CommandAck.AckType.Individual,
                        validationError,
                        (Map<String, Long>) Collections.emptyMap(),
                        -1L),
                clientCnx.ctx().voidPromise());
        increaseAvailablePermits(clientCnx, i);
        this.stats.incrementNumReceiveFailed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Uses the {@code subscription} field as this handler's name.
     *
     * @return the value of {@code subscription}
     */
    @Override // org.apache.pulsar.client.impl.HandlerState
    public String getHandlerName() {
        return this.subscription;
    }

    /**
     * Reports whether this consumer has a client connection and is in the {@code Ready} state.
     *
     * @return {@code true} only when {@code getClientCnx()} is non-null and the state is {@code Ready}
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public boolean isConnected() {
        if (getClientCnx() == null) {
            return false;
        }
        return getState() == HandlerState.State.Ready;
    }

    /**
     * Reports whether the given connection is non-null and this consumer is in the {@code Ready} state.
     *
     * @param clientCnx connection to check for presence
     * @return {@code true} only when {@code clientCnx} is non-null and the state is {@code Ready}
     */
    public boolean isConnected(ClientCnx clientCnx) {
        if (clientCnx == null) {
            return false;
        }
        return getState() == HandlerState.State.Ready;
    }

    /**
     * Returns the {@code partitionIndex} field of this consumer.
     *
     * @return the partition index
     */
    int getPartitionIndex() {
        return this.partitionIndex;
    }

    /**
     * Reads the current value of the available-permit counter through its field updater.
     *
     * @return the current available permits
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int getAvailablePermits() {
        return AVAILABLE_PERMITS_UPDATER.get(this);
    }

    /**
     * Returns the number of messages currently held in {@code incomingMessages}.
     *
     * @return the size of the incoming message queue
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int numMessagesInQueue() {
        return this.incomingMessages.size();
    }

    /**
     * Asks the broker to redeliver all unacknowledged messages of this consumer.
     *
     * <p>If the connection's protocol version is older than {@code v2}, the connection is closed
     * to force a reconnect (unless the consumer is already connecting) and nothing else happens.
     * Otherwise, under {@code incomingQueueLock}, the consumer epoch is incremented for
     * {@code Failover}/{@code Exclusive} subscriptions and the local queue and un-acked tracker
     * are cleared. After the lock is released, the redeliver command is sent if the consumer is
     * connected, and the number of cleared messages is returned as permits.
     *
     * <p>The lock is released in a {@code finally} block covering only the locked section, so a
     * failure while sending the command cannot cause a second {@code unlock()}.
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void redeliverUnacknowledgedMessages() {
        synchronized (this) {
            ClientCnx cnx = cnx();
            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
                if (getState() == HandlerState.State.Connecting) {
                    log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
                } else {
                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                    cnx.ctx().close();
                }
                return;
            }
            int clearedMessages;
            this.incomingQueueLock.lock();
            try {
                if (this.conf.getSubscriptionType() == SubscriptionType.Failover || this.conf.getSubscriptionType() == SubscriptionType.Exclusive) {
                    CONSUMER_EPOCH.incrementAndGet(this);
                }
                clearedMessages = this.incomingMessages.size();
                clearIncomingMessages();
                this.unAckedMessageTracker.clear();
            } finally {
                this.incomingQueueLock.unlock();
            }
            if (cnx == null || !isConnected(cnx)) {
                log.warn("[{}] Send redeliver messages command but the client is reconnect or close, so don't need to send redeliver command to broker", this);
            } else {
                cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
                if (clearedMessages > 0) {
                    increaseAvailablePermits(cnx, clearedMessages);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", this.subscription, this.topic, this.consumerName, Integer.valueOf(clearedMessages));
                }
            }
        }
    }

    /**
     * Requests redelivery of the given message ids.
     *
     * <p>For subscription types other than Shared and Key_Shared this delegates to the no-argument
     * {@link #redeliverUnacknowledgedMessages()}. Otherwise, when the consumer is connected to a peer
     * speaking protocol {@code v2} or newer, the ids are purged from the local queue, split into chunks
     * of 1000, filtered through {@code getRedeliveryMessageIdData} and the remaining ids are sent to the
     * broker. Messages removed from the local queue are returned to the broker as permits.
     *
     * <p>If there is no usable connection the request is only logged; if the peer protocol is too old
     * the channel is closed to force a reconnect.
     *
     * @param set ids to redeliver; an empty set is a no-op
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public void redeliverUnacknowledgedMessages(Set<MessageId> set) {
        if (set.isEmpty()) {
            return;
        }
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared && this.conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
            redeliverUnacknowledgedMessages();
            return;
        }
        ClientCnx cnx = cnx();
        // Check cnx for null first: isConnected() re-reads the connection, so it may report true
        // even though the reference captured above is still null.
        if (cnx == null || !isConnected() || cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
            if (cnx == null || getState() == HandlerState.State.Connecting) {
                log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
            }
            return;
        }
        int removeExpiredMessagesFromQueue = removeExpiredMessagesFromQueue(set);
        Iterables.partition(set, 1000).forEach(chunk -> {
            getRedeliveryMessageIdData(chunk).thenAccept(messageIdData -> {
                // Every id in this chunk was routed elsewhere (e.g. to the DLQ): nothing to ask for.
                if (messageIdData.isEmpty()) {
                    return;
                }
                cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId, messageIdData), cnx.ctx().voidPromise());
            });
        });
        if (removeExpiredMessagesFromQueue > 0) {
            increaseAvailablePermits(cnx, removeExpiredMessagesFromQueue);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", this.subscription, this.topic, this.consumerName, Integer.valueOf(removeExpiredMessagesFromQueue));
        }
    }

    /**
     * Recomputes {@code scaleReceiverQueueHint}: the hint becomes {@code true} when available permits
     * plus queued messages reach the current receiver queue size. A change of value is logged at debug
     * level.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected void updateAutoScaleReceiverQueueHint() {
        final boolean queueSaturated = getAvailablePermits() + this.incomingMessages.size() >= getCurrentReceiverQueueSize();
        final boolean previousHint = this.scaleReceiverQueueHint.getAndSet(queueSaturated);
        if (log.isDebugEnabled() && previousHint != this.scaleReceiverQueueHint.get()) {
            log.debug("updateAutoScaleReceiverQueueHint {} -> {}", Boolean.valueOf(previousHint), Boolean.valueOf(this.scaleReceiverQueueHint.get()));
        }
    }

    /**
     * Completes a pending batch-receive operation by handing its future to
     * {@code notifyPendingBatchReceivedCallBack}.
     *
     * @param opBatchReceive the pending operation whose future should be notified
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected void completeOpBatchReceive(ConsumerBase.OpBatchReceive<T> opBatchReceive) {
        this.notifyPendingBatchReceivedCallBack(opBatchReceive.future);
    }

    /**
     * Converts message ids into the wire-level {@code MessageIdData} that should be sent in a redeliver
     * command, skipping ids that {@code processPossibleToDLQ} reports as handled.
     *
     * <p>Each id is first passed to {@code processPossibleToDLQ}; when that future yields {@code true}
     * the id is dropped, otherwise a {@code MessageIdData} carrying partition, ledger id and entry id is
     * produced.
     *
     * @param list ids to convert; {@code null} or empty yields an already-completed empty list
     * @return a future completing with the ids that still need to be redelivered by the broker
     */
    private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageId> list) {
        if (list == null || list.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        List<CompletableFuture<MessageIdData>> pending = list.stream().map(messageId -> {
            final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
            CompletableFuture<Boolean> sentToDlq = processPossibleToDLQ(messageIdAdv);
            return sentToDlq.thenApply(handled -> {
                if (handled.booleanValue()) {
                    // null marks "do not redeliver"; filtered out once all futures are done.
                    return null;
                }
                return new MessageIdData().setPartition(messageIdAdv.getPartitionIndex()).setLedgerId(messageIdAdv.getLedgerId()).setEntryId(messageIdAdv.getEntryId());
            });
        }).collect(Collectors.toList());
        // join() cannot block here: it only runs after waitForAll has seen every future complete.
        return FutureUtil.waitForAll(pending).thenApply(ignored ->
                pending.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
    }

    /**
     * Attempts to publish the messages recorded for {@code messageIdAdv} to the dead letter topic.
     *
     * <p>The candidates are looked up in {@code possibleSendToDeadLetterTopicMessages} under the id
     * returned by {@code MessageIdAdvUtils.discardBatch}. When nothing is recorded, the returned future
     * completes with {@code false} immediately.
     *
     * @param messageIdAdv id of the message that is about to be redelivered
     * @return a future that is never completed exceptionally by this method: it yields {@code true}
     *         after a DLQ send followed by a successful acknowledgement of the original message, and
     *         {@code false} on every failure path and when there is nothing to send
     */
    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageIdAdv) {
        List<MessageImpl<T>> list = null;
        if (this.possibleSendToDeadLetterTopicMessages != null) {
            list = this.possibleSendToDeadLetterTopicMessages.get(MessageIdAdvUtils.discardBatch(messageIdAdv));
        }
        // One shared result for the whole list: CompletableFuture.complete only takes effect once, so
        // the first send/ack outcome to arrive decides the value.
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        if (list != null) {
            List<MessageImpl<T>> list2 = list;
            // The send loop runs on internalPinnedExecutor once the DLQ producer future completes.
            initDeadLetterProducerIfNeeded().thenAcceptAsync(producer -> {
                Iterator it = list2.iterator();
                while (it.hasNext()) {
                    MessageImpl messageImpl = (MessageImpl) it.next();
                    try {
                        // Re-publish the payload with the message's reader schema, plus properties
                        // built from the original message id and origin topic.
                        TypedMessageBuilder<T> properties = producer.newMessage(Schema.AUTO_PRODUCE_BYTES(messageImpl.getReaderSchema().get())).value(messageImpl.getData()).properties(getPropertiesMap(messageImpl, messageImpl.getMessageId().toString(), getOriginTopicNameStr(messageImpl)));
                        copyMessageKeysIfNeeded(messageImpl, properties);
                        copyMessageEventTime(messageImpl, properties);
                        properties.sendAsync().thenAccept(messageId -> {
                            // NOTE(review): the lookup above keys on discardBatch(messageIdAdv) while this
                            // removal passes messageIdAdv itself — confirm the two are equal as map keys.
                            this.possibleSendToDeadLetterTopicMessages.remove(messageIdAdv);
                            // Acknowledge the original so it is not redelivered after landing in the DLQ.
                            acknowledgeAsync(messageIdAdv).whenComplete((r10, th) -> {
                                if (th == null) {
                                    completableFuture.complete(true);
                                } else {
                                    log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.", this.topicName, this.subscription, this.consumerName, messageIdAdv, th);
                                    completableFuture.complete(false);
                                }
                            });
                        }).exceptionally(th -> {
                            // Queue-full failures log only the message text; every other failure is
                            // logged with the throwable attached.
                            if (th instanceof PulsarClientException.ProducerQueueIsFullError) {
                                log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), messageIdAdv, th.getMessage());
                            } else {
                                log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), messageIdAdv, th);
                            }
                            completableFuture.complete(false);
                            return null;
                        });
                    } catch (Exception e) {
                        // Synchronous failure while building or dispatching the DLQ message.
                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), messageIdAdv, e);
                        completableFuture.complete(false);
                    }
                }
            }, (Executor) this.internalPinnedExecutor).exceptionally(th -> {
                // Reached when obtaining the DLQ producer failed (or the send loop itself threw).
                log.error("Dead letter producer exception with topic: {}", this.deadLetterPolicy.getDeadLetterTopic(), th);
                completableFuture.complete(false);
                return null;
            });
        } else {
            completableFuture.complete(false);
        }
        return completableFuture;
    }

    /**
     * Lets an optional user-supplied customizer adjust a DLQ/retry producer builder.
     *
     * @param deadLetterProducerBuilderCustomizer customizer to invoke; {@code null} means no customization
     * @param str topic name passed into the customizer context
     * @param producerBuilder builder handed to the customizer
     */
    private void customizeDeadLetterProducerBuilder(DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer, String str, ProducerBuilder<byte[]> producerBuilder) {
        if (deadLetterProducerBuilderCustomizer == null) {
            return;
        }
        final DeadLetterProducerBuilderContextImpl context = new DeadLetterProducerBuilderContextImpl(str, this.topic, this.subscription, this.consumerName);
        deadLetterProducerBuilderCustomizer.customize(context, producerBuilder);
    }

    /**
     * Returns the future of the dead letter producer, creating it on first use or after a failed
     * attempt.
     *
     * <p>The cached {@code deadLetterProducer} future is reused unless it is {@code null} or completed
     * exceptionally. In that case a new producer is requested through {@code createProducerWithBackOff},
     * passing the current {@code deadLetterProducerFailureCount}, and the new future replaces the
     * cached one.
     *
     * @return the cached or newly created producer future; it may still be pending or may fail later
     */
    private CompletableFuture<Producer<byte[]>> initDeadLetterProducerIfNeeded() {
        CompletableFuture<Producer<byte[]>> completableFuture = this.deadLetterProducer;
        if (completableFuture == null || completableFuture.isCompletedExceptionally()) {
            this.createProducerLock.writeLock().lock();
            try {
                // Re-read under the write lock: another thread may have installed a fresh future
                // between the unlocked check above and acquiring the lock.
                completableFuture = this.deadLetterProducer;
                if (completableFuture == null || completableFuture.isCompletedExceptionally()) {
                    completableFuture = createProducerWithBackOff(() -> {
                        // Batching off, chunking on, and non-blocking when the send queue is full.
                        ProducerBuilder<byte[]> enableChunking = ((ProducerBuilderImpl) this.client.newProducer(Schema.AUTO_PRODUCE_BYTES(this.schema))).initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()).topic(this.deadLetterPolicy.getDeadLetterTopic()).producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, this.consumerName, RandomStringUtils.randomAlphanumeric(5))).blockIfQueueFull(false).enableBatching(false).enableChunking(true);
                        customizeDeadLetterProducerBuilder(this.deadLetterPolicy.getDeadLetterProducerBuilderCustomizer(), this.deadLetterPolicy.getDeadLetterTopic(), enableChunking);
                        CompletableFuture<Producer<byte[]>> createAsync = enableChunking.createAsync();
                        createAsync.whenComplete((producer, th) -> {
                            if (th != null) {
                                log.error("[{}] [{}] [{}] Failed to create dead letter producer for topic {}", this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getDeadLetterTopic(), th);
                                // The failure count is the backoff step used by the next creation attempt.
                                // NOTE(review): plain ++ from a completion callback — confirm the field's
                                // visibility/atomicity guarantees where it is declared.
                                this.deadLetterProducerFailureCount++;
                            } else {
                                this.deadLetterProducerFailureCount = 0;
                                this.stats.setDeadLetterProducerStats(producer.getStats());
                            }
                        });
                        return createAsync;
                    }, this.deadLetterProducerFailureCount, () -> {
                        // Description used only in the backoff log line.
                        return "dead letter producer (topic: " + this.deadLetterPolicy.getDeadLetterTopic() + DefaultExpressionEngine.DEFAULT_INDEX_END;
                    });
                    this.deadLetterProducer = completableFuture;
                }
            } finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
        return completableFuture;
    }

    /**
     * Creates a producer, delaying the attempt according to how many earlier attempts failed.
     *
     * <p>With a failure count of zero the supplier is invoked right away. Otherwise a fresh backoff is
     * advanced once per recorded failure and the supplier is scheduled after the last delay obtained.
     *
     * @param supplier starts the actual producer creation
     * @param i number of previous failed attempts
     * @param supplier2 supplies a human-readable description for the log line
     * @return the supplier's future, or a future that mirrors it once the delayed attempt has run
     */
    private CompletableFuture<Producer<byte[]>> createProducerWithBackOff(Supplier<CompletableFuture<Producer<byte[]>>> supplier, int i, Supplier<String> supplier2) {
        if (i == 0) {
            return supplier.get();
        }
        final Backoff backoff = new BackoffBuilder()
                .setInitialTime(100L, TimeUnit.MILLISECONDS)
                .setMandatoryStop(this.client.getConfiguration().getOperationTimeoutMs() * 2, TimeUnit.MILLISECONDS)
                .setMax(1L, TimeUnit.MINUTES)
                .create();
        // Step the backoff once per previous failure; only the last delay is used.
        long delayMs = 0;
        int remainingSteps = i;
        while (remainingSteps-- > 0) {
            delayMs = backoff.next();
        }
        final CompletableFuture<Producer<byte[]>> delayedResult = new CompletableFuture<>();
        final ScheduledExecutorService scheduler = (ScheduledExecutorService) this.client.getScheduledExecutorProvider().getExecutor(this);
        log.info("Creating {} with backoff time of {} ms", supplier2.get(), Long.valueOf(delayMs));
        scheduler.schedule(() -> {
            FutureUtil.completeAfter(delayedResult, (CompletableFuture) supplier.get());
        }, delayMs, TimeUnit.MILLISECONDS);
        return delayedResult;
    }

    /**
     * Returns the future of the retry letter producer, creating it on first use or after a failed
     * attempt.
     *
     * <p>The cached {@code retryLetterProducer} future is reused unless it is {@code null} or completed
     * exceptionally. In that case a new producer is requested through {@code createProducerWithBackOff},
     * passing the current {@code retryLetterProducerFailureCount}, and the new future replaces the
     * cached one.
     *
     * @return the cached or newly created producer future; it may still be pending or may fail later
     */
    private CompletableFuture<Producer<byte[]>> initRetryLetterProducerIfNeeded() {
        CompletableFuture<Producer<byte[]>> completableFuture = this.retryLetterProducer;
        if (completableFuture == null || completableFuture.isCompletedExceptionally()) {
            this.createProducerLock.writeLock().lock();
            try {
                // Re-read under the write lock: another thread may have installed a fresh future
                // between the unlocked check above and acquiring the lock.
                completableFuture = this.retryLetterProducer;
                if (completableFuture == null || completableFuture.isCompletedExceptionally()) {
                    completableFuture = createProducerWithBackOff(() -> {
                        // Batching off, chunking on, and non-blocking when the send queue is full.
                        ProducerBuilder<byte[]> blockIfQueueFull = this.client.newProducer(Schema.AUTO_PRODUCE_BYTES(this.schema)).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).enableChunking(true).blockIfQueueFull(false);
                        customizeDeadLetterProducerBuilder(this.deadLetterPolicy.getRetryLetterProducerBuilderCustomizer(), this.deadLetterPolicy.getRetryLetterTopic(), blockIfQueueFull);
                        CompletableFuture<Producer<byte[]>> createAsync = blockIfQueueFull.createAsync();
                        createAsync.whenComplete((producer, th) -> {
                            if (th != null) {
                                log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}", this.topicName, this.subscription, this.consumerName, this.deadLetterPolicy.getRetryLetterTopic(), th);
                                // The failure count is the backoff step used by the next creation attempt.
                                // NOTE(review): plain ++ from a completion callback — confirm the field's
                                // visibility/atomicity guarantees where it is declared.
                                this.retryLetterProducerFailureCount++;
                            } else {
                                this.retryLetterProducerFailureCount = 0;
                                this.stats.setRetryLetterProducerStats(producer.getStats());
                            }
                        });
                        return createAsync;
                    }, this.retryLetterProducerFailureCount, () -> {
                        // Description used only in the backoff log line.
                        return "retry letter producer (topic: " + this.deadLetterPolicy.getRetryLetterTopic() + DefaultExpressionEngine.DEFAULT_INDEX_END;
                    });
                    this.retryLetterProducer = completableFuture;
                }
            } finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
        return completableFuture;
    }

    /**
     * Blocking variant of {@link #seekAsync(MessageId)}.
     *
     * @param messageId position to reset the subscription to
     * @throws PulsarClientException the failure of the asynchronous seek, converted by
     *         {@code PulsarClientException.unwrap}
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            final CompletableFuture<Void> pendingSeek = seekAsync(messageId);
            pendingSeek.get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Blocking variant of {@link #seekAsync(long)}.
     *
     * @param j timestamp to reset the subscription to
     * @throws PulsarClientException the failure of the asynchronous seek, converted by
     *         {@code PulsarClientException.unwrap}
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(long j) throws PulsarClientException {
        try {
            final CompletableFuture<Void> pendingSeek = seekAsync(j);
            pendingSeek.get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Blocking variant of {@link #seekAsync(Function)}.
     *
     * @param function maps the topic name to a seek target
     * @throws PulsarClientException the failure of the asynchronous seek, converted by
     *         {@code PulsarClientException.unwrap}
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(Function<String, Object> function) throws PulsarClientException {
        try {
            final CompletableFuture<Void> pendingSeek = seekAsync(function);
            pendingSeek.get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Seeks to a position computed by {@code function} from this consumer's topic name.
     *
     * @param function maps the topic name to a {@code MessageId}, a {@code Long} timestamp, or
     *        {@code null} to skip seeking
     * @return a completed future when the function yields {@code null}; the corresponding seek future
     *         for a message id or timestamp; a failed future when {@code function} is {@code null} or
     *         yields any other type
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
        if (function == null) {
            return FutureUtil.failedFuture(new PulsarClientException("Function must be set"));
        }
        final Object target = function.apply(this.topic);
        if (target == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (target instanceof MessageId) {
            return seekAsync((MessageId) target);
        }
        if (target.getClass().getTypeName().equals(Long.class.getTypeName())) {
            return seekAsync(((Long) target).longValue());
        }
        return FutureUtil.failedFuture(new PulsarClientException("Only support seek by messageId or timestamp"));
    }

    /**
     * Starts a seek if none is in progress.
     *
     * <p>{@code seekStatus} is moved from {@code NOT_STARTED} to {@code IN_PROGRESS}; on success a new
     * future is published in {@code seekFuture} and the retrying overload is invoked with a backoff
     * bounded by the configured operation timeout. The future created here is returned directly
     * rather than re-reading the {@code seekFuture} field: if this seek finishes and another one
     * starts before this method returns, the field would already refer to the other seek's future.
     *
     * @param j request id of the seek command
     * @param byteBuf serialized seek command
     * @param messageId message id recorded as the seek target
     * @param l timestamp when seeking by time, otherwise {@code null}
     * @param str human-readable description of the target, used in logs and error messages
     * @return the future of this seek, or a future failed with {@code IllegalStateException} when
     *         another seek is already in progress
     */
    private CompletableFuture<Void> seekAsyncInternal(long j, ByteBuf byteBuf, MessageId messageId, Long l, String str) {
        AtomicLong atomicLong = new AtomicLong(this.client.getConfiguration().getOperationTimeoutMs());
        Backoff create = new BackoffBuilder().setInitialTime(100L, TimeUnit.MILLISECONDS).setMax(atomicLong.get() * 2, TimeUnit.MILLISECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create();
        if (!this.seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
            String format = String.format("[%s][%s] attempting to seek operation that is already in progress (seek by %s)", this.topic, this.subscription, str);
            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", this.topic, this.subscription, str);
            return FutureUtil.failedFuture(new IllegalStateException(format));
        }
        // Keep a local reference so the caller always receives the future belonging to THIS seek.
        CompletableFuture<Void> ownSeekFuture = new CompletableFuture<>();
        this.seekFuture = ownSeekFuture;
        seekAsyncInternal(j, byteBuf, messageId, l, str, create, atomicLong);
        return ownSeekFuture;
    }

    /**
     * Sends the seek command, retrying with backoff while the consumer has no usable connection.
     *
     * <p>The outcome is reported through {@code seekFuture}/{@code seekStatus}, not through a return
     * value: failures go through {@code failSeek}; success completes {@code seekFuture} when the
     * consumer has a parent consumer or a connection is available.
     *
     * @param j request id of the seek command
     * @param byteBuf serialized seek command
     * @param messageId message id recorded in {@code seekMessageId}
     * @param l timestamp when seeking by time, otherwise {@code null}
     * @param str human-readable description of the target, used in logs and error messages
     * @param backoff supplies the delay before each retry
     * @param atomicLong remaining time budget in milliseconds, decreased by every retry delay
     */
    private void seekAsyncInternal(long j, ByteBuf byteBuf, MessageId messageId, Long l, String str, Backoff backoff, AtomicLong atomicLong) {
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx == null) {
            // Never wait longer than what is left of the time budget.
            long min = Math.min(backoff.next(), atomicLong.get());
            if (min <= 0) {
                // Budget exhausted: give up on this seek.
                failSeek(new PulsarClientException.TimeoutException(String.format("The subscription %s of the topic %s could not seek withing configured timeout", this.subscription, this.topicName.toString())));
                return;
            } else {
                ((ScheduledExecutorService) this.client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                    log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", this.topic, getHandlerName(), Long.valueOf(min));
                    atomicLong.addAndGet(-min);
                    seekAsyncInternal(j, byteBuf, messageId, l, str, backoff, atomicLong);
                }, min, TimeUnit.MILLISECONDS);
                return;
            }
        }
        // Remember the previous target and timestamp flag so they can be restored if the broker
        // rejects the seek.
        MessageIdAdv messageIdAdv = this.seekMessageId;
        this.seekMessageId = (MessageIdAdv) messageId;
        log.info("[{}][{}] Seeking subscription to {}", this.topic, this.subscription, str);
        boolean z = this.hasSoughtByTimestamp;
        this.hasSoughtByTimestamp = l != null;
        cnx.sendRequestWithId(byteBuf, j).thenRun(() -> {
            log.info("[{}][{}] Successfully reset subscription to {}", this.topic, this.subscription, str);
            // Discard local state that refers to the position before the seek.
            this.acknowledgmentsGroupingTracker.flushAndClean();
            this.lastDequeuedMessageId = MessageId.earliest;
            clearIncomingMessages();
            CompletableFuture<Void> completableFuture = null;
            synchronized (this) {
                if (this.hasParentConsumer || cnx() != null) {
                    completableFuture = this.seekFuture;
                    this.startMessageId = this.seekMessageId;
                    this.seekStatus.set(SeekStatus.NOT_STARTED);
                } else {
                    // No connection and no parent consumer: the status is left at COMPLETED and the
                    // future is not completed here.
                    // NOTE(review): presumably it is completed by the reconnect path — confirm
                    // where COMPLETED is handled.
                    this.seekStatus.set(SeekStatus.COMPLETED);
                }
            }
            // Complete outside the synchronized block.
            if (completableFuture != null) {
                completableFuture.complete(null);
            }
        }).exceptionally(th -> {
            // Roll back the tentative seek target and timestamp flag.
            this.seekMessageId = messageIdAdv;
            this.hasSoughtByTimestamp = z;
            log.error("[{}][{}] Failed to reset subscription: {}", this.topic, this.subscription, th.getCause().getMessage());
            failSeek(PulsarClientException.wrap(th.getCause(), String.format("Failed to seek the subscription %s of the topic %s to %s", this.subscription, this.topicName.toString(), str)));
            return null;
        });
    }

    /**
     * Fails the in-progress seek, if there is one.
     *
     * <p>The current {@code seekFuture} is captured first; it is completed exceptionally only when
     * {@code seekStatus} could be moved from {@code IN_PROGRESS} back to {@code NOT_STARTED}.
     *
     * @param th the cause reported to the seek future
     */
    private void failSeek(Throwable th) {
        final CompletableFuture<Void> pendingSeek = this.seekFuture;
        final boolean wasInProgress = this.seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED);
        if (!wasInProgress) {
            return;
        }
        pendingSeek.completeExceptionally(th);
    }

    /**
     * Seeks the subscription to a timestamp.
     *
     * @param j the timestamp to seek to
     * @return the future of the seek operation
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(long j) {
        final String description = String.format("the timestamp %d", Long.valueOf(j));
        final long requestId = this.client.newRequestId();
        final ByteBuf seekCommand = Commands.newSeek(this.consumerId, requestId, j);
        // MessageId.earliest is what gets recorded as the seek target for timestamp seeks.
        return seekAsyncInternal(requestId, seekCommand, MessageId.earliest, Long.valueOf(j), description);
    }

    /**
     * Seeks the subscription to a message id.
     *
     * <p>When the id carries a first-chunk message id, the seek targets that chunk's ledger/entry with
     * an empty ack set. Otherwise the id's own ledger/entry is used; for batch ids the ack set has the
     * bits from the batch index up to the batch size set.
     *
     * @param messageId the position to seek to; must be a {@code MessageIdAdv}
     * @return the future of the seek operation
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        final String description = String.format("the message %s", messageId.toString());
        final long requestId = this.client.newRequestId();
        final MessageIdAdv target = (MessageIdAdv) messageId;
        final MessageIdAdv firstChunk = target.getFirstChunkMessageId();

        final ByteBuf seekCommand;
        if (firstChunk != null) {
            seekCommand = Commands.newSeek(this.consumerId, requestId, firstChunk.getLedgerId(), firstChunk.getEntryId(), new long[0]);
        } else {
            final long[] ackSet;
            if (!MessageIdAdvUtils.isBatch(target)) {
                ackSet = new long[0];
            } else {
                // Set bits [0, batchSize) and then clear those before the requested batch index.
                final BitSetRecyclable bits = BitSetRecyclable.create();
                bits.set(0, target.getBatchSize());
                bits.clear(0, Math.max(target.getBatchIndex(), 0));
                ackSet = bits.toLongArray();
                bits.recycle();
            }
            seekCommand = Commands.newSeek(this.consumerId, requestId, target.getLedgerId(), target.getEntryId(), ackSet);
        }
        return seekAsyncInternal(requestId, seekCommand, messageId, null, description);
    }

    /**
     * Blocking variant of {@link #hasMessageAvailableAsync()}.
     *
     * @return the value the asynchronous check completes with
     * @throws PulsarClientException the failure of the asynchronous check, converted by
     *         {@code PulsarClientException.unwrap}
     */
    public boolean hasMessageAvailable() throws PulsarClientException {
        try {
            final Boolean available = hasMessageAvailableAsync().get();
            return available.booleanValue();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Asynchronously determines whether there is at least one more message for this consumer to read.
     *
     * <p>Decision order, as implemented below:
     * <ol>
     *   <li>a non-empty local {@code incomingMessages} queue answers {@code true} immediately;</li>
     *   <li>when {@code lastDequeuedMessageId} is still the {@code MessageId.earliest} instance, the
     *       answer is derived from {@code startMessageId} — asking the broker for the last message id
     *       and mark-delete position when the start position is {@code latest} or the last seek was by
     *       timestamp;</li>
     *   <li>otherwise the cached {@code lastMessageIdInBroker} is compared with
     *       {@code lastDequeuedMessageId}, refreshing the cached id from the broker when it does not
     *       already show more messages.</li>
     * </ol>
     *
     * <p>Boolean results are delivered through {@code completehasMessageAvailableWithValue}.
     * (Decompiler note: type inference failed for this method, hence the raw {@code CompletableFuture}
     * local below.)
     *
     * @return a future completing with {@code true} when a message is available, or exceptionally when
     *         the broker request fails
     */
    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        if (this.incomingMessages != null && !this.incomingMessages.isEmpty()) {
            return CompletableFuture.completedFuture(true);
        }
        // Reference comparison against the MessageId.earliest constant.
        if (this.lastDequeuedMessageId == MessageId.earliest) {
            // Snapshot the flag once so the asynchronous callbacks below see a consistent value.
            boolean z = this.hasSoughtByTimestamp;
            if (MessageId.latest.equals(this.startMessageId) || z) {
                CompletableFuture internalGetLastMessageIdAsync = internalGetLastMessageIdAsync();
                if (this.resetIncludeHead && !z) {
                    // Inclusive start at "latest": seek to the broker's last message id first, then
                    // carry the original response through to the comparison below.
                    internalGetLastMessageIdAsync = internalGetLastMessageIdAsync.thenCompose(getLastMessageIdResponse -> {
                        return seekAsync(getLastMessageIdResponse.lastMessageId).thenApply(r3 -> {
                            return getLastMessageIdResponse;
                        });
                    });
                }
                internalGetLastMessageIdAsync.thenAccept((java.util.function.Consumer) getLastMessageIdResponse2 -> {
                    MessageIdAdv messageIdAdv = (MessageIdAdv) getLastMessageIdResponse2.lastMessageId;
                    MessageIdAdv messageIdAdv2 = (MessageIdAdv) getLastMessageIdResponse2.markDeletePosition;
                    // No usable mark-delete position: absent, or a negative entry id on a ledger after
                    // the last message's ledger.
                    // NOTE(review): messageIdAdv is dereferenced in this condition before the null
                    // check on the next line — confirm lastMessageId can never be null here.
                    if (messageIdAdv2 == null || (messageIdAdv2.getEntryId() < 0 && messageIdAdv2.getLedgerId() > messageIdAdv.getLedgerId())) {
                        if (messageIdAdv == null || messageIdAdv.getEntryId() < 0) {
                            completehasMessageAvailableWithValue(completableFuture, false);
                            return;
                        } else {
                            completehasMessageAvailableWithValue(completableFuture, this.resetIncludeHead);
                            return;
                        }
                    }
                    // result < 0: mark-delete position is before the last message;
                    // result == 0: they are the same (ledger, entry) position.
                    int result = ComparisonChain.start().compare(messageIdAdv2.getLedgerId(), messageIdAdv.getLedgerId()).compare(messageIdAdv2.getEntryId(), messageIdAdv.getEntryId()).result();
                    if (messageIdAdv.getEntryId() < 0) {
                        completehasMessageAvailableWithValue(completableFuture, false);
                    } else if (z) {
                        // After a seek by timestamp the comparison is always exclusive.
                        completehasMessageAvailableWithValue(completableFuture, result < 0);
                    } else {
                        completehasMessageAvailableWithValue(completableFuture, this.resetIncludeHead ? result <= 0 : result < 0);
                    }
                }).exceptionally(th -> {
                    log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription, th);
                    completableFuture.completeExceptionally(th.getCause());
                    return null;
                });
                return completableFuture;
            }
            // Explicit start position: try the cached broker id first, then refresh it.
            if (hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead)) {
                completehasMessageAvailableWithValue(completableFuture, true);
                return completableFuture;
            }
            getLastMessageIdAsync().thenAccept(messageId -> {
                this.lastMessageIdInBroker = messageId;
                completehasMessageAvailableWithValue(completableFuture, hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead));
            }).exceptionally(th2 -> {
                log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
                completableFuture.completeExceptionally(th2.getCause());
                return null;
            });
        } else {
            // Compare against the last dequeued id, always exclusively.
            if (hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false)) {
                completehasMessageAvailableWithValue(completableFuture, true);
                return completableFuture;
            }
            getLastMessageIdAsync().thenAccept(messageId2 -> {
                this.lastMessageIdInBroker = messageId2;
                completehasMessageAvailableWithValue(completableFuture, hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false));
            }).exceptionally(th3 -> {
                log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
                completableFuture.completeExceptionally(th3.getCause());
                return null;
            });
        }
        return completableFuture;
    }

    /**
     * Completes a {@code hasMessageAvailable} future on {@code internalPinnedExecutor} instead of the
     * calling thread.
     *
     * @param completableFuture the future to complete
     * @param z the value to complete it with
     */
    private void completehasMessageAvailableWithValue(CompletableFuture<Boolean> completableFuture, boolean z) {
        this.internalPinnedExecutor.execute(() -> completableFuture.complete(Boolean.valueOf(z)));
    }

    /**
     * Tells whether the broker's last message id lies beyond a given position.
     *
     * @param messageId last message id known for the broker; must be a {@code MessageIdImpl}
     * @param messageId2 position to compare against
     * @param z {@code true} for an inclusive comparison ({@code >=}), {@code false} for exclusive
     *        ({@code >})
     * @return {@code true} when {@code messageId} is at/after (inclusive) or strictly after (exclusive)
     *         {@code messageId2} and its entry id is not {@code -1}
     */
    private boolean hasMoreMessages(MessageId messageId, MessageId messageId2, boolean z) {
        if (z && messageId.compareTo(messageId2) >= 0 && ((MessageIdImpl) messageId).getEntryId() != -1) {
            return true;
        }
        return !z && messageId.compareTo(messageId2) > 0 && ((MessageIdImpl) messageId).getEntryId() != -1;
    }

    /**
     * Fetches the last message id from the broker.
     *
     * @return a future completing with the {@code lastMessageId} of the broker's response
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    @Deprecated
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        return internalGetLastMessageIdAsync().thenApply(response -> response.lastMessageId);
    }

    /**
     * Fetches the last message id and wraps it, together with this consumer's topic, in a
     * single-element list.
     *
     * @return a future completing with a one-element list holding the topic-scoped last message id
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
        return getLastMessageIdAsync().thenApply(lastId -> {
            final TopicMessageId topicScoped = new TopicMessageIdImpl(this.topic, (MessageIdAdv) lastId);
            return Collections.singletonList(topicScoped);
        });
    }

    /**
     * Requests the last message id (and mark-delete position) from the broker, retrying with backoff
     * while no connection is available.
     *
     * @return a future completing with the broker's response; it is failed with
     *         {@code AlreadyClosedException} right away when the consumer is closing or closed
     */
    public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            final String reason = String.format("The consumer %s was already closed when the subscription %s of the topic %s getting the last message id", this.consumerName, this.subscription, this.topicName.toString());
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException(reason));
        }
        // The time budget starts at the operation timeout; the backoff maximum is twice that value.
        final AtomicLong remainingMs = new AtomicLong(this.client.getConfiguration().getOperationTimeoutMs());
        final Backoff backoff = new BackoffBuilder()
                .setInitialTime(100L, TimeUnit.MILLISECONDS)
                .setMax(remainingMs.get() * 2, TimeUnit.MILLISECONDS)
                .setMandatoryStop(0L, TimeUnit.MILLISECONDS)
                .create();
        final CompletableFuture<GetLastMessageIdResponse> result = new CompletableFuture<>();
        internalGetLastMessageIdAsync(backoff, remainingMs, result);
        return result;
    }

    /**
     * Sends a {@code GetLastMessageId} request, rescheduling itself with backoff while the consumer has
     * no usable connection.
     *
     * @param backoff supplies the delay before each retry
     * @param atomicLong remaining time budget in milliseconds, decreased by every retry delay
     * @param completableFuture receives the broker response, or a {@code TimeoutException},
     *        {@code NotSupportedException} or wrapped request failure
     */
    private void internalGetLastMessageIdAsync(Backoff backoff, AtomicLong atomicLong, CompletableFuture<GetLastMessageIdResponse> completableFuture) {
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx == null) {
            // Never wait longer than what is left of the time budget.
            long min = Math.min(backoff.next(), atomicLong.get());
            if (min <= 0) {
                // Budget exhausted: stop retrying.
                completableFuture.completeExceptionally(new PulsarClientException.TimeoutException(String.format("The subscription %s of the topic %s could not get the last message id withing configured timeout", this.subscription, this.topicName.toString())));
                return;
            } else {
                log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", this.topic, getHandlerName(), Long.valueOf(min));
                ((ScheduledExecutorService) this.client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                    atomicLong.addAndGet(-min);
                    internalGetLastMessageIdAsync(backoff, atomicLong, completableFuture);
                }, min, TimeUnit.MILLISECONDS);
                return;
            }
        }
        // Fail fast when the peer's protocol version does not support the command.
        if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
            completableFuture.completeExceptionally(new PulsarClientException.NotSupportedException(String.format("The command `GetLastMessageId` is not supported for the protocol version %d. The consumer is %s, topic %s, subscription %s", Integer.valueOf(cnx.getRemoteEndpointProtocolVersion()), this.consumerName, this.topicName.toString(), this.subscription)));
            return;
        }
        long newRequestId = this.client.newRequestId();
        ByteBuf newGetLastMessageId = Commands.newGetLastMessageId(this.consumerId, newRequestId);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Get topic last message Id", this.topic, this.subscription);
        }
        cnx.sendGetLastMessageId(newGetLastMessageId, newRequestId).thenAccept(commandGetLastMessageIdResponse -> {
            MessageIdData lastMessageId = commandGetLastMessageIdResponse.getLastMessageId();
            // The mark-delete position is optional in the response; it stays null when absent.
            MessageIdImpl messageIdImpl = null;
            if (commandGetLastMessageIdResponse.hasConsumerMarkDeletePosition()) {
                messageIdImpl = new MessageIdImpl(commandGetLastMessageIdResponse.getConsumerMarkDeletePosition().getLedgerId(), commandGetLastMessageIdResponse.getConsumerMarkDeletePosition().getEntryId(), -1);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Successfully getLastMessageId {}:{}", this.topic, this.subscription, Long.valueOf(lastMessageId.getLedgerId()), Long.valueOf(lastMessageId.getEntryId()));
            }
            // A batch index <= 0 yields a plain MessageIdImpl; a positive one yields a BatchMessageIdImpl.
            completableFuture.complete(new GetLastMessageIdResponse(lastMessageId.getBatchIndex() <= 0 ? new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), lastMessageId.getPartition()) : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), lastMessageId.getPartition(), lastMessageId.getBatchIndex()), messageIdImpl));
        }).exceptionally(th -> {
            log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
            completableFuture.completeExceptionally(PulsarClientException.wrap(th.getCause(), String.format("The subscription %s of the topic %s gets the last message id was failed", this.subscription, this.topicName.toString())));
            return null;
        });
    }

    /**
     * Evaluates whether a received message counts as "undecryptable" for this consumer.
     *
     * <p>The result is {@code true} only when all of the following hold: the metadata carries at least one
     * encryption key, the consumer configuration has no crypto key reader, and the configured crypto failure
     * action is {@link ConsumerCryptoFailureAction#CONSUME}.
     *
     * @param messageMetadata metadata of the received message
     * @return {@code true} if all three conditions hold, {@code false} otherwise
     */
    private boolean isMessageUndecryptable(MessageMetadata messageMetadata) {
        if (messageMetadata.getEncryptionKeysCount() <= 0) {
            return false;
        }
        if (conf.getCryptoKeyReader() != null) {
            return false;
        }
        return conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME;
    }

    /**
     * Builds the {@link EncryptionContext} describing how a message was encrypted.
     *
     * <p>When the metadata lists no encryption keys an empty {@link Optional} is returned. Otherwise the context
     * is filled with the keys (each with its own metadata map), the encryption parameter, the algorithm (only
     * if the metadata has one), the compression type, the uncompressed size and the optional batch size.
     *
     * @param messageMetadata metadata of the received message
     * @return the populated context, or {@link Optional#empty()} if the metadata has no encryption keys
     */
    private Optional<EncryptionContext> createEncryptionContext(MessageMetadata messageMetadata) {
        if (messageMetadata.getEncryptionKeysCount() <= 0) {
            return Optional.empty();
        }

        EncryptionContext context = new EncryptionContext();

        // Key name -> key value plus that key's metadata entries flattened into a map.
        Map<String, EncryptionContext.EncryptionKey> keysByName = messageMetadata.getEncryptionKeysList().stream()
                .collect(Collectors.toMap(
                        key -> key.getKey(),
                        key -> new EncryptionContext.EncryptionKey(
                                key.getValue(),
                                key.getMetadatasList().stream()
                                        .collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue())))));
        byte[] param = messageMetadata.getEncryptionParam();

        // The batch size is only present when the metadata actually carries it.
        Optional<Integer> batchSize;
        if (messageMetadata.hasNumMessagesInBatch()) {
            batchSize = Optional.of(messageMetadata.getNumMessagesInBatch());
        } else {
            batchSize = Optional.empty();
        }

        context.setKeys(keysByName);
        context.setParam(param);
        if (messageMetadata.hasEncryptionAlgo()) {
            context.setAlgorithm(messageMetadata.getEncryptionAlgo());
        }
        context.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(messageMetadata.getCompression()));
        context.setUncompressedMessageSize(messageMetadata.getUncompressedSize());
        context.setBatchSize(batchSize);
        return Optional.of(context);
    }

    /**
     * Drains matching messages from the incoming queue.
     *
     * <p>Repeatedly calls {@code incomingMessages.pollIf(...)} with a predicate that tests whether the message id,
     * after {@link NegativeAcksTracker#discardBatchAndPartitionIndex}, is contained in {@code set}. Every message
     * returned is subtracted from the incoming-size accounting and released. The loop stops as soon as
     * {@code pollIf} returns {@code null}.
     *
     * @param set message ids to match against
     * @return the number of messages that were polled and released
     */
    private int removeExpiredMessagesFromQueue(Set<MessageId> set) {
        int removedCount = 0;
        Message<T> expired;
        while ((expired = this.incomingMessages.pollIf(
                queued -> set.contains(NegativeAcksTracker.discardBatchAndPartitionIndex(queued.getMessageId()))))
                != null) {
            decreaseIncomingMessageSize(expired);
            removedCount++;
            expired.release();
        }
        return removedCount;
    }

    /**
     * Returns the stats recorder held by this consumer.
     *
     * @return the {@link ConsumerStatsRecorder} stored in the {@code stats} field
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public ConsumerStatsRecorder getStats() {
        return stats;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks this consumer as having reached the end of its topic.
     *
     * <p>Logs the event, sets the {@code hasReachedEndOfTopic} flag and, when a listener is configured,
     * invokes its {@code reachedEndOfTopic} callback with this consumer.
     */
    public void setTerminated() {
        log.info("[{}] [{}] [{}] Consumer has reached the end of topic", subscription, topic, consumerName);
        hasReachedEndOfTopic = true;
        if (listener == null) {
            return;
        }
        listener.reachedEndOfTopic(this);
    }

    /**
     * Reports the current value of the end-of-topic flag.
     *
     * @return {@code true} once {@link #setTerminated()} has set the flag
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public boolean hasReachedEndOfTopic() {
        return hasReachedEndOfTopic;
    }

    /**
     * Returns a hash code derived from the consumer id.
     *
     * <p>{@link #equals(Object)} treats two consumers as equal when their {@code consumerId} values match, so the
     * hash must be computed from that same field: hashing other attributes (topic, subscription, consumer name)
     * would allow two equal instances to report different hash codes, which violates the
     * {@link Object#hashCode()} contract and breaks lookups in hash-based collections.
     *
     * @return {@code Long.hashCode(consumerId)}
     */
    @Override
    public int hashCode() {
        return Long.hashCode(this.consumerId);
    }

    /**
     * Compares this consumer with another object.
     *
     * <p>Two consumers are equal when they are the same instance, or when the other object is a
     * {@code ConsumerImpl} with the same {@code consumerId}.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is considered equal to this consumer
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof ConsumerImpl)) {
            return false;
        }
        ConsumerImpl<?> other = (ConsumerImpl<?>) obj;
        return consumerId == other.consumerId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the connection currently held by the connection handler.
     *
     * @return the value of {@code connectionHandler.cnx()}; callers in this class check it for {@code null}
     */
    public ClientCnx cnx() {
        return connectionHandler.cnx();
    }

    /**
     * Delegates to the connection handler's {@code resetBackoff()}.
     */
    void resetBackoff() {
        connectionHandler.resetBackoff();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Forwards a connection-closed notification to the connection handler unchanged.
     *
     * @param clientCnx the connection that was closed
     * @param optional  first optional argument, passed through as-is
     * @param optional2 second optional argument (a URI), passed through as-is
     */
    public void connectionClosed(ClientCnx clientCnx, Optional<Long> optional, Optional<URI> optional2) {
        connectionHandler.connectionClosed(clientCnx, optional, optional2);
    }

    /**
     * Returns the connection currently held by the connection handler.
     *
     * @return the value of {@code connectionHandler.cnx()}
     */
    public ClientCnx getClientCnx() {
        return connectionHandler.cnx();
    }

    /**
     * Switches this consumer to the given connection and keeps consumer registration in sync.
     *
     * <p>For a non-null connection, the connection handler is updated, this consumer is registered on the
     * connection under its consumer id, and a warning is logged if ack receipts are enabled in the configuration
     * but the peer's protocol version does not support them. In all cases (including {@code null}) the
     * connection previously recorded for registration is replaced, and this consumer is removed from that
     * previous connection when it is a different, non-null instance.
     *
     * @param clientCnx the new connection, or {@code null} to only deregister from the previous one
     */
    void setClientCnx(ClientCnx clientCnx) {
        if (clientCnx != null) {
            connectionHandler.setClientCnx(clientCnx);
            clientCnx.registerConsumer(consumerId, this);
            boolean receiptUnsupported = conf.isAckReceiptEnabled()
                    && !Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion());
            if (receiptUnsupported) {
                log.warn("Server don't support ack for receipt! ProtoVersion >=17 support! nowVersion : {}",
                        Integer.valueOf(clientCnx.getRemoteEndpointProtocolVersion()));
            }
        }
        ClientCnx previous = clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
        if (previous != null && previous != clientCnx) {
            previous.removeConsumer(consumerId);
        }
    }

    /**
     * Deregisters this consumer from the connection it was last registered on by calling
     * {@link #setClientCnx(ClientCnx)} with {@code null}.
     */
    void deregisterFromClientCnx() {
        this.setClientCnx(null);
    }

    /**
     * Delegates to the connection handler's {@code grabCnx()}.
     */
    void grabCnx() {
        connectionHandler.grabCnx();
    }

    /**
     * Returns the value of the {@code topicNameWithoutPartition} field.
     *
     * @return the stored topic name without partition
     * @deprecated this accessor is marked deprecated; no replacement is indicated here
     */
    @Deprecated
    public String getTopicNameWithoutPartition() {
        return topicNameWithoutPartition;
    }

    /**
     * Removes the first pending chunked message that still has a context.
     *
     * <p>UUIDs are polled from the head of {@code pendingChunkedMessageUuidQueue} until one resolves to an entry
     * in {@code chunkedMessagesMap} or the queue is empty. The result is handed to
     * {@link #removeChunkMessage(String, ChunkedMessageCtx, boolean)}, which does nothing when no context was
     * found. The {@code autoAckOldestChunkedMessageOnQueueFull} setting is passed as the acknowledge flag.
     */
    private void removeOldestPendingChunkedMessage() {
        String oldestUuid = null;
        ChunkedMessageCtx oldestCtx = null;
        while (!this.pendingChunkedMessageUuidQueue.isEmpty()) {
            oldestUuid = this.pendingChunkedMessageUuidQueue.poll();
            if (StringUtils.isNotBlank(oldestUuid)) {
                oldestCtx = this.chunkedMessagesMap.get(oldestUuid);
            }
            if (oldestCtx != null) {
                break;
            }
        }
        removeChunkMessage(oldestUuid, oldestCtx, this.autoAckOldestChunkedMessageOnQueueFull);
    }

    /**
     * Removes expired incomplete chunked messages from the head of the pending queue.
     *
     * <p>Does nothing when {@code expireTimeOfIncompleteChunkedMessageMillis} is not positive. Otherwise the head
     * UUID of {@code pendingChunkedMessageUuidQueue} is inspected repeatedly: if it has a context whose
     * {@code receivedTime} plus the expiry interval is earlier than the current time, the UUID is removed from
     * the queue and the chunk message is removed with acknowledgement enabled. Processing stops at the first head
     * entry that has no context or has not expired yet, or when the queue is empty.
     */
    protected void removeExpireIncompleteChunkedMessages() {
        if (this.expireTimeOfIncompleteChunkedMessageMillis <= 0) {
            return;
        }
        String headUuid;
        while ((headUuid = this.pendingChunkedMessageUuidQueue.peek()) != null) {
            ChunkedMessageCtx headCtx = null;
            if (StringUtils.isNotBlank(headUuid)) {
                headCtx = this.chunkedMessagesMap.get(headUuid);
            }
            boolean expired = headCtx != null
                    && System.currentTimeMillis() > headCtx.receivedTime + this.expireTimeOfIncompleteChunkedMessageMillis;
            if (!expired) {
                return;
            }
            this.pendingChunkedMessageUuidQueue.remove(headUuid);
            removeChunkMessage(headUuid, headCtx, true);
        }
    }

    /**
     * Discards a pending chunked message and frees its resources.
     *
     * <p>A {@code null} context is ignored. Otherwise the entry is removed from {@code chunkedMessagesMap}; every
     * non-null chunk message id is either individually acknowledged (when {@code z} is {@code true}) or passed to
     * {@code trackMessage}; the chunk buffer, if present, is released; the context is recycled; and the pending
     * chunked message counter is decremented.
     *
     * @param str               UUID under which the context is stored in {@code chunkedMessagesMap}
     * @param chunkedMessageCtx context of the chunked message, may be {@code null}
     * @param z                 {@code true} to acknowledge the chunk ids, {@code false} to track them instead
     */
    private void removeChunkMessage(String str, ChunkedMessageCtx chunkedMessageCtx, boolean z) {
        if (chunkedMessageCtx == null) {
            return;
        }
        this.chunkedMessagesMap.remove(str);
        if (chunkedMessageCtx.chunkedMessageIds != null) {
            for (MessageIdImpl chunkId : chunkedMessageCtx.chunkedMessageIds) {
                if (chunkId == null) {
                    continue;
                }
                if (!z) {
                    trackMessage(chunkId);
                    continue;
                }
                log.info("Removing chunk message-id {}", chunkId);
                doAcknowledge(chunkId, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
            }
        }
        if (chunkedMessageCtx.chunkedMsgBuffer != null) {
            chunkedMessageCtx.chunkedMsgBuffer.release();
        }
        chunkedMessageCtx.recycle();
        this.pendingChunkedMessageCount--;
    }

    /**
     * Sends a transactional acknowledgement for a single message id and waits for the broker receipt(s).
     *
     * <p>The ack command(s) are built as follows:
     * <ul>
     *   <li>batch message id: one ack carrying an ack bit set (for a cumulative ack, all indexes up to and
     *       including the batch index are cleared; otherwise only the batch index is cleared);</li>
     *   <li>non-batch id with no recorded chunk ids, or a cumulative ack: one plain ack for the id;</li>
     *   <li>non-batch id with recorded chunk ids: one multi-message ack if the peer supports it, otherwise one
     *       ack per chunk id.</li>
     * </ul>
     * The message is then removed from the un-acked tracker (up to the id for cumulative acks) and the commands
     * are sent on the current connection.
     *
     * <p>The connection is looked up once and used both for the protocol-version check and for sending, so a
     * missing connection can no longer cause a {@link NullPointerException} during the version check. When there
     * is no connection, the already serialized commands are released (they would otherwise leak, since nothing
     * writes them to a channel) and a failed future is returned.
     *
     * @param messageId       the message id to acknowledge; must be a {@code MessageIdAdv}
     * @param ackType         individual or cumulative
     * @param validationError validation error to attach to the ack, may be {@code null}
     * @param map             properties to attach to the ack
     * @param txnID           transaction the acknowledgement belongs to
     * @return a future completed when all ack receipts arrive, or failed with
     *         {@link PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> map, TxnID txnID) {
        final long requestId = this.client.newRequestId();
        final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
        final long ledgerId = messageIdAdv.getLedgerId();
        final long entryId = messageIdAdv.getEntryId();
        // Single snapshot: the connection whose protocol version is checked is the one used for sending.
        final ClientCnx cnx = cnx();

        final List<ByteBuf> commands;
        if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
            BitSetRecyclable ackSet = BitSetRecyclable.create();
            ackSet.set(0, messageIdAdv.getBatchSize());
            if (ackType == CommandAck.AckType.Cumulative) {
                MessageIdAdvUtils.acknowledge(messageIdAdv, false);
                ackSet.clear(0, messageIdAdv.getBatchIndex() + 1);
            } else {
                ackSet.clear(messageIdAdv.getBatchIndex());
            }
            commands = Collections.singletonList(Commands.newAck(this.consumerId, ledgerId, entryId, ackSet,
                    ackType, validationError, map, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
                    messageIdAdv.getBatchSize()));
            ackSet.recycle();
        } else {
            MessageIdImpl[] chunkIds = this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv);
            if (chunkIds == null || ackType == CommandAck.AckType.Cumulative) {
                commands = Collections.singletonList(Commands.newAck(this.consumerId, ledgerId, entryId, null,
                        ackType, validationError, map, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId));
            } else if (cnx != null
                    && Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkIds.length);
                for (MessageIdImpl chunkId : chunkIds) {
                    if (chunkId != null && chunkIds.length > 1) {
                        entriesToAck.add(Triple.of(Long.valueOf(chunkId.getLedgerId()),
                                Long.valueOf(chunkId.getEntryId()), null));
                    }
                }
                commands = Collections.singletonList(
                        newMultiTransactionMessageAck(this.consumerId, txnID, entriesToAck, requestId));
            } else {
                List<ByteBuf> perChunkAcks = new ArrayList<>(chunkIds.length);
                for (MessageIdImpl chunkId : chunkIds) {
                    // Same null guard as the multi-ack branch above.
                    if (chunkId != null) {
                        perChunkAcks.add(Commands.newAck(this.consumerId, chunkId.ledgerId, chunkId.entryId, null,
                                ackType, validationError, map, txnID.getLeastSigBits(), txnID.getMostSigBits(),
                                requestId));
                    }
                }
                commands = perChunkAcks;
            }
        }

        if (ackType == CommandAck.AckType.Cumulative) {
            this.unAckedMessageTracker.removeMessagesTill(messageId);
        } else {
            this.unAckedMessageTracker.remove(messageId);
        }

        if (cnx == null) {
            // Nothing will write these buffers to a channel, so release them here to avoid a leak.
            for (ByteBuf command : commands) {
                ReferenceCountUtil.safeRelease(command);
            }
            return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Failed to ack message [" + messageId + "] for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
        }

        List<CompletableFuture<Void>> receipts = new ArrayList<>(commands.size());
        for (ByteBuf command : commands) {
            receipts.add(cnx.newAckForReceipt(command, requestId));
        }
        return FutureUtil.waitForAll(receipts);
    }

    /**
     * Serializes a transactional, individual-type ack command covering several message ids.
     *
     * @param j     consumer id to set on the ack
     * @param txnID transaction whose least/most significant bits are set on the ack
     * @param list  (ledgerId, entryId, optional ack bit set) triples to acknowledge
     * @param j2    request id to set on the ack
     * @return the command serialized by {@code Commands.serializeWithSize}
     */
    private ByteBuf newMultiTransactionMessageAck(long j, TxnID txnID, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> list, long j2) {
        BaseCommand command = newMultiMessageAckCommon(list);
        command.getAck()
                .setConsumerId(j)
                .setAckType(CommandAck.AckType.Individual)
                .setTxnidLeastBits(txnID.getLeastSigBits())
                .setTxnidMostBits(txnID.getMostSigBits())
                .setRequestId(j2);
        return Commands.serializeWithSize(command);
    }

    /**
     * Prepares the thread-local {@link BaseCommand} as an ACK command listing the given message ids.
     *
     * <p>The thread-local command is cleared and reused. For each triple a message id with the ledger id and
     * entry id is added; when the triple carries a bit set, its words are appended as the ack set and the bit
     * set is recycled afterwards.
     *
     * @param list (ledgerId, entryId, optional ack bit set) triples to include
     * @return the thread-local command, typed as ACK and populated with the message ids
     */
    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> list) {
        BaseCommand command = LOCAL_BASE_COMMAND.get().clear().setType(BaseCommand.Type.ACK);
        CommandAck ack = command.setAck();
        for (Triple<Long, Long, ConcurrentBitSetRecyclable> entry : list) {
            long ledgerId = entry.getLeft().longValue();
            long entryId = entry.getMiddle().longValue();
            ConcurrentBitSetRecyclable ackBits = entry.getRight();
            MessageIdData idData = ack.addMessageId().setLedgerId(ledgerId).setEntryId(entryId);
            if (ackBits == null) {
                continue;
            }
            for (long word : ackBits.toLongArray()) {
                idData.addAckSet(word);
            }
            ackBits.recycle();
        }
        return command;
    }

    /**
     * Sends one transactional acknowledgement covering a list of message ids and waits for the broker receipt.
     *
     * <p>For each id a {@link MessageIdData} with its ledger and entry id is created. Batch ids additionally get
     * an ack set (for a cumulative ack, all indexes up to and including the batch index are cleared; otherwise
     * only the batch index is cleared). Each id is also removed from the un-acked tracker (up to the id for
     * cumulative acks).
     *
     * <p>When the consumer has no connection, a failed future is returned instead of throwing a
     * {@link NullPointerException}, matching the single-id overload. The ack command is only serialized once a
     * connection is known to exist, so no buffer is allocated on the failure path.
     *
     * @param list    message ids to acknowledge; each must be a {@code MessageIdAdv}
     * @param ackType individual or cumulative
     * @param map     properties to attach to the ack
     * @param txnID   transaction the acknowledgement belongs to
     * @return a future completed when the ack receipt arrives, or failed with
     *         {@link PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map, TxnID txnID) {
        final long requestId = this.client.newRequestId();
        List<MessageIdData> idsToAck = new ArrayList<>(list.size());
        for (MessageId messageId : list) {
            MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
            MessageIdData idData = new MessageIdData();
            idData.setLedgerId(messageIdAdv.getLedgerId());
            idData.setEntryId(messageIdAdv.getEntryId());
            if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
                BitSetRecyclable ackSet = BitSetRecyclable.create();
                ackSet.set(0, messageIdAdv.getBatchSize());
                if (ackType == CommandAck.AckType.Cumulative) {
                    MessageIdAdvUtils.acknowledge(messageIdAdv, false);
                    ackSet.clear(0, messageIdAdv.getBatchIndex() + 1);
                } else {
                    ackSet.clear(messageIdAdv.getBatchIndex());
                }
                for (long word : ackSet.toLongArray()) {
                    idData.addAckSet(word);
                }
                ackSet.recycle();
            }
            idsToAck.add(idData);
            if (ackType == CommandAck.AckType.Cumulative) {
                this.unAckedMessageTracker.removeMessagesTill(messageId);
            } else {
                this.unAckedMessageTracker.remove(messageId);
            }
        }

        ClientCnx cnx = cnx();
        if (cnx == null) {
            // Same failure mode as the single-id overload instead of an NPE on the missing connection.
            return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Failed to ack " + list.size() + " message(s) for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
        }
        ByteBuf command = Commands.newAck(this.consumerId, idsToAck, ackType, (CommandAck.ValidationError) null, map,
                txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
        return cnx.newAckForReceipt(command, requestId);
    }

    /**
     * Returns the map stored in the {@code possibleSendToDeadLetterTopicMessages} field.
     *
     * @return the stored map itself (not a copy), keyed by message id
     */
    public Map<MessageIdAdv, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
        return possibleSendToDeadLetterTopicMessages;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether ack receipts are effectively enabled for this consumer.
     *
     * @return {@code true} only if ack receipts are enabled in the configuration, a connection is present, and
     *         the peer's protocol version supports ack receipts
     */
    public boolean isAckReceiptEnabled() {
        ClientCnx currentCnx = getClientCnx();
        if (!conf.isAckReceiptEnabled() || currentCnx == null) {
            return false;
        }
        return Commands.peerSupportsAckReceipt(currentCnx.getRemoteEndpointProtocolVersion());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the redirected cluster URI via the superclass, then calls {@code flushAndClean()} on the
     * acknowledgments grouping tracker.
     *
     * @param str  first URI string, forwarded to the superclass unchanged
     * @param str2 second URI string, forwarded to the superclass unchanged
     * @throws URISyntaxException if the superclass rejects a URI string
     */
    @Override // org.apache.pulsar.client.impl.HandlerState
    public void setRedirectedClusterURI(String str, String str2) throws URISyntaxException {
        super.setRedirectedClusterURI(str, str2);
        acknowledgmentsGroupingTracker.flushAndClean();
    }

    /**
     * Returns the retry-letter producer if its future has already completed.
     *
     * @return the result of joining the {@code retryLetterProducer} future, or {@code null} when the future is
     *         absent or not yet done
     */
    @VisibleForTesting
    public Producer<byte[]> getRetryLetterProducer() {
        boolean ready = this.retryLetterProducer != null && this.retryLetterProducer.isDone();
        if (ready) {
            return this.retryLetterProducer.join();
        }
        return null;
    }

    /**
     * Returns the dead-letter producer if its future has already completed.
     *
     * @return the result of the {@code deadLetterProducer} future, or {@code null} when the future is absent or
     *         not yet done
     * @throws ExecutionException   if the future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while getting the result
     */
    @VisibleForTesting
    public Producer<byte[]> getDeadLetterProducer() throws ExecutionException, InterruptedException {
        boolean ready = this.deadLetterProducer != null && this.deadLetterProducer.isDone();
        if (ready) {
            return this.deadLetterProducer.get();
        }
        return null;
    }

    /**
     * Returns the value of the {@code priorityLevel} field.
     *
     * @return the stored priority level
     */
    @Generated
    int getPriorityLevel() {
        return priorityLevel;
    }

    // Static initialization of the class-level helpers used by ConsumerImpl.
    static {
        // Caches the negated desired assertion status of this class.
        // NOTE(review): this looks like the compiler-generated assertion flag surfaced by the decompiler — confirm.
        $assertionsDisabled = !ConsumerImpl.class.desiredAssertionStatus();
        // Field updaters bound by name to the "availablePermits" (int) and "subscribeDeadline" (long) fields.
        AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
        SUBSCRIBE_DEADLINE_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerImpl.class, "subscribeDeadline");
        // Thread-local BaseCommand instance; newMultiMessageAckCommon clears and reuses it for each ack command.
        LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() { // from class: org.apache.pulsar.client.impl.ConsumerImpl.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            // Supplies a fresh BaseCommand the first time a thread reads the thread-local.
            @Override // io.netty.util.concurrent.FastThreadLocal
            public BaseCommand initialValue() throws Exception {
                return new BaseCommand();
            }
        };
        // Class logger.
        log = LoggerFactory.getLogger((Class<?>) ConsumerImpl.class);
    }
}
