package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.class */
public class PersistentAcknowledgmentsGroupingTracker implements AcknowledgmentsGroupingTracker {
    private static final int MAX_ACK_GROUP_SIZE = 1000;
    private final ConsumerImpl<?> consumer;
    private final long acknowledgementGroupTimeMicros;
    private final ScheduledFuture<?> scheduledTask;
    private final boolean batchIndexAckEnabled;
    private final boolean ackReceiptEnabled;
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");
    private volatile LastCumulativeAck lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
    private volatile boolean cumulativeAckFlushRequired = false;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks = new ConcurrentSkipListSet<>();
    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
    private volatile TimedCompletableFuture<Void> currentIndividualAckFuture = new TimedCompletableFuture<>();
    private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture = new TimedCompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker$LastCumulativeAck.class */
    public static class LastCumulativeAck {
        private MessageIdImpl messageId;
        private BitSetRecyclable bitSetRecyclable;
        private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
        private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() { // from class: org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.LastCumulativeAck.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.pulsar.shade.io.netty.util.Recycler
            public LastCumulativeAck newObject(Recycler.Handle<LastCumulativeAck> handle) {
                return new LastCumulativeAck(handle);
            }
        };

        static LastCumulativeAck create(MessageIdImpl messageIdImpl, BitSetRecyclable bitSetRecyclable) {
            LastCumulativeAck lastCumulativeAck = RECYCLER.get();
            lastCumulativeAck.messageId = messageIdImpl;
            lastCumulativeAck.bitSetRecyclable = bitSetRecyclable;
            return lastCumulativeAck;
        }

        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> handle) {
            this.recyclerHandle = handle;
        }

