package io.pravega.segmentstore.server.tables;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.HashedArray;
import io.pravega.segmentstore.contracts.BadAttributeUpdateException;
import io.pravega.segmentstore.contracts.ReadResult;
import io.pravega.segmentstore.contracts.tables.TableAttributes;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.DirectSegmentAccess;
import io.pravega.segmentstore.server.SegmentOperation;
import io.pravega.segmentstore.server.WriterFlushResult;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.logs.operations.CachedStreamSegmentAppendOperation;
import io.pravega.segmentstore.server.tables.AsyncTableEntryReader;
import io.pravega.segmentstore.server.tables.BucketUpdate;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/tables/WriterTableProcessor.class */
public class WriterTableProcessor implements WriterSegmentProcessor {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private final TableWriterConnector connector;
    private final IndexWriter indexWriter;
    private final ScheduledExecutorService executor;
    private final OperationAggregator aggregator;
    private final AtomicLong lastAddedOffset;
    private final AtomicBoolean closed;
    private final String traceObjectId;
    private final TableCompactor compactor;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/tables/WriterTableProcessor$OperationAggregator.class */
    public static class OperationAggregator {

        @GuardedBy("this")
        private long firstSeqNo;

        @GuardedBy("this")
        private long lastOffset;

        @GuardedBy("this")
        private long lastIndexedOffset;

        @GuardedBy("this")
        private final ArrayList<Long> appendOffsets = new ArrayList<>();

        OperationAggregator(long j) {
            reset();
            this.lastIndexedOffset = j;
        }

        synchronized void reset() {
            this.firstSeqNo = Long.MIN_VALUE;
            this.lastOffset = -1L;
            this.appendOffsets.clear();
        }

        synchronized void add(CachedStreamSegmentAppendOperation cachedStreamSegmentAppendOperation) {
            if (this.appendOffsets.size() == 0) {
                this.firstSeqNo = cachedStreamSegmentAppendOperation.getSequenceNumber();
            }
            this.lastOffset = cachedStreamSegmentAppendOperation.getLastStreamSegmentOffset();
            this.appendOffsets.add(Long.valueOf(cachedStreamSegmentAppendOperation.getStreamSegmentOffset()));
        }

        synchronized boolean isEmpty() {
            return this.appendOffsets.isEmpty();
        }

        synchronized long getFirstSequenceNumber() {
            return this.firstSeqNo;
        }

        synchronized long getFirstOffset() {
            if (this.appendOffsets.size() == 0) {
                return -1L;
            }
            return this.appendOffsets.get(0).longValue();
        }

        synchronized long getLastOffset() {
            return this.lastOffset;
        }

        synchronized long getLastIndexedOffset() {
            return this.lastIndexedOffset;
        }

        synchronized boolean setLastIndexedOffset(long j) {
            if (this.appendOffsets.size() > 0) {
                if (j >= getLastOffset()) {
                    reset();
                } else {
                    int indexOf = this.appendOffsets.indexOf(Long.valueOf(j));
                    if (indexOf < 0) {
                        return false;
                    }
                    this.appendOffsets.subList(0, indexOf).clear();
                }
            }
            if (j < 0) {
                return true;
            }
            this.lastIndexedOffset = j;
            return true;
        }

        synchronized int size() {
            return this.appendOffsets.size();
        }

