package io.streamnative.beam.pulsar;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/beam/pulsar/PulsarMessageIdRangeTracker.class */
public class PulsarMessageIdRangeTracker extends RestrictionTracker<PulsarMessageIdRange, MessageId> implements RestrictionTracker.HasProgress {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarMessageIdRangeTracker.class);
    private PulsarMessageIdRange range;
    protected MessageId lastClaimedOffset;
    protected MessageId lastAttemptedOffset;
    protected PulsarAdmin admin;
    protected String topic;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/beam/pulsar/PulsarMessageIdRangeTracker$LedgerPosition.class */
    public static class LedgerPosition {
        private final long ledgerId;
        private final long entryId;
        private final boolean valid;

        public LedgerPosition(long j, long j2) {
            this.ledgerId = j;
            this.entryId = j2;
            this.valid = j >= 0 && j2 >= 0;
        }

        public boolean isValid() {
            return this.valid;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/beam/pulsar/PulsarMessageIdRangeTracker$SplitPoint.class */
    public static class SplitPoint {
        private final MessageId splitMessageId;
        private final MessageId rightOfSplitMessageId;
        private final boolean valid;

        public SplitPoint(MessageId messageId, MessageId messageId2, boolean z) {
            this.splitMessageId = messageId;
            this.rightOfSplitMessageId = messageId2;
            this.valid = z;
        }

        public boolean isValid() {
            return this.valid;
        }

        public MessageId getSplitMessageId() {
            return this.splitMessageId;
        }

        public MessageId getRightOfSplitMessageId() {
            return this.rightOfSplitMessageId;
        }
    }

    public PulsarMessageIdRangeTracker(PulsarMessageIdRange pulsarMessageIdRange) {
        this.lastClaimedOffset = null;
        this.lastAttemptedOffset = null;
        this.range = (PulsarMessageIdRange) Preconditions.checkNotNull(pulsarMessageIdRange);
        LOG.debug("Created tracker with range: [{}, {})", pulsarMessageIdRange.getFrom(), pulsarMessageIdRange.getTo());
    }

    public PulsarMessageIdRangeTracker(PulsarMessageIdRange pulsarMessageIdRange, PulsarAdmin pulsarAdmin, String str) {
        this(pulsarMessageIdRange);
        this.admin = pulsarAdmin;
        this.topic = str;
    }

    public boolean tryClaim(MessageId messageId) {
        LOG.info("Trying to claim message: {}, Current range: [{}, {}], Last attempted: {}, Last claimed: {}", new Object[]{messageId, this.range.getFrom(), this.range.getTo(), this.lastAttemptedOffset, this.lastClaimedOffset});
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(messageId != null, "MessageId cannot be null");
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((this.range.getFrom() == null || this.range.getTo() == null) ? false : true, "Range boundaries cannot be null");
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.lastAttemptedOffset == null || messageId.compareTo(this.lastAttemptedOffset) > 0, "Trying to claim offset %s while last attempted was %s", messageId, this.lastAttemptedOffset);
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(messageId.compareTo(this.range.getFrom()) >= 0, "Trying to claim offset %s before start of the range %s:%s", messageId, this.range.getFrom(), this.range.getTo());
        this.lastAttemptedOffset = messageId;
        if (!isLatestMessageId(this.range.getTo()) && messageId.compareTo(this.range.getTo()) > 0) {
            LOG.info("Message {} is beyond range end {}, claim rejected", messageId, this.range.getTo());
            return false;
        }
        this.lastClaimedOffset = messageId;
        LOG.info("Successfully claimed message: {}", messageId);
        return true;
    }

    /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
    public PulsarMessageIdRange m7currentRestriction() {
        return this.range;
    }

    public SplitResult<PulsarMessageIdRange> trySplit(double d) {
        LOG.info("Attempting to split range: {} with fraction: {}", this.range, Double.valueOf(d));
        if (!isValidForSplit(d)) {
            LOG.info("Split validation failed for fraction: {}", Double.valueOf(d));
            return null;
        }
        try {
            MessageId unprocessedRangeStart = getUnprocessedRangeStart();
            LOG.info("Unprocessed range starts at: {}", unprocessedRangeStart);
            if (unprocessedRangeStart.compareTo(this.range.getTo()) > 0) {
                LOG.info("No more messages to split, unprocessed start {} > range end {}", unprocessedRangeStart, this.range.getTo());
                return null;
            }
            if (d == 0.0d) {
                SplitResult<PulsarMessageIdRange> handleZeroFractionSplit = handleZeroFractionSplit();
                LOG.info("Zero fraction split result: {}", handleZeroFractionSplit);
                return handleZeroFractionSplit;
            }
            MessageIdAdv safeGetMessageIdAdv = safeGetMessageIdAdv(unprocessedRangeStart);
            MessageIdAdv safeGetMessageIdAdv2 = safeGetMessageIdAdv(this.range.getTo());
            PersistentTopicInternalStats topicStatsWithRetry = getTopicStatsWithRetry();
            if (topicStatsWithRetry == null) {
                LOG.info("Failed to get topic stats for topic: {}", this.topic);
                return null;
            }
            LOG.info("Retrieved topic stats for {}, current ledger entries: {}", this.topic, Long.valueOf(topicStatsWithRetry.currentLedgerEntries));
            List<ManagedLedgerInternalStats.LedgerInfo> validLedgers = getValidLedgers(topicStatsWithRetry, safeGetMessageIdAdv, safeGetMessageIdAdv2);
            LOG.info("Found {} valid ledgers between {} and {}", new Object[]{Integer.valueOf(validLedgers.size()), safeGetMessageIdAdv, safeGetMessageIdAdv2});
            SplitPoint calculateSplitPoint = calculateSplitPoint(topicStatsWithRetry, validLedgers, safeGetMessageIdAdv, safeGetMessageIdAdv2, d);
            if (calculateSplitPoint.isValid()) {
                return createSplitResult(calculateSplitPoint);
            }
            return null;
        } catch (Exception e) {
            LOG.error("Failed to split the range: {} with fraction: {}", new Object[]{this.range, Double.valueOf(d), e});
            return null;
        }
    }

    private boolean isValidForSplit(double d) {
        if (this.range.getTo().equals(this.range.getFrom())) {
            return false;
        }
        return (this.lastAttemptedOffset == null || this.lastAttemptedOffset.compareTo(this.range.getTo()) != 0) && d >= 0.0d && this.admin != null;
    }

    private MessageId getUnprocessedRangeStart() {
        MessageId from = this.lastAttemptedOffset == null ? this.range.getFrom() : next(this.lastAttemptedOffset);
        LOG.debug("Calculated unprocessed range start: {}, lastAttemptedOffset: {}", from, this.lastAttemptedOffset);
        return from;
    }

    private SplitResult<PulsarMessageIdRange> handleZeroFractionSplit() {
        if (this.lastAttemptedOffset == null) {
            PulsarMessageIdRange pulsarMessageIdRange = new PulsarMessageIdRange(this.range.getFrom(), this.range.getFrom());
            PulsarMessageIdRange pulsarMessageIdRange2 = new PulsarMessageIdRange(next(this.range.getFrom()), this.range.getTo());
            this.range = pulsarMessageIdRange;
            LOG.info("Zero fraction split result: primary=[{}, {}], residual=[{}, {}]", new Object[]{pulsarMessageIdRange.getFrom(), pulsarMessageIdRange.getTo(), pulsarMessageIdRange2.getFrom(), pulsarMessageIdRange2.getTo()});
            return SplitResult.of(pulsarMessageIdRange, pulsarMessageIdRange2);
        }
        if (this.lastAttemptedOffset.compareTo(this.range.getTo()) >= 0) {
            LOG.info("Last attempted offset {} has reached or exceeded range end {}, no split needed", this.lastAttemptedOffset, this.range.getTo());
            return null;
        }
        PulsarMessageIdRange pulsarMessageIdRange3 = new PulsarMessageIdRange(this.range.getFrom(), this.lastAttemptedOffset);
        if (!isLatestMessageId(this.range.getTo()) && this.lastAttemptedOffset.compareTo(this.range.getTo()) >= 0) {
            this.range = pulsarMessageIdRange3;
            LOG.info("No residual range needed, using only primary range {}", pulsarMessageIdRange3);
            return null;
        }
        PulsarMessageIdRange pulsarMessageIdRange4 = new PulsarMessageIdRange(next(this.lastAttemptedOffset), this.range.getTo());
        this.range = pulsarMessageIdRange3;
        LOG.info("Split into primary range {} and residual range {}", pulsarMessageIdRange3, pulsarMessageIdRange4);
        return SplitResult.of(pulsarMessageIdRange3, pulsarMessageIdRange4);
    }

    private MessageIdAdv safeGetMessageIdAdv(MessageId messageId) {
        if (messageId instanceof MessageIdAdv) {
            return (MessageIdAdv) messageId;
        }
        if (!(messageId instanceof MessageIdImpl)) {
            throw new IllegalArgumentException("Unsupported MessageId type: " + messageId.getClass());
        }
        MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        return new MessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex());
    }

    private PersistentTopicInternalStats getTopicStatsWithRetry() {
        try {
            return this.admin.topics().getInternalStats(this.topic);
        } catch (PulsarAdminException e) {
            LOG.error("Failed to get topic stats", e);
            return null;
        }
    }

    private List<ManagedLedgerInternalStats.LedgerInfo> getValidLedgers(PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv, MessageIdAdv messageIdAdv2) {
        return persistentTopicInternalStats.ledgers.stream().filter(ledgerInfo -> {
            return ledgerInfo.ledgerId >= messageIdAdv.getLedgerId() && ledgerInfo.ledgerId <= messageIdAdv2.getLedgerId();
        }).sorted(Comparator.comparingLong(ledgerInfo2 -> {
            return ledgerInfo2.ledgerId;
        })).toList();
    }

    private long calculateTotalEntries(List<ManagedLedgerInternalStats.LedgerInfo> list, PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv, MessageIdAdv messageIdAdv2) {
        return entriesBetweenMessages(messageIdAdv, messageIdAdv2, list, persistentTopicInternalStats);
    }

    private long calculateLedgerEntries(ManagedLedgerInternalStats.LedgerInfo ledgerInfo, PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv) {
        return ledgerInfo.ledgerId == messageIdAdv.getLedgerId() ? ledgerInfo.entries == 0 ? Math.max(0L, (persistentTopicInternalStats.currentLedgerEntries - messageIdAdv.getEntryId()) + 1) : Math.max(0L, (ledgerInfo.entries - messageIdAdv.getEntryId()) + 1) : ledgerInfo.entries == 0 ? persistentTopicInternalStats.currentLedgerEntries : ledgerInfo.entries;
    }

    private long calculateEntryId(ManagedLedgerInternalStats.LedgerInfo ledgerInfo, PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv, long j) {
        return ledgerInfo.ledgerId == messageIdAdv.getLedgerId() ? (messageIdAdv.getEntryId() + j) - 1 : j - 1;
    }

    private MessageId createMessageId(LedgerPosition ledgerPosition, MessageIdAdv messageIdAdv) {
        return new MessageIdImpl(ledgerPosition.ledgerId, ledgerPosition.entryId, messageIdAdv.getPartitionIndex());
    }

    private SplitPoint adjustForBatchMessage(MessageId messageId, MessageId messageId2) {
        if (!(messageId instanceof BatchMessageIdImpl)) {
            return new SplitPoint(messageId, messageId2, true);
        }
        BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageId;
        return new SplitPoint(new BatchMessageIdImpl(batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), batchMessageIdImpl.getPartitionIndex(), 0), messageId2, true);
    }

    private SplitResult<PulsarMessageIdRange> createSplitResult(SplitPoint splitPoint) {
        if (!splitPoint.isValid()) {
            return null;
        }
        PulsarMessageIdRange pulsarMessageIdRange = new PulsarMessageIdRange(this.lastAttemptedOffset != null ? this.lastAttemptedOffset : this.range.getFrom(), splitPoint.getSplitMessageId());
        PulsarMessageIdRange pulsarMessageIdRange2 = new PulsarMessageIdRange(splitPoint.getRightOfSplitMessageId(), this.range.getTo());
        this.range = pulsarMessageIdRange;
        LOG.info("Split result: primary=[{}, {}], residual=[{}, {}]", new Object[]{pulsarMessageIdRange.getFrom(), pulsarMessageIdRange.getTo(), pulsarMessageIdRange2.getFrom(), pulsarMessageIdRange2.getTo()});
        return SplitResult.of(pulsarMessageIdRange, pulsarMessageIdRange2);
    }

    private SplitPoint calculateSplitPoint(PersistentTopicInternalStats persistentTopicInternalStats, List<ManagedLedgerInternalStats.LedgerInfo> list, MessageIdAdv messageIdAdv, MessageIdAdv messageIdAdv2, double d) {
        long entriesBetweenMessages;
        long calculateTotalEntries = calculateTotalEntries(list, persistentTopicInternalStats, messageIdAdv, messageIdAdv2);
        if (calculateTotalEntries <= 1) {
            return new SplitPoint(null, null, false);
        }
        if (this.lastAttemptedOffset == null) {
            entriesBetweenMessages = Math.max(1L, (long) (calculateTotalEntries * d));
        } else {
            entriesBetweenMessages = entriesBetweenMessages(messageIdAdv, safeGetMessageIdAdv(this.lastAttemptedOffset), list, persistentTopicInternalStats) + Math.max(1L, (long) ((calculateTotalEntries - r0) * d));
        }
        if (entriesBetweenMessages >= calculateTotalEntries) {
            return new SplitPoint(null, null, false);
        }
        LedgerPosition findSplitPosition = findSplitPosition(list, persistentTopicInternalStats, messageIdAdv, entriesBetweenMessages);
        if (!findSplitPosition.isValid()) {
            return new SplitPoint(null, null, false);
        }
        Optional<ManagedLedgerInternalStats.LedgerInfo> findFirst = list.stream().filter(ledgerInfo -> {
            return ledgerInfo.ledgerId == findSplitPosition.ledgerId;
        }).findFirst();
        LedgerPosition ledgerPosition = findSplitPosition;
        if (findFirst.isPresent()) {
            long j = findFirst.get().entries == 0 ? persistentTopicInternalStats.currentLedgerEntries : findFirst.get().entries;
            if (findSplitPosition.entryId >= j) {
                ledgerPosition = new LedgerPosition(findSplitPosition.ledgerId, j - 1);
                LOG.info("Adjusted split position to last existing message: {}:{}", Long.valueOf(ledgerPosition.ledgerId), Long.valueOf(ledgerPosition.entryId));
            }
        }
        MessageId createMessageId = createMessageId(ledgerPosition, messageIdAdv2);
        MessageId findNextValidMessageId = findNextValidMessageId(ledgerPosition, list, persistentTopicInternalStats, messageIdAdv2);
        return findNextValidMessageId == null ? new SplitPoint(null, null, false) : createMessageId instanceof BatchMessageIdImpl ? adjustForBatchMessage(createMessageId, findNextValidMessageId) : new SplitPoint(createMessageId, findNextValidMessageId, true);
    }

    private LedgerPosition findSplitPosition(List<ManagedLedgerInternalStats.LedgerInfo> list, PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv, long j) {
        long j2 = j;
        long j3 = -1;
        long j4 = -1;
        Iterator<ManagedLedgerInternalStats.LedgerInfo> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ManagedLedgerInternalStats.LedgerInfo next = it.next();
            long calculateLedgerEntries = calculateLedgerEntries(next, persistentTopicInternalStats, messageIdAdv);
            LOG.info("Ledger {} has {} entries", Long.valueOf(next.ledgerId), Long.valueOf(calculateLedgerEntries));
            if (j2 <= calculateLedgerEntries) {
                j3 = next.ledgerId;
                j4 = calculateEntryId(next, persistentTopicInternalStats, messageIdAdv, j2);
                LOG.info("Split point found in ledger {}, entry {}, remaining entries: {}", new Object[]{Long.valueOf(j3), Long.valueOf(j4), Long.valueOf(j2)});
                break;
            }
            j2 -= calculateLedgerEntries;
            LOG.info("After ledger {}, remaining entries: {}", Long.valueOf(next.ledgerId), Long.valueOf(j2));
        }
        return new LedgerPosition(j3, j4);
    }

    private MessageId findNextValidMessageId(LedgerPosition ledgerPosition, List<ManagedLedgerInternalStats.LedgerInfo> list, PersistentTopicInternalStats persistentTopicInternalStats, MessageIdAdv messageIdAdv) {
        long longValue = ((Long) list.stream().filter(ledgerInfo -> {
            return ledgerInfo.ledgerId == ledgerPosition.ledgerId;
        }).findFirst().map(ledgerInfo2 -> {
            return Long.valueOf(ledgerInfo2.entries == 0 ? persistentTopicInternalStats.currentLedgerEntries : ledgerInfo2.entries);
        }).orElse(0L)).longValue();
        if (ledgerPosition.entryId + 1 < longValue) {
            return new MessageIdImpl(ledgerPosition.ledgerId, ledgerPosition.entryId + 1, messageIdAdv.getPartitionIndex());
        }
        Optional<ManagedLedgerInternalStats.LedgerInfo> min = list.stream().filter(ledgerInfo3 -> {
            return ledgerInfo3.ledgerId > ledgerPosition.ledgerId;
        }).min(Comparator.comparingLong(ledgerInfo4 -> {
            return ledgerInfo4.ledgerId;
        }));
        return min.isPresent() ? new MessageIdImpl(min.get().ledgerId, 0L, messageIdAdv.getPartitionIndex()) : new MessageIdImpl(ledgerPosition.ledgerId, longValue - 1, messageIdAdv.getPartitionIndex());
    }

    public void checkDone() throws IllegalStateException {
        LOG.info("Checking if processing is done. Range: [{}, {}], Last attempted: {}, Last claimed: {}", new Object[]{this.range.getFrom(), this.range.getTo(), this.lastAttemptedOffset, this.lastClaimedOffset});
        if (isLatestMessageId(this.range.getTo())) {
            return;
        }
        if (this.range.getFrom().compareTo(this.range.getTo()) >= 0) {
            LOG.info("Range is empty or invalid, processing is done");
            return;
        }
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(this.lastAttemptedOffset != null, "Message range %s is non-empty and no messages have been attempted.", this.range);
        if (this.lastAttemptedOffset instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchMessageIdImpl = this.lastAttemptedOffset;
            LOG.info("Checking batch message completion. Batch index: {}, Batch size: {}", Integer.valueOf(batchMessageIdImpl.getBatchIndex()), Integer.valueOf(batchMessageIdImpl.getBatchSize()));
            if (batchMessageIdImpl.getBatchIndex() < batchMessageIdImpl.getBatchSize() - 1) {
                String format = String.format("Batch message not fully processed. Current batch index: %d, batch size: %d, message: %s", Integer.valueOf(batchMessageIdImpl.getBatchIndex()), Integer.valueOf(batchMessageIdImpl.getBatchSize()), batchMessageIdImpl);
                LOG.error(format);
                throw new IllegalStateException(format);
            }
        }
        int compareTo = this.lastAttemptedOffset.compareTo(this.range.getTo());
        if (compareTo == 0) {
            LOG.info("Processing completed exactly at range end: {}", this.lastAttemptedOffset);
        } else if (compareTo > 0) {
            LOG.warn("Processing went beyond range end. Last attempted: {}, Range end: {}", this.lastAttemptedOffset, this.range.getTo());
        } else {
            String format2 = String.format("Message range %s is not exhausted. Last attempted message: %s, next message: %s", this.range, this.lastAttemptedOffset, next(this.lastAttemptedOffset));
            LOG.error(format2);
            throw new IllegalStateException(format2);
        }
    }

    private MessageId next(MessageId messageId) {
        BatchMessageIdImpl messageIdImpl;
        if (messageId instanceof MessageIdAdv) {
            MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
            messageIdImpl = (messageIdAdv.getBatchIndex() < 0 || messageIdAdv.getBatchSize() <= 0) ? new MessageIdImpl(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId() + 1, messageIdAdv.getPartitionIndex()) : new BatchMessageIdImpl(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(), messageIdAdv.getPartitionIndex(), messageIdAdv.getBatchIndex() + 1);
        } else {
            MessageIdImpl messageIdImpl2 = (MessageIdImpl) messageId;
            messageIdImpl = new MessageIdImpl(messageIdImpl2.getLedgerId(), messageIdImpl2.getEntryId() + 1, messageIdImpl2.getPartitionIndex());
        }
        return messageIdImpl;
    }

    public String toString() {
        return String.format("PulsarMessageIdRangeTracker{range=%s, lastClaimed=%d}", this.range, this.lastClaimedOffset);
    }

    public RestrictionTracker.IsBounded isBounded() {
        return isLatestMessageId(this.range.getTo()) ? RestrictionTracker.IsBounded.UNBOUNDED : RestrictionTracker.IsBounded.BOUNDED;
    }

    private boolean isLatestMessageId(MessageId messageId) {
        return messageId == MessageId.latest || ((messageId instanceof MessageIdImpl) && ((MessageIdImpl) messageId).getLedgerId() == -1 && ((MessageIdImpl) messageId).getEntryId() == -1);
    }

    public RestrictionTracker.Progress getProgress() {
        if (this.admin == null) {
            return RestrictionTracker.Progress.from(0.0d, 9.223372036854776E18d);
        }
        try {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(this.topic);
            List list = internalStats.ledgers.stream().filter(ledgerInfo -> {
                return ledgerInfo.ledgerId >= this.range.getFrom().getLedgerId();
            }).sorted(Comparator.comparingLong(ledgerInfo2 -> {
                return ledgerInfo2.ledgerId;
            })).toList();
            if (this.lastClaimedOffset == null) {
                return isLatestMessageId(this.range.getTo()) ? RestrictionTracker.Progress.from(0.0d, 9.223372036854776E18d) : RestrictionTracker.Progress.from(0.0d, entriesBetweenMessages(this.range.getFrom(), this.range.getTo(), list, internalStats));
            }
            long max = Math.max(entriesBetweenMessages(this.range.getFrom(), this.lastClaimedOffset, list, internalStats), 0L);
            if (isLatestMessageId(this.range.getTo())) {
                return RestrictionTracker.Progress.from(max, 9.223372036854776E18d);
            }
            return RestrictionTracker.Progress.from(max, Math.max(entriesBetweenMessages(r0, this.range.getTo(), list, internalStats) - 1, 0L));
        } catch (Exception e) {
            LOG.error("Failed to get progress", e);
            return RestrictionTracker.Progress.from(0.0d, 9.223372036854776E18d);
        }
    }

    @VisibleForTesting
    static long entriesBetweenMessages(MessageIdAdv messageIdAdv, MessageIdAdv messageIdAdv2, List<ManagedLedgerInternalStats.LedgerInfo> list, PersistentTopicInternalStats persistentTopicInternalStats) {
        if (messageIdAdv == null || messageIdAdv2 == null || messageIdAdv.compareTo(messageIdAdv2) > 0) {
            return 0L;
        }
        if (messageIdAdv.getLedgerId() == messageIdAdv2.getLedgerId()) {
            return (messageIdAdv2.getEntryId() - messageIdAdv.getEntryId()) + 1;
        }
        AtomicLong atomicLong = new AtomicLong();
        list.forEach(ledgerInfo -> {
            if (ledgerInfo.ledgerId < messageIdAdv.getLedgerId() || ledgerInfo.ledgerId > messageIdAdv2.getLedgerId()) {
                return;
            }
            if (ledgerInfo.ledgerId != messageIdAdv.getLedgerId()) {
                if (ledgerInfo.ledgerId == messageIdAdv2.getLedgerId()) {
                    atomicLong.addAndGet(messageIdAdv2.getEntryId() + 1);
                    return;
                } else if (ledgerInfo.entries == 0) {
                    atomicLong.addAndGet(persistentTopicInternalStats.currentLedgerEntries);
                    return;
                } else {
                    atomicLong.addAndGet(ledgerInfo.entries);
                    return;
                }
            }
            if (ledgerInfo.entries == 0) {
                if (persistentTopicInternalStats.currentLedgerEntries <= messageIdAdv.getEntryId()) {
                    return;
                }
                atomicLong.addAndGet(persistentTopicInternalStats.currentLedgerEntries - messageIdAdv.getEntryId());
            } else {
                if (ledgerInfo.entries <= messageIdAdv.getEntryId()) {
                    return;
                }
                atomicLong.addAndGet(ledgerInfo.entries - messageIdAdv.getEntryId());
            }
        });
        return atomicLong.get();
    }
}
