package io.pravega.segmentstore.server.tables;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.BufferView;
import io.pravega.segmentstore.contracts.tables.TableEntry;
import io.pravega.segmentstore.server.DirectSegmentAccess;
import io.pravega.segmentstore.server.reading.AsyncReadResultProcessor;
import io.pravega.segmentstore.server.tables.AsyncTableEntryReader;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/server/tables/TableEntryDeltaIterator.class */
class TableEntryDeltaIterator<T> implements AsyncIterator<T> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TableEntryDeltaIterator.class);
    // Upper bound (2 * 1024 * 1024 bytes) on the length requested by a single segment read; see toEntries().
    private static final int MAX_READ_SIZE = 2097152;
    // Segment the batches are read from.
    private final DirectSegmentAccess segment;
    // Together with maxBytesToRead, defines the offset (startOffset + maxBytesToRead) at which endOfSegment() becomes true.
    private final long startOffset;
    // Size of the range to iterate over; also caps (with MAX_READ_SIZE) the length of each individual read.
    private final int maxBytesToRead;
    // Passed through unchanged to every DeltaIteratorState produced by parseEntries().
    private final boolean shouldClear;
    // Budget used to create a fresh TimeoutTimer for each batch fetch.
    private final Duration fetchTimeout;
    // Serializer handed to AsyncTableEntryReader.readEntryComponents() when decoding entries.
    private final EntrySerializer entrySerializer;
    // Converts each (state, entry) pair into the value returned by getNext().
    private final ConvertResult<T> resultConverter;
    // Executor handed to AsyncReadResultProcessor.processAll() for processing read results.
    private final Executor executor;

    // Iterator over the entries of the most recently fetched batch; null when no buffered entries remain.
    @GuardedBy("this")
    private Iterator<Map.Entry<DeltaIteratorState, TableEntry>> currentEntry;

    // Segment offset at which the next batch read starts; advanced by parseEntries() past the last decoded entry.
    @GuardedBy("this")
    private long currentBatchOffset;

    /**
     * Asynchronous conversion applied by {@code getNext()} to every non-null (state, entry) pair before it is
     * handed to the caller.
     *
     * @param <T> Type of the converted result.
     */
    @FunctionalInterface
    interface ConvertResult<T> {
        /**
         * Converts one iterator item.
         *
         * @param entry The {@link DeltaIteratorState} / {@link TableEntry} pair to convert.
         * @return A future that completes with the converted value.
         */
        CompletableFuture<T> apply(Map.Entry<DeltaIteratorState, TableEntry> entry);
    }

    /**
     * Fluent builder for {@link TableEntryDeltaIterator}. Each setter stores its argument and returns this builder;
     * {@link #build()} forwards every stored value, in declaration order, to the iterator's constructor.
     *
     * @param <T> Type of the items produced by the built iterator.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public static class TableEntryDeltaIteratorBuilder<T> {

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private DirectSegmentAccess segment;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private long startOffset;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private int maxBytesToRead;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private boolean shouldClear;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private Duration fetchTimeout;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private EntrySerializer entrySerializer;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private ConvertResult<T> resultConverter;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private Executor executor;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private Iterator<Map.Entry<DeltaIteratorState, TableEntry>> currentEntry;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private long currentBatchOffset;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        TableEntryDeltaIteratorBuilder() {
        }

        /** Sets the segment to read from. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> segment(DirectSegmentAccess segment) {
            this.segment = segment;
            return this;
        }

        /** Sets the offset that, together with maxBytesToRead, bounds the iteration. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> startOffset(long startOffset) {
            this.startOffset = startOffset;
            return this;
        }

        /** Sets the number of bytes the iteration covers. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> maxBytesToRead(int maxBytesToRead) {
            this.maxBytesToRead = maxBytesToRead;
            return this;
        }

        /** Sets the flag that is copied into every produced DeltaIteratorState. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> shouldClear(boolean shouldClear) {
            this.shouldClear = shouldClear;
            return this;
        }

        /** Sets the timeout budget applied to each batch fetch. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> fetchTimeout(Duration fetchTimeout) {
            this.fetchTimeout = fetchTimeout;
            return this;
        }

        /** Sets the serializer used to decode entries. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> entrySerializer(EntrySerializer entrySerializer) {
            this.entrySerializer = entrySerializer;
            return this;
        }

        /** Sets the converter applied to each item before it is returned by the iterator. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> resultConverter(ConvertResult<T> resultConverter) {
            this.resultConverter = resultConverter;
            return this;
        }

        /** Sets the executor used when processing read results. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /** Sets the initial buffered-entry iterator (may be null). */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> currentEntry(Iterator<Map.Entry<DeltaIteratorState, TableEntry>> currentEntry) {
            this.currentEntry = currentEntry;
            return this;
        }

        /** Sets the offset at which the first batch read starts. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIteratorBuilder<T> currentBatchOffset(long currentBatchOffset) {
            this.currentBatchOffset = currentBatchOffset;
            return this;
        }

        /**
         * Creates the iterator from the values collected so far.
         *
         * @return A new {@link TableEntryDeltaIterator}.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TableEntryDeltaIterator<T> build() {
            return new TableEntryDeltaIterator<>(this.segment, this.startOffset, this.maxBytesToRead, this.shouldClear, this.fetchTimeout, this.entrySerializer, this.resultConverter, this.executor, this.currentEntry, this.currentBatchOffset);
        }

        /**
         * Renders the builder state for diagnostics. Every label is paired with the value of the field of the
         * same name (previously the values were shifted by one position and currentBatchOffset was never printed).
         *
         * @return A string of the form {@code TableEntryDeltaIterator.TableEntryDeltaIteratorBuilder(name=value, ...)}.
         */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "TableEntryDeltaIterator.TableEntryDeltaIteratorBuilder(segment=" + this.segment
                    + ", startOffset=" + this.startOffset
                    + ", maxBytesToRead=" + this.maxBytesToRead
                    + ", shouldClear=" + this.shouldClear
                    + ", fetchTimeout=" + this.fetchTimeout
                    + ", entrySerializer=" + this.entrySerializer
                    + ", resultConverter=" + this.resultConverter
                    + ", executor=" + this.executor
                    + ", currentEntry=" + this.currentEntry
                    + ", currentBatchOffset=" + this.currentBatchOffset + ")";
        }
    }

    /**
     * Retrieves the next item of the iteration.
     *
     * @return A future that completes with the converted next entry, or with null when no further entry is
     * available.
     */
    public CompletableFuture<T> getNext() {
        return getNextEntry().thenCompose(next -> {
            if (next == null) {
                // Nothing left: surface null directly without invoking the converter.
                return CompletableFuture.<T>completedFuture(null);
            }
            return this.resultConverter.apply(next);
        });
    }

    /**
     * Indicates whether the current batch offset has reached the end of the range this iterator covers.
     *
     * @return True if currentBatchOffset is at or beyond startOffset + maxBytesToRead, false otherwise.
     */
    public synchronized boolean endOfSegment() {
        long readLimit = this.startOffset + (long) this.maxBytesToRead;
        return this.currentBatchOffset >= readLimit;
    }

    /**
     * Produces the next raw (state, entry) pair, serving it from the buffered batch when possible and fetching a
     * new batch otherwise.
     *
     * @return A future completing with the next pair, or with null if the freshly fetched batch is empty.
     */
    private synchronized CompletableFuture<Map.Entry<DeltaIteratorState, TableEntry>> getNextEntry() {
        Map.Entry<DeltaIteratorState, TableEntry> buffered = getNextEntryFromBatch();
        if (buffered == null) {
            // Buffer exhausted: load another batch, then take its first element (if any).
            return fetchNextTableEntriesBatch().thenApply(ignored -> getNextEntryFromBatch());
        }
        return CompletableFuture.completedFuture(buffered);
    }

    /**
     * Takes the next entry out of the currently buffered batch.
     *
     * The buffered iterator is dropped (set to null) as soon as it has no more elements, so that the following
     * call reports "nothing buffered" and the caller fetches a new batch.
     *
     * @return The next buffered pair, or null if there is no buffered batch or it has no elements left.
     */
    private synchronized Map.Entry<DeltaIteratorState, TableEntry> getNextEntryFromBatch() {
        if (this.currentEntry == null) {
            return null;
        }
        if (!this.currentEntry.hasNext()) {
            // An iterator supplied empty (e.g. through the builder/constructor) must not be advanced:
            // Iterator.next() would throw NoSuchElementException. Treat it as "no buffered entries".
            this.currentEntry = null;
            return null;
        }
        Map.Entry<DeltaIteratorState, TableEntry> next = this.currentEntry.next();
        if (!this.currentEntry.hasNext()) {
            this.currentEntry = null;
        }
        return next;
    }

    /**
     * Reads and parses the batch starting at the current batch offset and installs it as the buffered batch.
     *
     * @return A future that completes once the buffered iterator has been replaced (it becomes null when the
     * batch is empty).
     */
    private synchronized CompletableFuture<Void> fetchNextTableEntriesBatch() {
        return toEntries(this.currentBatchOffset).thenAccept(entries -> {
            // This continuation may run on a different thread after this method has returned and released
            // the monitor; currentEntry is @GuardedBy("this"), so re-acquire the lock for the write.
            synchronized (this) {
                this.currentEntry = entries.isEmpty() ? null : entries.iterator();
            }
        });
    }

    /**
     * Reads one batch from the segment and decodes it into (state, entry) pairs.
     *
     * At most min(maxBytesToRead, MAX_READ_SIZE) bytes are requested per call. If the iterator is already at the
     * end of its range, no read is issued and an empty list is returned.
     *
     * @param readOffset Segment offset at which the read starts.
     * @return A future completing with the decoded pairs of this batch.
     */
    private CompletableFuture<List<Map.Entry<DeltaIteratorState, TableEntry>>> toEntries(long readOffset) {
        TimeoutTimer timer = new TimeoutTimer(this.fetchTimeout);
        int readLength = Math.min(this.maxBytesToRead, MAX_READ_SIZE);
        if (endOfSegment()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return AsyncReadResultProcessor
                .processAll(this.segment.read(readOffset, readLength, timer.getRemaining()), this.executor, timer.getRemaining())
                .thenApply(data -> parseEntries(data, readOffset, readLength));
    }

    /**
     * Decodes consecutive table entries from a read buffer and advances the current batch offset past the last
     * entry that was decoded.
     *
     * Decoding stops when the read window [batchStartOffset, batchStartOffset + readLength) has been consumed, or
     * when the reader reports an out-of-bounds condition (the buffer ends before a complete entry could be read).
     * In the latter case the entries decoded so far are returned.
     *
     * @param data             Buffer holding the bytes read from the segment.
     * @param batchStartOffset Segment offset corresponding to the first byte of {@code data}.
     * @param readLength       Number of bytes that were requested for this batch.
     * @return The decoded pairs; each key holds the offset just past its entry, whether that offset reaches
     * batchStartOffset + maxBytesToRead, the shouldClear flag and the entry's deletion marker.
     * @throws java.util.concurrent.CompletionException wrapping the IOException if an entry cannot be decoded.
     * This method only runs inside a CompletableFuture stage, which would wrap the failure the same way.
     */
    private List<Map.Entry<DeltaIteratorState, TableEntry>> parseEntries(BufferView data, long batchStartOffset, int readLength) {
        long currentOffset = batchStartOffset;
        final long maxOffset = batchStartOffset + readLength;
        BufferView.Reader reader = data.getBufferViewReader();
        List<Map.Entry<DeltaIteratorState, TableEntry>> entries = new ArrayList<>();
        try {
            while (currentOffset < maxOffset) {
                AsyncTableEntryReader.DeserializedEntry entry = AsyncTableEntryReader.readEntryComponents(reader, currentOffset, this.entrySerializer);
                long entryLength = entry.getHeader().getTotalLength();
                boolean reachedEnd = currentOffset + entryLength >= ((long) this.maxBytesToRead) + batchStartOffset;
                currentOffset += entryLength;
                // Entries without a value (e.g. deletions) are surfaced with an empty value rather than null.
                BufferView value = entry.getValue() == null ? BufferView.empty() : entry.getValue();
                entries.add(new AbstractMap.SimpleEntry<>(
                        new DeltaIteratorState(currentOffset, reachedEnd, this.shouldClear, entry.getHeader().isDeletion()),
                        TableEntry.versioned(entry.getKey(), value, entry.getVersion())));
            }
        } catch (BufferView.Reader.OutOfBoundsException ignored) {
            // The buffer ended before a whole entry could be read. The catch must sit OUTSIDE the loop: the
            // offset does not advance on failure, so swallowing the exception inside the loop would retry the
            // same read forever. Everything decoded so far is still valid and is returned.
        } catch (IOException ex) {
            throw new java.util.concurrent.CompletionException(ex);
        }
        // currentBatchOffset is @GuardedBy("this") and this method runs on a future-completion thread.
        synchronized (this) {
            this.currentBatchOffset = currentOffset;
        }
        return entries;
    }

    /**
     * Creates an iterator that yields no items: with a zero start offset, zero maxBytesToRead and zero batch
     * offset, {@code endOfSegment()} is true from the start, so no segment read is ever issued.
     *
     * @param <T> Type of the (never produced) items.
     * @return A new, empty {@link TableEntryDeltaIterator}.
     */
    static <T> TableEntryDeltaIterator<T> empty() {
        Duration noTimeout = Duration.ofMillis(0L);
        EntrySerializer serializer = new EntrySerializer();
        ConvertResult<T> toNull = ignored -> CompletableFuture.completedFuture(null);
        Executor pool = ForkJoinPool.commonPool();
        return new TableEntryDeltaIterator<T>(null, 0L, 0, false, noTimeout, serializer, toNull, pool, null, 0L);
    }

    /**
     * Creates a new iterator; every argument is stored as-is in the field of the same name.
     *
     * @param segment            Segment to read from.
     * @param startOffset        Offset that, with maxBytesToRead, bounds the iteration.
     * @param maxBytesToRead     Number of bytes the iteration covers.
     * @param shouldClear        Flag copied into every produced DeltaIteratorState.
     * @param fetchTimeout       Timeout budget for each batch fetch.
     * @param entrySerializer    Serializer used to decode entries.
     * @param resultConverter    Converter applied to each item returned by getNext().
     * @param executor           Executor used when processing read results.
     * @param currentEntry       Initial buffered-entry iterator (may be null).
     * @param currentBatchOffset Offset at which the first batch read starts.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    @ConstructorProperties({"segment", "startOffset", "maxBytesToRead", "shouldClear", "fetchTimeout", "entrySerializer", "resultConverter", "executor", "currentEntry", "currentBatchOffset"})
    TableEntryDeltaIterator(DirectSegmentAccess segment,
                            long startOffset,
                            int maxBytesToRead,
                            boolean shouldClear,
                            Duration fetchTimeout,
                            EntrySerializer entrySerializer,
                            ConvertResult<T> resultConverter,
                            Executor executor,
                            Iterator<Map.Entry<DeltaIteratorState, TableEntry>> currentEntry,
                            long currentBatchOffset) {
        this.segment = segment;
        this.startOffset = startOffset;
        this.maxBytesToRead = maxBytesToRead;
        this.shouldClear = shouldClear;
        this.fetchTimeout = fetchTimeout;
        this.entrySerializer = entrySerializer;
        this.resultConverter = resultConverter;
        this.executor = executor;
        this.currentEntry = currentEntry;
        this.currentBatchOffset = currentBatchOffset;
    }

    /**
     * Entry point of the fluent construction API.
     *
     * @param <T> Type of the items produced by the iterator to build.
     * @return A new builder with no values set.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public static <T> TableEntryDeltaIteratorBuilder<T> builder() {
        TableEntryDeltaIteratorBuilder<T> fresh = new TableEntryDeltaIteratorBuilder<T>();
        return fresh;
    }
}