        void recycle() {
            if (this.bitSetRecyclable != null) {
                this.bitSetRecyclable.recycle();
            }
            this.messageId = null;
            this.recyclerHandle.recycle(this);
        }
    }

    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumerImpl, ConsumerConfigurationData<?> consumerConfigurationData, EventLoopGroup eventLoopGroup) {
        this.consumer = consumerImpl;
        this.acknowledgementGroupTimeMicros = consumerConfigurationData.getAcknowledgementsGroupTimeMicros();
        this.batchIndexAckEnabled = consumerConfigurationData.isBatchIndexAckEnabled();
        this.ackReceiptEnabled = consumerConfigurationData.isAckReceiptEnabled();
        if (this.acknowledgementGroupTimeMicros > 0) {
            this.scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(this::flush, this.acknowledgementGroupTimeMicros, this.acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS);
        } else {
            this.scheduledTask = null;
        }
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public boolean isDuplicate(@NonNull MessageId messageId) {
        if (messageId == null) {
            throw new NullPointerException("messageId is marked non-null but is null");
        }
        MessageIdImpl messageIdImpl = this.lastCumulativeAck.messageId;
        if (messageIdImpl == null) {
            return false;
        }
        if (messageId.compareTo(messageIdImpl) <= 0) {
            return true;
        }
        return this.pendingIndividualAcks.contains(messageId);
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map) {
        if (CommandAck.AckType.Cumulative.equals(ackType)) {
            if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
                list.forEach(messageId -> {
                    addAcknowledgment((MessageIdImpl) messageId, ackType, map);
                });
                return CompletableFuture.completedFuture(null);
            }
            HashSet hashSet = new HashSet();
            list.forEach(messageId2 -> {
                hashSet.add(addAcknowledgment((MessageIdImpl) messageId2, ackType, map));
            });
            return FutureUtil.waitForAll(new ArrayList(hashSet));
        }
        if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
            addListAcknowledgment(list);
            if (this.acknowledgementGroupTimeMicros == 0 || this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            return CompletableFuture.completedFuture(null);
        }
        this.lock.readLock().lock();
        try {
            if (list.size() == 0) {
                CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
                this.lock.readLock().unlock();
                if (this.acknowledgementGroupTimeMicros == 0 || this.pendingIndividualAcks.size() >= 1000) {
                    flush();
                }
                return completedFuture;
            }
            addListAcknowledgment(list);
            TimedCompletableFuture<Void> timedCompletableFuture = this.currentIndividualAckFuture;
            this.lock.readLock().unlock();
            if (this.acknowledgementGroupTimeMicros == 0 || this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            return timedCompletableFuture;
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            if (this.acknowledgementGroupTimeMicros == 0 || this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            throw th;
        }
    }

    private void addListAcknowledgment(List<MessageId> list) {
        for (MessageId messageId : list) {
            this.consumer.onAcknowledge(messageId, null);
            if (messageId instanceof BatchMessageIdImpl) {
                BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageId;
                if (batchMessageIdImpl.ackIndividual()) {
                    doIndividualAckAsync(modifyBatchMessageIdAndStatesInConsumer(batchMessageIdImpl));
                } else {
                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
                }
            } else {
                modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
                doIndividualAckAsync((MessageIdImpl) messageId);
            }
        }
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl messageIdImpl, CommandAck.AckType ackType, Map<String, Long> map) {
        if (!(messageIdImpl instanceof BatchMessageIdImpl)) {
            if (ackType != CommandAck.AckType.Individual) {
                this.consumer.onAcknowledgeCumulative(messageIdImpl, null);
                return doCumulativeAck(messageIdImpl, map, null);
            }
            this.consumer.onAcknowledge(messageIdImpl, null);
            modifyMessageIdStatesInConsumer(messageIdImpl);
            return doIndividualAck(messageIdImpl, map);
        }
        BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageIdImpl;
        if (ackType == CommandAck.AckType.Individual) {
            this.consumer.onAcknowledge(messageIdImpl, null);
            return batchMessageIdImpl.ackIndividual() ? doIndividualAck(modifyBatchMessageIdAndStatesInConsumer(batchMessageIdImpl), map) : this.batchIndexAckEnabled ? doIndividualBatchAck(batchMessageIdImpl, map) : CompletableFuture.completedFuture(null);
        }
        this.consumer.onAcknowledgeCumulative(messageIdImpl, null);
        if (batchMessageIdImpl.ackCumulative()) {
            return doCumulativeAck(messageIdImpl, map, null);
        }
        if (this.batchIndexAckEnabled) {
            return doCumulativeBatchIndexAck(batchMessageIdImpl, map);
        }
        if (CommandAck.AckType.Cumulative == ackType && !batchMessageIdImpl.getAcker().isPrevBatchCumulativelyAcked()) {
            doCumulativeAck(batchMessageIdImpl.prevBatchMessageId(), map, null);
            batchMessageIdImpl.getAcker().setPrevBatchCumulativelyAcked(true);
        }
        return CompletableFuture.completedFuture(null);
    }

    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageIdImpl) {
        MessageIdImpl messageIdImpl = new MessageIdImpl(batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), batchMessageIdImpl.getPartitionIndex());
        this.consumer.m54getStats().incrementNumAcksSent(batchMessageIdImpl.getBatchSize());
        clearMessageIdFromUnAckTrackerAndDeadLetter(messageIdImpl);
        return messageIdImpl;
    }

    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageIdImpl) {
        this.consumer.m54getStats().incrementNumAcksSent(1L);
        clearMessageIdFromUnAckTrackerAndDeadLetter(messageIdImpl);
    }

    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageIdImpl) {
        this.consumer.getUnAckedMessageTracker().remove(messageIdImpl);
        if (this.consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
            this.consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageIdImpl);
        }
    }

    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageIdImpl, Map<String, Long> map) {
        if (this.acknowledgementGroupTimeMicros == 0 || !(map == null || map.isEmpty())) {
            return doImmediateAck(messageIdImpl, CommandAck.AckType.Individual, map, null);
        }
        if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
            doIndividualAckAsync(messageIdImpl);
            if (this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            return CompletableFuture.completedFuture(null);
        }
        this.lock.readLock().lock();
        try {
            doIndividualAckAsync(messageIdImpl);
            TimedCompletableFuture<Void> timedCompletableFuture = this.currentIndividualAckFuture;
            this.lock.readLock().unlock();
            if (this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            return timedCompletableFuture;
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            if (this.pendingIndividualAcks.size() >= 1000) {
                flush();
            }
            throw th;
        }
    }

    private void doIndividualAckAsync(MessageIdImpl messageIdImpl) {
        this.pendingIndividualAcks.add(messageIdImpl);
        this.pendingIndividualBatchIndexAcks.remove(messageIdImpl);
    }

    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageIdImpl, Map<String, Long> map) {
        return (this.acknowledgementGroupTimeMicros == 0 || !(map == null || map.isEmpty())) ? doImmediateBatchIndexAck(batchMessageIdImpl, batchMessageIdImpl.getBatchIndex(), batchMessageIdImpl.getBatchSize(), CommandAck.AckType.Individual, map) : doIndividualBatchAck(batchMessageIdImpl);
    }

    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageIdImpl) {
        if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
            doIndividualBatchAckAsync(batchMessageIdImpl);
            return CompletableFuture.completedFuture(null);
        }
        this.lock.readLock().lock();
        try {
            doIndividualBatchAckAsync(batchMessageIdImpl);
            return this.currentIndividualAckFuture;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageIdImpl, Map<String, Long> map, BitSetRecyclable bitSetRecyclable) {
        this.consumer.m54getStats().incrementNumAcksSent(this.consumer.getUnAckedMessageTracker().removeMessagesTill(messageIdImpl));
        if (this.acknowledgementGroupTimeMicros == 0 || !(map == null || map.isEmpty())) {
            return doImmediateAck(messageIdImpl, CommandAck.AckType.Cumulative, map, bitSetRecyclable);
        }
        if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
            doCumulativeAckAsync(messageIdImpl, bitSetRecyclable);
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                flush();
            }
            return CompletableFuture.completedFuture(null);
        }
        this.lock.readLock().lock();
        try {
            doCumulativeAckAsync(messageIdImpl, bitSetRecyclable);
            TimedCompletableFuture<Void> timedCompletableFuture = this.currentCumulativeAckFuture;
            this.lock.readLock().unlock();
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                flush();
            }
            return timedCompletableFuture;
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                flush();
            }
            throw th;
        }
    }

    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageIdImpl) {
        this.pendingIndividualBatchIndexAcks.computeIfAbsent(new MessageIdImpl(batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), batchMessageIdImpl.getPartitionIndex()), messageIdImpl -> {
            ConcurrentBitSetRecyclable create;
            if (batchMessageIdImpl.getAcker() == null || (batchMessageIdImpl.getAcker() instanceof BatchMessageAckerDisabled)) {
                create = ConcurrentBitSetRecyclable.create();
                create.set(0, batchMessageIdImpl.getBatchIndex());
            } else {
                create = ConcurrentBitSetRecyclable.create(batchMessageIdImpl.getAcker().getBitSet());
            }
            return create;
        }).clear(batchMessageIdImpl.getBatchIndex());
    }

    private void doCumulativeAckAsync(MessageIdImpl messageIdImpl, BitSetRecyclable bitSetRecyclable) {
        LastCumulativeAck lastCumulativeAck;
        LastCumulativeAck create = LastCumulativeAck.create(messageIdImpl, bitSetRecyclable);
        do {
            lastCumulativeAck = this.lastCumulativeAck;
            if (messageIdImpl.compareTo((MessageId) lastCumulativeAck.messageId) <= 0) {
                create.recycle();
                return;
            }
        } while (!LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, create));
        if (lastCumulativeAck.bitSetRecyclable != null) {
            try {
                lastCumulativeAck.bitSetRecyclable.recycle();
            } catch (Exception e) {
            }
            lastCumulativeAck.bitSetRecyclable = null;
        }
        lastCumulativeAck.recycle();
        this.cumulativeAckFlushRequired = true;
    }

    private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageIdImpl, Map<String, Long> map) {
        if (this.acknowledgementGroupTimeMicros == 0 || !(map == null || map.isEmpty())) {
            return doImmediateBatchIndexAck(batchMessageIdImpl, batchMessageIdImpl.getBatchIndex(), batchMessageIdImpl.getBatchSize(), CommandAck.AckType.Cumulative, map);
        }
        BitSetRecyclable create = BitSetRecyclable.create();
        create.set(0, batchMessageIdImpl.getBatchSize());
        create.clear(0, batchMessageIdImpl.getBatchIndex() + 1);
        return doCumulativeAck(batchMessageIdImpl, null, create);
    }

    private CompletableFuture<Void> doImmediateAck(MessageIdImpl messageIdImpl, CommandAck.AckType ackType, Map<String, Long> map, BitSetRecyclable bitSetRecyclable) {
        ClientCnx clientCnx = this.consumer.getClientCnx();
        return clientCnx == null ? FutureUtil.failedFuture(new PulsarClientException.ConnectException("Consumer connect fail! consumer state:" + this.consumer.getState())) : newImmediateAckAndFlush(this.consumer.consumerId, messageIdImpl, bitSetRecyclable, ackType, map, clientCnx);
    }

    private CompletableFuture<Void> doImmediateBatchIndexAck(BatchMessageIdImpl batchMessageIdImpl, int i, int i2, CommandAck.AckType ackType, Map<String, Long> map) {
        BitSetRecyclable create;
        ClientCnx clientCnx = this.consumer.getClientCnx();
        if (clientCnx == null) {
            return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Consumer connect fail! consumer state:" + this.consumer.getState()));
        }
        if (batchMessageIdImpl.getAcker() == null || (batchMessageIdImpl.getAcker() instanceof BatchMessageAckerDisabled)) {
            create = BitSetRecyclable.create();
            create.set(0, i2);
        } else {
            create = BitSetRecyclable.valueOf(batchMessageIdImpl.getAcker().getBitSet().toLongArray());
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            create.clear(0, i + 1);
        } else {
            create.clear(i);
        }
        CompletableFuture<Void> newMessageAckCommandAndWrite = newMessageAckCommandAndWrite(clientCnx, this.consumer.consumerId, batchMessageIdImpl.ledgerId, batchMessageIdImpl.entryId, create, ackType, null, map, true, null, null);
        create.recycle();
        return newMessageAckCommandAndWrite;
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public void flush() {
        ClientCnx clientCnx = this.consumer.getClientCnx();
        if (clientCnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
        } else {
            if (!isAckReceiptEnabled(this.consumer.getClientCnx())) {
                flushAsync(clientCnx);
                return;
            }
            this.lock.writeLock().lock();
            try {
                flushAsync(clientCnx);
            } finally {
                this.lock.writeLock().unlock();
            }
        }
    }

    private void flushAsync(ClientCnx clientCnx) {
        boolean z = false;
        if (this.cumulativeAckFlushRequired) {
            newMessageAckCommandAndWrite(clientCnx, this.consumer.consumerId, this.lastCumulativeAck.messageId.ledgerId, this.lastCumulativeAck.messageId.getEntryId(), this.lastCumulativeAck.bitSetRecyclable, CommandAck.AckType.Cumulative, null, Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(this.lastCumulativeAck.messageId);
            z = true;
            this.cumulativeAckFlushRequired = false;
        }
        ArrayList arrayList = new ArrayList(this.pendingIndividualAcks.size() + this.pendingIndividualBatchIndexAcks.size());
        if (!this.pendingIndividualAcks.isEmpty()) {
            if (Commands.peerSupportsMultiMessageAcknowledgment(clientCnx.getRemoteEndpointProtocolVersion())) {
                while (true) {
                    MessageIdImpl pollFirst = this.pendingIndividualAcks.pollFirst();
                    if (pollFirst == null) {
                        break;
                    }
                    MessageIdImpl[] messageIdImplArr = this.consumer.unAckedChunkedMessageIdSequenceMap.get(pollFirst);
                    if (messageIdImplArr == null || messageIdImplArr.length <= 1) {
                        arrayList.add(Triple.of(Long.valueOf(pollFirst.getLedgerId()), Long.valueOf(pollFirst.getEntryId()), null));
                    } else {
                        for (MessageIdImpl messageIdImpl : messageIdImplArr) {
                            if (messageIdImpl != null) {
                                arrayList.add(Triple.of(Long.valueOf(messageIdImpl.getLedgerId()), Long.valueOf(messageIdImpl.getEntryId()), null));
                            }
                        }
                        this.consumer.unAckedChunkedMessageIdSequenceMap.remove(pollFirst);
                    }
                }
            } else {
                while (true) {
                    MessageIdImpl pollFirst2 = this.pendingIndividualAcks.pollFirst();
                    if (pollFirst2 == null) {
                        break;
                    }
                    newMessageAckCommandAndWrite(clientCnx, this.consumer.consumerId, pollFirst2.getLedgerId(), pollFirst2.getEntryId(), null, CommandAck.AckType.Individual, null, Collections.emptyMap(), false, null, null);
                    z = true;
                }
            }
        }
        if (!this.pendingIndividualBatchIndexAcks.isEmpty()) {
            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> it = this.pendingIndividualBatchIndexAcks.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> next = it.next();
                arrayList.add(Triple.of(Long.valueOf(next.getKey().ledgerId), Long.valueOf(next.getKey().entryId), next.getValue()));
                it.remove();
            }
        }
        if (arrayList.size() > 0) {
            newMessageAckCommandAndWrite(clientCnx, this.consumer.consumerId, 0L, 0L, null, CommandAck.AckType.Individual, null, null, true, this.currentIndividualAckFuture, arrayList);
            z = true;
        }
        if (z) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", new Object[]{this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks, this.pendingIndividualBatchIndexAcks});
            }
            clientCnx.ctx().flush();
        }
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker
    public void flushAndClean() {
        flush();
        this.lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
        this.pendingIndividualAcks.clear();
    }

    @Override // org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker, java.lang.AutoCloseable
    public void close() {
        flush();
        if (this.scheduledTask == null || this.scheduledTask.isCancelled()) {
            return;
        }
        this.scheduledTask.cancel(true);
    }

    private CompletableFuture<Void> newImmediateAckAndFlush(long j, MessageIdImpl messageIdImpl, BitSetRecyclable bitSetRecyclable, CommandAck.AckType ackType, Map<String, Long> map, ClientCnx clientCnx) {
        CompletableFuture<Void> newMessageAckCommandAndWrite;
        MessageIdImpl[] remove = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageIdImpl);
        if (remove == null || ackType == CommandAck.AckType.Cumulative) {
            newMessageAckCommandAndWrite = newMessageAckCommandAndWrite(clientCnx, j, messageIdImpl.ledgerId, messageIdImpl.getEntryId(), bitSetRecyclable, ackType, null, map, true, null, null);
        } else if (Commands.peerSupportsMultiMessageAcknowledgment(clientCnx.getRemoteEndpointProtocolVersion())) {
            ArrayList arrayList = new ArrayList(remove.length);
            for (MessageIdImpl messageIdImpl2 : remove) {
                if (messageIdImpl2 != null && remove.length > 1) {
                    arrayList.add(Triple.of(Long.valueOf(messageIdImpl2.getLedgerId()), Long.valueOf(messageIdImpl2.getEntryId()), null));
                }
            }
            newMessageAckCommandAndWrite = newMessageAckCommandAndWrite(clientCnx, this.consumer.consumerId, 0L, 0L, null, ackType, null, null, true, null, arrayList);
        } else {
            for (MessageIdImpl messageIdImpl3 : remove) {
                newMessageAckCommandAndWrite(clientCnx, j, messageIdImpl3.getLedgerId(), messageIdImpl3.getEntryId(), bitSetRecyclable, ackType, null, map, true, null, null);
            }
            newMessageAckCommandAndWrite = CompletableFuture.completedFuture(null);
        }
        return newMessageAckCommandAndWrite;
    }

    private CompletableFuture<Void> newMessageAckCommandAndWrite(ClientCnx clientCnx, long j, long j2, long j3, BitSetRecyclable bitSetRecyclable, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> map, boolean z, TimedCompletableFuture<Void> timedCompletableFuture, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> list) {
        if (isAckReceiptEnabled(this.consumer.getClientCnx())) {
            long newRequestId = this.consumer.getClient().newRequestId();
            ByteBuf newAck = list == null ? Commands.newAck(j, j2, j3, bitSetRecyclable, ackType, null, map, newRequestId) : Commands.newMultiMessageAck(j, list, newRequestId);
            if (timedCompletableFuture == null) {
                return clientCnx.newAckForReceipt(newAck, newRequestId);
            }
            if (ackType == CommandAck.AckType.Individual) {
                this.currentIndividualAckFuture = new TimedCompletableFuture<>();
            } else {
                this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
            }
            clientCnx.newAckForReceiptWithFuture(newAck, newRequestId, timedCompletableFuture);
            return timedCompletableFuture;
        }
        if (this.ackReceiptEnabled) {
            synchronized (this) {
                if (!this.currentCumulativeAckFuture.isDone()) {
                    this.currentCumulativeAckFuture.complete(null);
                }
                if (!this.currentIndividualAckFuture.isDone()) {
                    this.currentIndividualAckFuture.complete(null);
                }
            }
        }
        ByteBuf newAck2 = list == null ? Commands.newAck(j, j2, j3, bitSetRecyclable, ackType, null, map, -1L) : Commands.newMultiMessageAck(j, list, -1L);
        if (z) {
            clientCnx.ctx().writeAndFlush(newAck2, clientCnx.ctx().voidPromise());
        } else {
            clientCnx.ctx().write(newAck2, clientCnx.ctx().voidPromise());
        }
        return CompletableFuture.completedFuture(null);
    }

    private boolean isAckReceiptEnabled(ClientCnx clientCnx) {
        return this.ackReceiptEnabled && clientCnx != null && Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion());
    }
}