        public synchronized String toString() {
            return String.format("Count = %d, FirstSN = %d, FirstOffset = %d, LastOffset = %d, LIdx = %s", Integer.valueOf(this.appendOffsets.size()), Long.valueOf(this.firstSeqNo), Long.valueOf(getFirstOffset()), Long.valueOf(getLastOffset()), Long.valueOf(getLastIndexedOffset()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/tables/WriterTableProcessor$TableWriterFlushResult.class */
    public static class TableWriterFlushResult extends WriterFlushResult {
        final long lastIndexedOffset;
        final long highestCopiedOffset;

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"lastIndexedOffset", "highestCopiedOffset"})
        public TableWriterFlushResult(long j, long j2) {
            this.lastIndexedOffset = j;
            this.highestCopiedOffset = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WriterTableProcessor(@NonNull TableWriterConnector tableWriterConnector, @NonNull ScheduledExecutorService scheduledExecutorService) {
        if (tableWriterConnector == null) {
            throw new NullPointerException("connector is marked @NonNull but is null");
        }
        if (scheduledExecutorService == null) {
            throw new NullPointerException("executor is marked @NonNull but is null");
        }
        this.connector = tableWriterConnector;
        this.executor = scheduledExecutorService;
        this.indexWriter = new IndexWriter(tableWriterConnector.getKeyHasher(), scheduledExecutorService);
        this.aggregator = new OperationAggregator(this.indexWriter.getLastIndexedOffset(this.connector.getMetadata()));
        this.lastAddedOffset = new AtomicLong(-1L);
        this.closed = new AtomicBoolean();
        this.traceObjectId = String.format("TableProcessor[%d-%d]", Integer.valueOf(this.connector.getMetadata().getContainerId()), Long.valueOf(this.connector.getMetadata().getId()));
        this.compactor = new TableCompactor(tableWriterConnector, this.indexWriter, this.executor);
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.connector.close();
            log.info("{}: Closed.", this.traceObjectId);
        }
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public void add(SegmentOperation segmentOperation) throws DataCorruptionException {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkArgument(segmentOperation.getStreamSegmentId() == this.connector.getMetadata().getId(), "Operation '%s' refers to a different Segment than this one (%s).", segmentOperation, this.connector.getMetadata().getId());
        Preconditions.checkArgument(segmentOperation.getSequenceNumber() != Long.MIN_VALUE, "Operation '%s' does not have a Sequence Number assigned.", segmentOperation);
        if (this.connector.getMetadata().isDeleted() || !(segmentOperation instanceof CachedStreamSegmentAppendOperation)) {
            return;
        }
        CachedStreamSegmentAppendOperation cachedStreamSegmentAppendOperation = (CachedStreamSegmentAppendOperation) segmentOperation;
        if (this.lastAddedOffset.get() >= 0) {
            if (this.lastAddedOffset.get() != cachedStreamSegmentAppendOperation.getStreamSegmentOffset()) {
                throw new DataCorruptionException(String.format("Wrong offset for Operation '%s'. Expected: %s, actual: %d.", segmentOperation, this.lastAddedOffset, Long.valueOf(cachedStreamSegmentAppendOperation.getStreamSegmentOffset())), new Object[0]);
            }
        } else if (this.aggregator.getLastIndexedOffset() < cachedStreamSegmentAppendOperation.getStreamSegmentOffset()) {
            throw new DataCorruptionException(String.format("Operation '%s' begins after TABLE_INDEXED_OFFSET. Expected: %s, actual: %d.", segmentOperation, Long.valueOf(this.aggregator.getLastIndexedOffset()), Long.valueOf(cachedStreamSegmentAppendOperation.getStreamSegmentOffset())), new Object[0]);
        }
        if (cachedStreamSegmentAppendOperation.getStreamSegmentOffset() < this.aggregator.getLastIndexedOffset()) {
            log.debug("{}: Skipped {} (State={}).", new Object[]{this.traceObjectId, segmentOperation, this.aggregator});
            return;
        }
        this.aggregator.add(cachedStreamSegmentAppendOperation);
        this.lastAddedOffset.set(cachedStreamSegmentAppendOperation.getLastStreamSegmentOffset());
        log.debug("{}: Add {} (State={}).", new Object[]{this.traceObjectId, segmentOperation, this.aggregator});
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public boolean isClosed() {
        return this.closed.get();
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public long getLowestUncommittedSequenceNumber() {
        return this.aggregator.getFirstSequenceNumber();
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public boolean mustFlush() {
        return (this.connector.getMetadata().isDeleted() || this.aggregator.isEmpty()) ? false : true;
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public CompletableFuture<WriterFlushResult> flush(Duration duration) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        return this.connector.getSegment(timeoutTimer.getRemaining()).thenComposeAsync(directSegmentAccess -> {
            return flushWithSingleRetry(directSegmentAccess, timeoutTimer).thenComposeAsync(tableWriterFlushResult -> {
                flushComplete(tableWriterFlushResult.lastIndexedOffset);
                return compactIfNeeded(directSegmentAccess, tableWriterFlushResult.highestCopiedOffset, timeoutTimer).thenApply(r3 -> {
                    return tableWriterFlushResult;
                });
            }, (Executor) this.executor);
        }, (Executor) this.executor);
    }

    public String toString() {
        return String.format("[%d: %s] Count = %d, LastOffset = %s, LUSN = %d", Long.valueOf(this.connector.getMetadata().getId()), this.connector.getMetadata().getName(), Integer.valueOf(this.aggregator.size()), this.lastAddedOffset, Long.valueOf(getLowestUncommittedSequenceNumber()));
    }

    private CompletableFuture<Void> compactIfNeeded(DirectSegmentAccess directSegmentAccess, long j, TimeoutTimer timeoutTimer) {
        CompletableFuture<Void> completedFuture;
        if (this.compactor.isCompactionRequired(directSegmentAccess.getInfo())) {
            completedFuture = this.compactor.compact(directSegmentAccess, timeoutTimer);
        } else {
            log.debug("{}: No compaction required at this time.", this.traceObjectId);
            completedFuture = CompletableFuture.completedFuture(null);
        }
        return completedFuture.thenComposeAsync((Function<? super Void, ? extends CompletionStage<U>>) r11 -> {
            long calculateTruncationOffset = this.compactor.calculateTruncationOffset(directSegmentAccess.getInfo(), j);
            if (calculateTruncationOffset > 0) {
                log.debug("{}: Truncating segment at offset {}.", this.traceObjectId, Long.valueOf(calculateTruncationOffset));
                return directSegmentAccess.truncate(calculateTruncationOffset, timeoutTimer.getRemaining());
            }
            log.debug("{}: No segment truncation possible now.", this.traceObjectId);
            return CompletableFuture.completedFuture(null);
        }, (Executor) this.executor).exceptionally(th -> {
            log.error("{}: Compaction failed.", this.traceObjectId, th);
            return null;
        });
    }

    private CompletableFuture<TableWriterFlushResult> flushWithSingleRetry(DirectSegmentAccess directSegmentAccess, TimeoutTimer timeoutTimer) {
        return Futures.exceptionallyComposeExpecting(flushOnce(directSegmentAccess, timeoutTimer), this::canRetryFlushException, () -> {
            reconcileTableIndexOffset();
            return flushOnce(directSegmentAccess, timeoutTimer);
        });
    }

    private void flushComplete(long j) {
        this.aggregator.reset();
        this.aggregator.setLastIndexedOffset(j);
        this.connector.notifyIndexOffsetChanged(this.aggregator.getLastIndexedOffset());
        log.debug("{}: FlushComplete (State={}).", this.traceObjectId, this.aggregator);
    }

    private CompletableFuture<TableWriterFlushResult> flushOnce(DirectSegmentAccess directSegmentAccess, TimeoutTimer timeoutTimer) {
        KeyUpdateCollection readKeysFromSegment = readKeysFromSegment(directSegmentAccess, this.aggregator.getFirstOffset(), this.aggregator.getLastOffset(), timeoutTimer);
        log.debug("{}: Flush.ReadFromSegment {} UpdateKeys(s).", this.traceObjectId, Integer.valueOf(readKeysFromSegment.getUpdates().size()));
        return this.indexWriter.groupByBucket(directSegmentAccess, readKeysFromSegment.getUpdates(), timeoutTimer).thenComposeAsync(collection -> {
            return fetchExistingKeys((Collection<BucketUpdate.Builder>) collection, directSegmentAccess, timeoutTimer).thenComposeAsync(r15 -> {
                List list = (List) collection.stream().map((v0) -> {
                    return v0.build();
                }).collect(Collectors.toList());
                logBucketUpdates(list);
                return this.indexWriter.updateBuckets(directSegmentAccess, list, this.aggregator.getLastIndexedOffset(), readKeysFromSegment.getLastIndexedOffset(), readKeysFromSegment.getTotalUpdateCount(), timeoutTimer.getRemaining());
            }, (Executor) this.executor);
        }, (Executor) this.executor).thenApply((Function<? super U, ? extends U>) num -> {
            return new TableWriterFlushResult(readKeysFromSegment.getLastIndexedOffset(), readKeysFromSegment.getHighestCopiedOffset());
        });
    }

    private void reconcileTableIndexOffset() {
        try {
            long lastIndexedOffset = this.indexWriter.getLastIndexedOffset(this.connector.getMetadata());
            if (lastIndexedOffset < this.aggregator.getLastIndexedOffset()) {
                throw new DataCorruptionException(String.format("Cannot reconcile INDEX_OFFSET attribute (%s) for Segment '%s'. It is lower than our known value (%s).", Long.valueOf(lastIndexedOffset), Long.valueOf(this.connector.getMetadata().getId()), Long.valueOf(this.aggregator.getLastIndexedOffset())), new Object[0]);
            }
            if (!this.aggregator.setLastIndexedOffset(lastIndexedOffset)) {
                throw new DataCorruptionException(String.format("Cannot reconcile INDEX_OFFSET attribute (%s) for Segment '%s'. Most likely it does not conform to an append boundary.  Existing value: %s.", Long.valueOf(lastIndexedOffset), Long.valueOf(this.connector.getMetadata().getId()), Long.valueOf(this.aggregator.getLastIndexedOffset())), new Object[0]);
            }
            log.info("{}: ReconcileTableIndexOffset (State={}).", this.traceObjectId, this.aggregator);
        } catch (DataCorruptionException e) {
            throw e;
        }
    }

    private boolean canRetryFlushException(Throwable th) {
        if (!(th instanceof BadAttributeUpdateException)) {
            return false;
        }
        BadAttributeUpdateException badAttributeUpdateException = (BadAttributeUpdateException) th;
        return badAttributeUpdateException.getAttributeId() != null && badAttributeUpdateException.getAttributeId().equals(TableAttributes.INDEX_OFFSET);
    }

    private KeyUpdateCollection readKeysFromSegment(DirectSegmentAccess directSegmentAccess, long j, long j2, TimeoutTimer timeoutTimer) {
        try {
            KeyUpdateCollection keyUpdateCollection = new KeyUpdateCollection();
            InputStream readFromInMemorySegment = readFromInMemorySegment(directSegmentAccess, j, j2, timeoutTimer);
            Throwable th = null;
            long j3 = j;
            while (j3 < j2) {
                try {
                    try {
                        j3 += indexSingleKey(readFromInMemorySegment, j3, keyUpdateCollection);
                    } finally {
                    }
                } finally {
                }
            }
            if (readFromInMemorySegment != null) {
                if (0 != 0) {
                    try {
                        readFromInMemorySegment.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    readFromInMemorySegment.close();
                }
            }
            return keyUpdateCollection;
        } catch (IOException e) {
            throw e;
        }
    }

    private int indexSingleKey(InputStream inputStream, long j, KeyUpdateCollection keyUpdateCollection) throws IOException {
        AsyncTableEntryReader.DeserializedEntry readEntryComponents = AsyncTableEntryReader.readEntryComponents(inputStream, j, this.connector.getSerializer());
        keyUpdateCollection.add(new BucketUpdate.KeyUpdate(new HashedArray(readEntryComponents.getKey()), j, readEntryComponents.getVersion(), readEntryComponents.getHeader().isDeletion()), readEntryComponents.getHeader().getTotalLength(), readEntryComponents.getHeader().getEntryVersion());
        return readEntryComponents.getHeader().getTotalLength();
    }

    private InputStream readFromInMemorySegment(DirectSegmentAccess directSegmentAccess, long j, long j2, TimeoutTimer timeoutTimer) {
        long j3 = j;
        long j4 = j2 - j;
        ArrayList arrayList = new ArrayList();
        while (j4 > 0) {
            int min = (int) Math.min(j4, 2147483647L);
            ReadResult read = directSegmentAccess.read(j3, min, timeoutTimer.getRemaining());
            Throwable th = null;
            try {
                try {
                    arrayList.addAll(read.readRemaining(min, timeoutTimer.getRemaining()));
                    if (!$assertionsDisabled && read.getConsumedLength() != min) {
                        throw new AssertionError("Expecting a full read (from memory).");
                    }
                    j4 -= read.getConsumedLength();
                    j3 += read.getConsumedLength();
                    if (read != null) {
                        if (0 != 0) {
                            try {
                                read.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            read.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (read != null) {
                    if (th != null) {
                        try {
                            read.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        read.close();
                    }
                }
                throw th3;
            }
        }
        return new SequenceInputStream(Iterators.asEnumeration(arrayList.iterator()));
    }

    private CompletableFuture<Void> fetchExistingKeys(Collection<BucketUpdate.Builder> collection, DirectSegmentAccess directSegmentAccess, TimeoutTimer timeoutTimer) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return Futures.loop(collection, builder -> {
            return fetchExistingKeys(builder, directSegmentAccess, timeoutTimer).thenApply(r2 -> {
                return true;
            });
        }, this.executor);
    }

    private CompletableFuture<Void> fetchExistingKeys(BucketUpdate.Builder builder, DirectSegmentAccess directSegmentAccess, TimeoutTimer timeoutTimer) {
        IndexWriter indexWriter = this.indexWriter;
        indexWriter.getClass();
        return TableBucketReader.key(directSegmentAccess, indexWriter::getBackpointerOffset, this.executor).findAll(builder.getBucket().getSegmentOffset(), (tableKey, l) -> {
            builder.withExistingKey(new BucketUpdate.KeyInfo(new HashedArray(tableKey.getKey()), l.longValue(), tableKey.getVersion()));
        }, timeoutTimer);
    }

    private void logBucketUpdates(Collection<BucketUpdate> collection) {
        if (log.isTraceEnabled()) {
            log.trace("{}: Updating {} TableBucket(s).", this.traceObjectId, Integer.valueOf(collection.size()));
            collection.forEach(bucketUpdate -> {
                log.trace("{}: TableBucket [Offset={}, {}]: ExistingKeys=[{}], Updates=[{}].", new Object[]{this.traceObjectId, Long.valueOf(bucketUpdate.getBucketOffset()), bucketUpdate.getBucket(), bucketUpdate.getExistingKeys().stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining("; ")), bucketUpdate.getKeyUpdates().stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining("; "))});
            });
        }
    }

    static {
        $assertionsDisabled = !WriterTableProcessor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(WriterTableProcessor.class);
    }
}
