package io.pravega.segmentstore.server.writer;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.AbstractTimer;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentInformation;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.SegmentOperation;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.WriterFlushResult;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.logs.operations.CachedStreamSegmentAppendOperation;
import io.pravega.segmentstore.server.logs.operations.DeleteSegmentOperation;
import io.pravega.segmentstore.server.logs.operations.MergeSegmentOperation;
import io.pravega.segmentstore.server.logs.operations.StorageOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentAppendOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentSealOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentTruncateOperation;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.SegmentRollingPolicy;
import io.pravega.segmentstore.storage.Storage;
import java.beans.ConstructorProperties;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator.class */
public class SegmentAggregator implements WriterSegmentProcessor, AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private final UpdateableSegmentMetadata metadata;
    private final WriterConfig config;
    private final OperationQueue operations;
    private final AbstractTimer timer;
    private final Executor executor;
    private final String traceObjectId;
    private final Storage storage;
    private final AtomicReference<SegmentHandle> handle;
    private final WriterDataSource dataSource;
    private final AtomicInteger mergeTransactionCount;
    private final AtomicInteger truncateCount;
    private final AtomicBoolean hasSealPending;
    private final AtomicBoolean hasDeletePending;
    private final AtomicLong lastAddedOffset;
    private final AtomicReference<Duration> lastFlush;
    private final AtomicReference<AggregatorState> state;
    private final AtomicReference<ReconciliationState> reconciliationState;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$AggregatedAppendOperation.class */
    public static class AggregatedAppendOperation extends StorageOperation {
        private final AtomicLong streamSegmentOffset;
        private final AtomicInteger length;
        private final AtomicBoolean sealed;

        AggregatedAppendOperation(long j, long j2, long j3) {
            super(j);
            this.streamSegmentOffset = new AtomicLong(j2);
            setSequenceNumber(j3);
            this.length = new AtomicInteger();
            this.sealed = new AtomicBoolean();
        }

        void increaseLength(int i) {
            Preconditions.checkArgument(i > 0, "amount must be a positive integer.");
            this.length.addAndGet(i);
        }

        void seal() {
            this.sealed.set(true);
        }

        boolean isSealed() {
            return this.sealed.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation
        public long getStreamSegmentOffset() {
            return this.streamSegmentOffset.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation
        public long getLength() {
            return this.length.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation, io.pravega.segmentstore.server.logs.operations.Operation
        public String toString() {
            return String.format("AggregatedAppend: SegmentId = %s, Offsets = [%s-%s), SeqNo = %s", Long.valueOf(getStreamSegmentId()), Long.valueOf(getStreamSegmentOffset()), Long.valueOf(getStreamSegmentOffset() + getLength()), Long.valueOf(getSequenceNumber()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$FlushArgs.class */
    public static class FlushArgs {
        private final InputStream stream;
        private final int length;

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"stream", "length"})
        public FlushArgs(InputStream inputStream, int i) {
            this.stream = inputStream;
            this.length = i;
        }

        @SuppressFBWarnings(justification = "generated code")
        public InputStream getStream() {
            return this.stream;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int getLength() {
            return this.length;
        }

        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof FlushArgs)) {
                return false;
            }
            FlushArgs flushArgs = (FlushArgs) obj;
            if (!flushArgs.canEqual(this)) {
                return false;
            }
            InputStream stream = getStream();
            InputStream stream2 = flushArgs.getStream();
            if (stream == null) {
                if (stream2 != null) {
                    return false;
                }
            } else if (!stream.equals(stream2)) {
                return false;
            }
            return getLength() == flushArgs.getLength();
        }

        @SuppressFBWarnings(justification = "generated code")
        protected boolean canEqual(Object obj) {
            return obj instanceof FlushArgs;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            InputStream stream = getStream();
            return (((1 * 59) + (stream == null ? 43 : stream.hashCode())) * 59) + getLength();
        }

        @SuppressFBWarnings(justification = "generated code")
        public String toString() {
            return "SegmentAggregator.FlushArgs(stream=" + getStream() + ", length=" + getLength() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$OperationQueue.class */
    public static class OperationQueue {

        @GuardedBy("this")
        private final ArrayDeque<StorageOperation> queue;

        private OperationQueue() {
            this.queue = new ArrayDeque<>();
        }

        synchronized boolean add(StorageOperation storageOperation) {
            return this.queue.add(storageOperation);
        }

        synchronized StorageOperation getLast() {
            return this.queue.peekLast();
        }

        synchronized StorageOperation getFirst() {
            return this.queue.peekFirst();
        }

        synchronized StorageOperation removeFirst() {
            return this.queue.pollFirst();
        }

        synchronized int size() {
            return this.queue.size();
        }

        synchronized void clear() {
            this.queue.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$ReconciliationState.class */
    public static class ReconciliationState {
        private final SegmentProperties storageInfo;
        private final long initialStorageLength;

        ReconciliationState(SegmentMetadata segmentMetadata, SegmentProperties segmentProperties) {
            Preconditions.checkNotNull(segmentProperties, "storageInfo");
            this.storageInfo = segmentProperties;
            this.initialStorageLength = segmentMetadata.getStorageLength();
        }

        public String toString() {
            return String.format("Metadata.StorageLength = %d, Storage.Length = %d", Long.valueOf(this.initialStorageLength), Long.valueOf(this.storageInfo.getLength()));
        }

        @SuppressFBWarnings(justification = "generated code")
        public SegmentProperties getStorageInfo() {
            return this.storageInfo;
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getInitialStorageLength() {
            return this.initialStorageLength;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentAggregator(UpdateableSegmentMetadata updateableSegmentMetadata, WriterDataSource writerDataSource, Storage storage, WriterConfig writerConfig, AbstractTimer abstractTimer, Executor executor) {
        this.metadata = (UpdateableSegmentMetadata) Preconditions.checkNotNull(updateableSegmentMetadata, "segmentMetadata");
        Preconditions.checkArgument(this.metadata.getContainerId() == writerDataSource.getId(), "SegmentMetadata.ContainerId is different from WriterDataSource.Id");
        this.traceObjectId = String.format("StorageWriter[%d-%d]", Integer.valueOf(this.metadata.getContainerId()), Long.valueOf(this.metadata.getId()));
        this.config = (WriterConfig) Preconditions.checkNotNull(writerConfig, "config");
        this.storage = (Storage) Preconditions.checkNotNull(storage, "storage");
        this.dataSource = (WriterDataSource) Preconditions.checkNotNull(writerDataSource, "dataSource");
        this.timer = (AbstractTimer) Preconditions.checkNotNull(abstractTimer, "timer");
        this.executor = (Executor) Preconditions.checkNotNull(executor, "executor");
        this.lastFlush = new AtomicReference<>(abstractTimer.getElapsed());
        this.lastAddedOffset = new AtomicLong(-1L);
        this.mergeTransactionCount = new AtomicInteger();
        this.truncateCount = new AtomicInteger();
        this.hasSealPending = new AtomicBoolean();
        this.hasDeletePending = new AtomicBoolean();
        this.operations = new OperationQueue();
        this.state = new AtomicReference<>(AggregatorState.NotInitialized);
        this.reconciliationState = new AtomicReference<>();
        this.handle = new AtomicReference<>();
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor, java.lang.AutoCloseable
    public void close() {
        if (isClosed()) {
            return;
        }
        setState(AggregatorState.Closed);
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public long getLowestUncommittedSequenceNumber() {
        StorageOperation first = this.operations.getFirst();
        if (first == null) {
            return Long.MIN_VALUE;
        }
        return first.getSequenceNumber();
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public boolean isClosed() {
        return this.state.get() == AggregatorState.Closed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentMetadata getMetadata() {
        return this.metadata;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Duration getElapsedSinceLastFlush() {
        return this.timer.getElapsed().minus(this.lastFlush.get());
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public boolean mustFlush() {
        if (this.metadata.isDeletedInStorage()) {
            return false;
        }
        return exceedsThresholds() || this.hasDeletePending.get() || this.hasSealPending.get() || this.mergeTransactionCount.get() > 0 || this.truncateCount.get() > 0 || (this.operations.size() > 0 && isReconciling());
    }

    private boolean exceedsThresholds() {
        long length = this.operations.size() > 0 && isAppendOperation(this.operations.getFirst()) ? this.operations.getFirst().getLength() : 0L;
        return length >= ((long) this.config.getFlushThresholdBytes()) || (length > 0 && getElapsedSinceLastFlush().compareTo(this.config.getFlushThresholdTime()) >= 0);
    }

    private boolean isReconciling() {
        AggregatorState aggregatorState = this.state.get();
        return aggregatorState == AggregatorState.ReconciliationNeeded || aggregatorState == AggregatorState.Reconciling;
    }

    public String toString() {
        return String.format("[%d: %s] Count = %d, LastOffset = %s, LUSN = %d, LastFlush = %ds", Long.valueOf(this.metadata.getId()), this.metadata.getName(), Integer.valueOf(this.operations.size()), this.lastAddedOffset, Long.valueOf(getLowestUncommittedSequenceNumber()), Long.valueOf(getElapsedSinceLastFlush().toMillis() / 1000));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> initialize(Duration duration) {
        Exceptions.checkNotClosed(isClosed(), this);
        Preconditions.checkState(this.state.get() == AggregatorState.NotInitialized, "SegmentAggregator has already been initialized.");
        if (!$assertionsDisabled && this.handle.get() != null) {
            throw new AssertionError("non-null handle but state == " + this.state.get());
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "initialize", new Object[0]);
        return openWrite(this.metadata.getName(), this.handle, duration).thenAcceptAsync(segmentProperties -> {
            if (this.metadata.getStorageLength() != segmentProperties.getLength()) {
                if (this.metadata.getStorageLength() >= 0) {
                    log.info("{}: SegmentMetadata has a StorageLength ({}) that is different than the actual one ({}) - updating metadata.", new Object[]{this.traceObjectId, Long.valueOf(this.metadata.getStorageLength()), Long.valueOf(segmentProperties.getLength())});
                }
                this.metadata.setStorageLength(segmentProperties.getLength());
            }
            if (segmentProperties.isSealed()) {
                if (!this.metadata.isSealed()) {
                    throw new CompletionException((Throwable) new DataCorruptionException(String.format("Segment '%s' is sealed in Storage but not in the metadata.", this.metadata.getName()), new Object[0]));
                }
                if (!this.metadata.isSealedInStorage()) {
                    this.metadata.markSealedInStorage();
                    log.info("{}: Segment is sealed in Storage but metadata does not reflect that - updating metadata.", this.traceObjectId);
                }
            }
            log.info("{}: Initialized. StorageLength = {}, Sealed = {}.", new Object[]{this.traceObjectId, Long.valueOf(segmentProperties.getLength()), Boolean.valueOf(segmentProperties.isSealed())});
            LoggerHelpers.traceLeave(log, this.traceObjectId, "initialize", traceEnterWithContext, new Object[0]);
            setState(AggregatorState.Writing);
        }, this.executor).exceptionally(th -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if (!(unwrap instanceof StreamSegmentNotExistsException)) {
                throw new CompletionException(unwrap);
            }
            if (this.metadata.getStorageLength() != 0 || this.metadata.isDeletedInStorage()) {
                updateMetadataPostDeletion(this.metadata);
                log.info("{}: Segment '{}' does not exist in Storage. Ignoring all further operations on it.", this.traceObjectId, this.metadata.getName());
            } else {
                this.handle.set(null);
                log.info("{}: Initialized. Segment does not exist in Storage but Metadata indicates it should be empty.", this.traceObjectId);
                if (this.metadata.isSealed() && this.metadata.getLength() == 0) {
                    this.metadata.markSealedInStorage();
                    log.info("{}: Segment does not exist in Storage, but Metadata indicates it is empty and sealed - marking as sealed in storage.", this.traceObjectId);
                }
            }
            setState(AggregatorState.Writing);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "initialize", traceEnterWithContext, new Object[0]);
            return null;
        });
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public void add(SegmentOperation segmentOperation) throws DataCorruptionException {
        ensureInitializedAndNotClosed();
        if (segmentOperation instanceof StorageOperation) {
            StorageOperation storageOperation = (StorageOperation) segmentOperation;
            checkValidOperation(storageOperation);
            if (isDeleteOperation(storageOperation)) {
                addDeleteOperation((DeleteSegmentOperation) storageOperation);
                log.debug("{}: Add {}.", this.traceObjectId, storageOperation);
            } else {
                if (this.metadata.isDeleted()) {
                    return;
                }
                addStorageOperation(storageOperation);
                log.debug("{}: Add {}; OpCount={}, MergeCount={}, Seal={}.", new Object[]{this.traceObjectId, storageOperation, Integer.valueOf(this.operations.size()), this.mergeTransactionCount, this.hasSealPending});
            }
        }
    }

    private void addDeleteOperation(DeleteSegmentOperation deleteSegmentOperation) {
        this.operations.add(deleteSegmentOperation);
        this.hasDeletePending.set(true);
    }

    private void addStorageOperation(StorageOperation storageOperation) throws DataCorruptionException {
        checkValidStorageOperation(storageOperation);
        long lastStreamSegmentOffset = storageOperation.getLastStreamSegmentOffset();
        boolean isTruncateOperation = isTruncateOperation(storageOperation);
        if (lastStreamSegmentOffset > this.metadata.getStorageLength() || isTruncateOperation || (!this.metadata.isSealedInStorage() && (storageOperation instanceof StreamSegmentSealOperation)) || ((storageOperation instanceof MergeSegmentOperation) && storageOperation.getLength() == 0 && lastStreamSegmentOffset == this.metadata.getStorageLength())) {
            processNewOperation(storageOperation);
        } else {
            acknowledgeAlreadyProcessedOperation(storageOperation);
        }
        if (isTruncateOperation) {
            return;
        }
        this.lastAddedOffset.set(lastStreamSegmentOffset);
    }

    private void processNewOperation(StorageOperation storageOperation) {
        if (storageOperation instanceof MergeSegmentOperation) {
            this.operations.add(storageOperation);
            this.mergeTransactionCount.incrementAndGet();
            return;
        }
        if (storageOperation instanceof StreamSegmentSealOperation) {
            this.operations.add(storageOperation);
            this.hasSealPending.set(true);
        } else if (storageOperation instanceof StreamSegmentTruncateOperation) {
            this.operations.add(storageOperation);
            this.truncateCount.incrementAndGet();
        } else if (storageOperation instanceof CachedStreamSegmentAppendOperation) {
            aggregateAppendOperation((CachedStreamSegmentAppendOperation) storageOperation, getOrCreateAggregatedAppend(storageOperation.getStreamSegmentOffset(), storageOperation.getSequenceNumber()));
        }
    }

    private void acknowledgeAlreadyProcessedOperation(SegmentOperation segmentOperation) throws DataCorruptionException {
        if (segmentOperation instanceof MergeSegmentOperation) {
            MergeSegmentOperation mergeSegmentOperation = (MergeSegmentOperation) segmentOperation;
            try {
                updateMetadataForTransactionPostMerger(this.dataSource.getStreamSegmentMetadata(mergeSegmentOperation.getSourceSegmentId()), mergeSegmentOperation.getStreamSegmentId());
            } catch (Throwable th) {
                throw new DataCorruptionException(String.format("Unable to acknowledge already processed operation '%s'.", segmentOperation), th, new Object[0]);
            }
        }
    }

    private void aggregateAppendOperation(CachedStreamSegmentAppendOperation cachedStreamSegmentAppendOperation, AggregatedAppendOperation aggregatedAppendOperation) {
        long length = cachedStreamSegmentAppendOperation.getLength();
        if (cachedStreamSegmentAppendOperation.getStreamSegmentOffset() < aggregatedAppendOperation.getLastStreamSegmentOffset()) {
            long lastStreamSegmentOffset = aggregatedAppendOperation.getLastStreamSegmentOffset() - cachedStreamSegmentAppendOperation.getStreamSegmentOffset();
            length -= lastStreamSegmentOffset;
            log.debug("{}: Skipping {} bytes from the beginning of '{}' since it has already been partially written to Storage.", new Object[]{this.traceObjectId, Long.valueOf(lastStreamSegmentOffset), cachedStreamSegmentAppendOperation});
        }
        while (length > 0) {
            int min = (int) Math.min(this.config.getMaxFlushSizeBytes() - aggregatedAppendOperation.getLength(), length);
            aggregatedAppendOperation.increaseLength(min);
            length -= min;
            if (length > 0) {
                aggregatedAppendOperation = new AggregatedAppendOperation(this.metadata.getId(), aggregatedAppendOperation.getLastStreamSegmentOffset(), cachedStreamSegmentAppendOperation.getSequenceNumber());
                this.operations.add(aggregatedAppendOperation);
            }
        }
    }

    private AggregatedAppendOperation getOrCreateAggregatedAppend(long j, long j2) {
        AggregatedAppendOperation aggregatedAppendOperation = null;
        if (this.operations.size() > 0) {
            StorageOperation last = this.operations.getLast();
            if (last.getLength() < this.config.getMaxFlushSizeBytes() && isAppendOperation(last)) {
                aggregatedAppendOperation = (AggregatedAppendOperation) last;
                if (aggregatedAppendOperation.isSealed()) {
                    aggregatedAppendOperation = null;
                }
            }
        }
        if (aggregatedAppendOperation == null) {
            aggregatedAppendOperation = new AggregatedAppendOperation(this.metadata.getId(), Math.max(j, this.metadata.getStorageLength()), j2);
            this.operations.add(aggregatedAppendOperation);
        }
        return aggregatedAppendOperation;
    }

    @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
    public CompletableFuture<WriterFlushResult> flush(Duration duration) {
        CompletableFuture<WriterFlushResult> failedFuture;
        ensureInitializedAndNotClosed();
        if (this.metadata.isDeletedInStorage()) {
            return CompletableFuture.completedFuture(new WriterFlushResult());
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flush", new Object[0]);
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        try {
            switch (this.state.get()) {
                case Writing:
                    failedFuture = flushNormally(timeoutTimer);
                    break;
                case ReconciliationNeeded:
                    failedFuture = beginReconciliation(timeoutTimer).thenComposeAsync(r5 -> {
                        return reconcile(timeoutTimer);
                    }, this.executor);
                    break;
                case Reconciling:
                    failedFuture = reconcile(timeoutTimer);
                    break;
                default:
                    failedFuture = Futures.failedFuture(new IllegalStateException(String.format("Unexpected state for SegmentAggregator (%s) for segment '%s'.", this.state, this.metadata.getName())));
                    break;
            }
        } catch (Exception e) {
            failedFuture = Futures.failedFuture(e);
        }
        return failedFuture.thenApply(writerFlushResult -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flush", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    private CompletableFuture<WriterFlushResult> flushNormally(TimeoutTimer timeoutTimer) {
        if (!$assertionsDisabled && this.state.get() != AggregatorState.Writing) {
            throw new AssertionError("flushNormally cannot be called if state == " + this.state);
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushNormally", new Object[]{Integer.valueOf(this.operations.size())});
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        atomicBoolean.getClass();
        return Futures.loop(atomicBoolean::get, () -> {
            return flushOnce(timeoutTimer);
        }, writerFlushResult2 -> {
            atomicBoolean.set(writerFlushResult2.getFlushedBytes() + writerFlushResult2.getMergedBytes() > 0);
            writerFlushResult.withFlushResult(writerFlushResult2);
        }, this.executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushNormally", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v6, types: [java.util.concurrent.CompletableFuture<io.pravega.segmentstore.server.WriterFlushResult>, java.util.function.Function] */
    /* JADX WARN: Type inference failed for: r1v9, types: [java.util.concurrent.CompletableFuture<io.pravega.segmentstore.server.WriterFlushResult>, java.util.function.Function] */
    private CompletableFuture<WriterFlushResult> flushOnce(TimeoutTimer timeoutTimer) {
        CompletableFuture flushFully;
        boolean z = this.hasDeletePending.get();
        boolean z2 = this.mergeTransactionCount.get() > 0;
        boolean z3 = this.hasSealPending.get();
        boolean z4 = this.truncateCount.get() > 0;
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushOnce", new Object[]{Integer.valueOf(this.operations.size()), this.mergeTransactionCount, Boolean.valueOf(z3), Boolean.valueOf(z4), Boolean.valueOf(z)});
        if (z) {
            flushFully = deleteSegment(timeoutTimer);
        } else if (z3 || z2 || z4) {
            flushFully = flushFully(timeoutTimer);
            if (z2) {
                flushFully = flushFully.thenComposeAsync((Function) writerFlushResult -> {
                    return mergeIfNecessary(writerFlushResult, timeoutTimer);
                }, this.executor);
            }
            if (z3) {
                flushFully = flushFully.thenComposeAsync(writerFlushResult2 -> {
                    return sealIfNecessary(writerFlushResult2, timeoutTimer);
                }, this.executor);
            }
        } else {
            flushFully = flushExcess(timeoutTimer);
        }
        if (log.isTraceEnabled()) {
            flushFully = flushFully.thenApply((Function) writerFlushResult3 -> {
                LoggerHelpers.traceLeave(log, this.traceObjectId, "flushOnce", traceEnterWithContext, new Object[]{writerFlushResult3});
                return writerFlushResult3;
            });
        }
        return flushFully;
    }

    private CompletableFuture<WriterFlushResult> flushFully(TimeoutTimer timeoutTimer) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushFully", new Object[0]);
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        Supplier supplier = this::canContinueFlushingFully;
        Supplier supplier2 = () -> {
            return flushPendingAppends(timeoutTimer.getRemaining()).thenCompose(writerFlushResult2 -> {
                return flushPendingTruncate(writerFlushResult2, timeoutTimer.getRemaining());
            });
        };
        writerFlushResult.getClass();
        return Futures.loop(supplier, supplier2, writerFlushResult::withFlushResult, this.executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushFully", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    private boolean canContinueFlushingFully() {
        StorageOperation first = this.operations.getFirst();
        return isAppendOperation(first) || isTruncateOperation(first);
    }

    private CompletableFuture<WriterFlushResult> flushExcess(TimeoutTimer timeoutTimer) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushExcess", new Object[0]);
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        Supplier supplier = this::canContinueFlushingExcess;
        Supplier supplier2 = () -> {
            return flushPendingAppends(timeoutTimer.getRemaining()).thenCompose(writerFlushResult2 -> {
                return flushPendingTruncate(writerFlushResult2, timeoutTimer.getRemaining());
            });
        };
        writerFlushResult.getClass();
        return Futures.loop(supplier, supplier2, writerFlushResult::withFlushResult, this.executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushExcess", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    private boolean canContinueFlushingExcess() {
        return (!this.metadata.isDeleted() && exceedsThresholds()) || isTruncateOperation(this.operations.getFirst());
    }

    private CompletableFuture<WriterFlushResult> flushPendingTruncate(WriterFlushResult writerFlushResult, Duration duration) {
        CompletableFuture truncate;
        StorageOperation first = this.operations.getFirst();
        if (!isTruncateOperation(first) || !this.storage.supportsTruncation()) {
            return CompletableFuture.completedFuture(writerFlushResult);
        }
        if (this.handle.get() != null) {
            truncate = this.storage.truncate(this.handle.get(), Math.min(this.metadata.getStorageLength(), first.getStreamSegmentOffset()), duration);
        } else {
            if (!$assertionsDisabled && this.metadata.getStorageLength() != 0) {
                throw new AssertionError("handle is null but Metadata.getStorageLength is non-zero");
            }
            truncate = CompletableFuture.completedFuture(null);
        }
        return truncate.thenApplyAsync(r4 -> {
            updateStatePostTruncate();
            return writerFlushResult;
        }, this.executor);
    }

    private CompletableFuture<WriterFlushResult> flushPendingAppends(Duration duration) {
        try {
            FlushArgs flushArgs = getFlushArgs();
            long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushPendingAppends", new Object[0]);
            TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
            return (flushArgs.getLength() == 0 ? CompletableFuture.completedFuture(null) : createSegmentIfNecessary(() -> {
                return this.storage.write(this.handle.get(), this.metadata.getStorageLength(), flushArgs.getStream(), flushArgs.getLength(), timeoutTimer.getRemaining());
            }, timeoutTimer.getRemaining())).thenApplyAsync(r14 -> {
                WriterFlushResult updateStatePostFlush = updateStatePostFlush(flushArgs);
                LoggerHelpers.traceLeave(log, this.traceObjectId, "flushPendingAppends", traceEnterWithContext, new Object[]{updateStatePostFlush});
                return updateStatePostFlush;
            }, this.executor).exceptionally((Function<Throwable, ? extends U>) th -> {
                if (Exceptions.unwrap(th) instanceof BadOffsetException) {
                    setState(AggregatorState.ReconciliationNeeded);
                }
                throw new CompletionException(th);
            });
        } catch (DataCorruptionException e) {
            return Futures.failedFuture(e);
        }
    }

    private FlushArgs getFlushArgs() throws DataCorruptionException {
        StorageOperation first = this.operations.getFirst();
        if (!(first instanceof AggregatedAppendOperation)) {
            return new FlushArgs(null, 0);
        }
        AggregatedAppendOperation aggregatedAppendOperation = (AggregatedAppendOperation) first;
        int length = (int) aggregatedAppendOperation.getLength();
        InputStream inputStream = null;
        if (length > 0) {
            inputStream = this.dataSource.getAppendData(aggregatedAppendOperation.getStreamSegmentId(), aggregatedAppendOperation.getStreamSegmentOffset(), length);
            if (inputStream == null) {
                if (this.metadata.isDeleted()) {
                    return new FlushArgs(null, 0);
                }
                throw new DataCorruptionException(String.format("Unable to retrieve CacheContents for '%s'.", aggregatedAppendOperation), new Object[0]);
            }
        }
        aggregatedAppendOperation.seal();
        return new FlushArgs(inputStream, length);
    }

    private CompletableFuture<WriterFlushResult> mergeIfNecessary(WriterFlushResult writerFlushResult, TimeoutTimer timeoutTimer) {
        ensureInitializedAndNotClosed();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "mergeIfNecessary", new Object[0]);
        StorageOperation first = this.operations.getFirst();
        if (first == null || !(first instanceof MergeSegmentOperation)) {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeIfNecessary", traceEnterWithContext, new Object[]{writerFlushResult});
            return CompletableFuture.completedFuture(writerFlushResult);
        }
        MergeSegmentOperation mergeSegmentOperation = (MergeSegmentOperation) first;
        return mergeWith(this.dataSource.getStreamSegmentMetadata(mergeSegmentOperation.getSourceSegmentId()), mergeSegmentOperation, timeoutTimer).thenApply(writerFlushResult2 -> {
            writerFlushResult.withFlushResult(writerFlushResult2);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeIfNecessary", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    private CompletableFuture<WriterFlushResult> mergeWith(UpdateableSegmentMetadata updateableSegmentMetadata, MergeSegmentOperation mergeSegmentOperation, TimeoutTimer timeoutTimer) {
        CompletableFuture<SegmentProperties> mergeInStorage;
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "mergeWith", new Object[]{Long.valueOf(updateableSegmentMetadata.getId()), updateableSegmentMetadata.getName(), Boolean.valueOf(updateableSegmentMetadata.isSealedInStorage())});
        boolean z = updateableSegmentMetadata.getLength() == 0;
        if (updateableSegmentMetadata.isDeleted() && !z) {
            setState(AggregatorState.ReconciliationNeeded);
            return Futures.failedFuture(new StreamSegmentNotExistsException(updateableSegmentMetadata.getName()));
        }
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        if (z) {
            log.warn("{}: Not applying '{}' because source segment is missing or empty.", this.traceObjectId, mergeSegmentOperation);
            mergeInStorage = CompletableFuture.completedFuture(this.metadata);
        } else {
            if (!updateableSegmentMetadata.isSealedInStorage() || updateableSegmentMetadata.getLength() > updateableSegmentMetadata.getStorageLength()) {
                LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeWith", traceEnterWithContext, new Object[]{writerFlushResult});
                return CompletableFuture.completedFuture(writerFlushResult);
            }
            mergeInStorage = mergeInStorage(updateableSegmentMetadata, mergeSegmentOperation, timeoutTimer);
        }
        return mergeInStorage.thenAcceptAsync(segmentProperties -> {
            mergeCompleted(segmentProperties, updateableSegmentMetadata, mergeSegmentOperation);
        }, this.executor).thenComposeAsync(r7 -> {
            return this.dataSource.deleteAllAttributes(updateableSegmentMetadata, timeoutTimer.getRemaining());
        }, this.executor).thenApply((Function<? super U, ? extends U>) r15 -> {
            this.lastFlush.set(this.timer.getElapsed());
            writerFlushResult.withMergedBytes(mergeSegmentOperation.getLength());
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeWith", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        }).exceptionally(th -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if ((unwrap instanceof BadOffsetException) || (unwrap instanceof StreamSegmentNotExistsException)) {
                setState(AggregatorState.ReconciliationNeeded);
            }
            throw new CompletionException(th);
        });
    }

    private CompletableFuture<SegmentProperties> mergeInStorage(SegmentMetadata segmentMetadata, MergeSegmentOperation mergeSegmentOperation, TimeoutTimer timeoutTimer) {
        return this.storage.getStreamSegmentInfo(segmentMetadata.getName(), timeoutTimer.getRemaining()).thenAcceptAsync(segmentProperties -> {
            if (segmentProperties.getLength() != segmentMetadata.getStorageLength()) {
                throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' cannot be merged into parent '%s' because its metadata disagrees with the Storage. Metadata.StorageLength=%d, Storage.StorageLength=%d", segmentMetadata.getName(), this.metadata.getName(), Long.valueOf(segmentMetadata.getStorageLength()), Long.valueOf(segmentProperties.getLength())), new Object[0]));
            }
            if (segmentProperties.getLength() != mergeSegmentOperation.getLength()) {
                throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' cannot be merged into parent '%s' because the declared length in the operation disagrees with the Storage. Operation.Length=%d, Storage.StorageLength=%d", segmentMetadata.getName(), this.metadata.getName(), Long.valueOf(mergeSegmentOperation.getLength()), Long.valueOf(segmentProperties.getLength())), new Object[0]));
            }
        }, this.executor).thenComposeAsync(r10 -> {
            return createSegmentIfNecessary(() -> {
                return this.storage.concat(this.handle.get(), mergeSegmentOperation.getStreamSegmentOffset(), segmentMetadata.getName(), timeoutTimer.getRemaining());
            }, timeoutTimer.getRemaining());
        }, this.executor).exceptionally((Function<Throwable, ? extends U>) th -> {
            StreamSegmentNotExistsException unwrap = Exceptions.unwrap(th);
            if (segmentMetadata.getLength() != 0 || !(unwrap instanceof StreamSegmentNotExistsException) || !unwrap.getStreamSegmentName().equals(segmentMetadata.getName())) {
                throw new CompletionException((Throwable) unwrap);
            }
            log.warn("{}: Not applying '{}' because source segment is missing (storage) and had no data.", this.traceObjectId, mergeSegmentOperation);
            return null;
        }).thenComposeAsync(r6 -> {
            return this.storage.getStreamSegmentInfo(this.metadata.getName(), timeoutTimer.getRemaining());
        }, this.executor);
    }

    private void mergeCompleted(SegmentProperties segmentProperties, UpdateableSegmentMetadata updateableSegmentMetadata, MergeSegmentOperation mergeSegmentOperation) {
        StorageOperation removeFirst = this.operations.removeFirst();
        if (!$assertionsDisabled && (removeFirst == null || !(removeFirst instanceof MergeSegmentOperation))) {
            throw new AssertionError("First outstanding operation was not a MergeSegmentOperation");
        }
        MergeSegmentOperation mergeSegmentOperation2 = (MergeSegmentOperation) removeFirst;
        if (!$assertionsDisabled && mergeSegmentOperation2.getSourceSegmentId() != updateableSegmentMetadata.getId()) {
            throw new AssertionError("First outstanding operation was a MergeSegmentOperation for the wrong Transaction id.");
        }
        int decrementAndGet = this.mergeTransactionCount.decrementAndGet();
        if (!$assertionsDisabled && decrementAndGet < 0) {
            throw new AssertionError("Negative value for mergeTransactionCount");
        }
        long storageLength = this.metadata.getStorageLength() + mergeSegmentOperation.getLength();
        if (segmentProperties.getLength() != storageLength) {
            throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' was merged into parent '%s' but the parent segment has an unexpected StorageLength after the merger. Previous=%d, MergeLength=%d, Expected=%d, Actual=%d", updateableSegmentMetadata.getName(), this.metadata.getName(), Long.valueOf(segmentProperties.getLength()), Long.valueOf(mergeSegmentOperation.getLength()), Long.valueOf(storageLength), Long.valueOf(segmentProperties.getLength())), new Object[0]));
        }
        updateMetadata(segmentProperties);
        updateMetadataForTransactionPostMerger(updateableSegmentMetadata, mergeSegmentOperation2.getStreamSegmentId());
    }

    private CompletableFuture<WriterFlushResult> sealIfNecessary(WriterFlushResult writerFlushResult, TimeoutTimer timeoutTimer) {
        CompletableFuture seal;
        if (!this.hasSealPending.get() || !(this.operations.getFirst() instanceof StreamSegmentSealOperation)) {
            return CompletableFuture.completedFuture(writerFlushResult);
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "sealIfNecessary", new Object[0]);
        if (this.handle.get() != null) {
            seal = this.storage.seal(this.handle.get(), timeoutTimer.getRemaining());
        } else {
            if (!$assertionsDisabled && this.metadata.getStorageLength() != 0) {
                throw new AssertionError("handle is null but Metadata.StorageLength is non-zero");
            }
            seal = CompletableFuture.completedFuture(null);
        }
        return seal.handleAsync((r14, th) -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap == null || (unwrap instanceof StreamSegmentSealedException)) {
                updateStatePostSeal();
                LoggerHelpers.traceLeave(log, this.traceObjectId, "sealIfNecessary", traceEnterWithContext, new Object[]{writerFlushResult});
                return writerFlushResult;
            }
            if (unwrap instanceof StreamSegmentNotExistsException) {
                setState(AggregatorState.ReconciliationNeeded);
            }
            throw new CompletionException(unwrap);
        }, this.executor);
    }

    private CompletableFuture<WriterFlushResult> deleteSegment(TimeoutTimer timeoutTimer) {
        return (this.handle.get() == null ? this.dataSource.deleteAllAttributes(this.metadata, timeoutTimer.getRemaining()) : deleteSegmentAndAttributes(this.handle.get(), this.metadata, timeoutTimer)).thenComposeAsync(r5 -> {
            return deleteUnmergedSourceSegments(timeoutTimer);
        }, this.executor).thenApplyAsync((Function<? super U, ? extends U>) r4 -> {
            updateMetadataPostDeletion(this.metadata);
            this.hasSealPending.set(false);
            this.hasDeletePending.set(false);
            this.truncateCount.set(0);
            this.mergeTransactionCount.set(0);
            this.operations.clear();
            return new WriterFlushResult();
        }, this.executor);
    }

    private CompletableFuture<Void> deleteSegmentAndAttributes(SegmentHandle segmentHandle, SegmentMetadata segmentMetadata, TimeoutTimer timeoutTimer) {
        if ($assertionsDisabled || segmentHandle.getSegmentName().equals(segmentMetadata.getName())) {
            return CompletableFuture.allOf(Futures.exceptionallyExpecting(this.storage.delete(segmentHandle, timeoutTimer.getRemaining()), th -> {
                return th instanceof StreamSegmentNotExistsException;
            }, (Object) null), this.dataSource.deleteAllAttributes(segmentMetadata, timeoutTimer.getRemaining()));
        }
        throw new AssertionError();
    }

    /* JADX WARN: Code restructure failed: missing block: B:28:0x00ac, code lost:
    
        return io.pravega.common.concurrent.Futures.allOf(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private java.util.concurrent.CompletableFuture<java.lang.Void> deleteUnmergedSourceSegments(io.pravega.common.TimeoutTimer r7) {
        /*
            r6 = this;
            r0 = r6
            java.util.concurrent.atomic.AtomicInteger r0 = r0.mergeTransactionCount
            int r0 = r0.get()
            if (r0 != 0) goto Lf
            r0 = 0
            java.util.concurrent.CompletableFuture r0 = java.util.concurrent.CompletableFuture.completedFuture(r0)
            return r0
        Lf:
            java.util.ArrayList r0 = new java.util.ArrayList
            r1 = r0
            r1.<init>()
            r8 = r0
        L17:
            r0 = r6
            io.pravega.segmentstore.server.writer.SegmentAggregator$OperationQueue r0 = r0.operations
            io.pravega.segmentstore.server.logs.operations.StorageOperation r0 = r0.getFirst()
            r1 = r0
            r9 = r1
            if (r0 == 0) goto La8
            r0 = r6
            r1 = r9
            boolean r0 = r0.isDeleteOperation(r1)
            if (r0 != 0) goto La8
            r0 = r9
            boolean r0 = r0 instanceof io.pravega.segmentstore.server.logs.operations.MergeSegmentOperation
            if (r0 == 0) goto L17
            r0 = r6
            io.pravega.segmentstore.server.writer.WriterDataSource r0 = r0.dataSource
            r1 = r9
            io.pravega.segmentstore.server.logs.operations.MergeSegmentOperation r1 = (io.pravega.segmentstore.server.logs.operations.MergeSegmentOperation) r1
            long r1 = r1.getSourceSegmentId()
            io.pravega.segmentstore.server.UpdateableSegmentMetadata r0 = r0.getStreamSegmentMetadata(r1)
            r10 = r0
            r0 = r8
            r1 = r6
            io.pravega.segmentstore.storage.Storage r1 = r1.storage
            r2 = r10
            java.lang.String r2 = r2.getName()
            java.util.concurrent.CompletableFuture r1 = r1.openWrite(r2)
            r2 = r6
            r3 = r10
            r4 = r7
            java.util.concurrent.CompletableFuture<java.lang.Void> r2 = (v3) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                return r2.lambda$deleteUnmergedSourceSegments$34(r3, r4, v3);
            }
            java.util.concurrent.CompletableFuture r1 = r1.thenCompose(r2)
            r2 = r6
            r3 = r10
            java.util.concurrent.CompletableFuture<java.lang.Void> r2 = (v2) -> { // java.util.function.Consumer.accept(java.lang.Object):void
                r2.lambda$deleteUnmergedSourceSegments$35(r3, v2);
            }
            r3 = r6
            java.util.concurrent.Executor r3 = r3.executor
            java.util.concurrent.CompletableFuture r1 = r1.thenAcceptAsync(r2, r3)
            java.util.concurrent.CompletableFuture<java.lang.Void> r2 = (v0) -> { // java.util.function.Predicate.test(java.lang.Object):boolean
                return lambda$deleteUnmergedSourceSegments$36(v0);
            }
            r3 = 0
            java.util.concurrent.CompletableFuture r1 = io.pravega.common.concurrent.Futures.exceptionallyExpecting(r1, r2, r3)
            boolean r0 = r0.add(r1)
            r0 = r6
            io.pravega.segmentstore.server.writer.SegmentAggregator$OperationQueue r0 = r0.operations
            io.pravega.segmentstore.server.logs.operations.StorageOperation r0 = r0.removeFirst()
            r0 = r6
            java.util.concurrent.atomic.AtomicInteger r0 = r0.mergeTransactionCount
            int r0 = r0.decrementAndGet()
            boolean r0 = io.pravega.segmentstore.server.writer.SegmentAggregator.$assertionsDisabled
            if (r0 != 0) goto La5
            r0 = r6
            java.util.concurrent.atomic.AtomicInteger r0 = r0.mergeTransactionCount
            int r0 = r0.get()
            if (r0 >= 0) goto La5
            java.lang.AssertionError r0 = new java.lang.AssertionError
            r1 = r0
            r1.<init>()
            throw r0
        La5:
            goto L17
        La8:
            r0 = r8
            java.util.concurrent.CompletableFuture r0 = io.pravega.common.concurrent.Futures.allOf(r0)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: io.pravega.segmentstore.server.writer.SegmentAggregator.deleteUnmergedSourceSegments(io.pravega.common.TimeoutTimer):java.util.concurrent.CompletableFuture");
    }

    private CompletableFuture<Void> createSegmentIfNecessary(Supplier<CompletableFuture<Void>> supplier, Duration duration) {
        if (this.handle.get() != null) {
            return supplier.get();
        }
        if ($assertionsDisabled || this.metadata.getStorageLength() == 0) {
            return Futures.exceptionallyComposeExpecting(this.storage.create(this.metadata.getName(), new SegmentRollingPolicy(getRolloverSize()), duration), th -> {
                return th instanceof StreamSegmentExistsException;
            }, () -> {
                log.info("{}: Segment did not exist in Storage when initialize() was called, but does now.", this.traceObjectId);
                return this.storage.openWrite(this.metadata.getName());
            }).thenComposeAsync(segmentHandle -> {
                this.handle.set(segmentHandle);
                return (CompletionStage) supplier.get();
            }, this.executor);
        }
        throw new AssertionError("no handle yet but metadata indicates Storage Segment not empty");
    }

    private long getRolloverSize() {
        return Math.min(this.metadata.getAttributes().getOrDefault(Attributes.ROLLOVER_SIZE, Long.valueOf(SegmentRollingPolicy.NO_ROLLING.getMaxLength())).longValue(), this.config.getMaxRolloverSize());
    }

    private CompletableFuture<Void> beginReconciliation(TimeoutTimer timeoutTimer) {
        if ($assertionsDisabled || this.state.get() == AggregatorState.ReconciliationNeeded) {
            return this.storage.getStreamSegmentInfo(this.metadata.getName(), timeoutTimer.getRemaining()).thenAcceptAsync(segmentProperties -> {
                if (segmentProperties.getLength() > this.metadata.getLength()) {
                    throw new CompletionException((Throwable) new ReconciliationFailureException("Actual Segment length in Storage is larger than the Metadata Length.", this.metadata, segmentProperties));
                }
                if (segmentProperties.getLength() < this.metadata.getStorageLength()) {
                    throw new CompletionException((Throwable) new ReconciliationFailureException("Actual Segment length in Storage is smaller than the Metadata StorageLength.", this.metadata, segmentProperties));
                }
                if (segmentProperties.getLength() == this.metadata.getStorageLength() && segmentProperties.isSealed() == this.metadata.isSealedInStorage()) {
                    setState(AggregatorState.Writing);
                } else {
                    this.reconciliationState.set(new ReconciliationState(this.metadata, segmentProperties));
                    setState(AggregatorState.Reconciling);
                }
            }, this.executor).exceptionally(th -> {
                Throwable unwrap = Exceptions.unwrap(th);
                if (!(unwrap instanceof StreamSegmentNotExistsException)) {
                    throw new CompletionException(unwrap);
                }
                if (this.metadata.isMerged() || this.metadata.isDeleted()) {
                    updateMetadataPostDeletion(this.metadata);
                    log.info("{}: Segment '{}' does not exist in Storage (reconciliation). Ignoring all further operations on it.", this.traceObjectId, this.metadata.getName());
                    this.reconciliationState.set(null);
                    setState(AggregatorState.Reconciling);
                    return null;
                }
                if (this.metadata.getStorageLength() > 0) {
                    throw new CompletionException((Throwable) new ReconciliationFailureException("Segment does not exist in Storage, but Metadata StorageLength is non-zero.", this.metadata, StreamSegmentInformation.builder().name(this.metadata.getName()).deleted(true).build()));
                }
                this.reconciliationState.set(new ReconciliationState(this.metadata, StreamSegmentInformation.builder().name(this.metadata.getName()).build()));
                setState(AggregatorState.Reconciling);
                return null;
            });
        }
        throw new AssertionError("beginReconciliation cannot be called if state == " + this.state);
    }

    private CompletableFuture<WriterFlushResult> reconcile(TimeoutTimer timeoutTimer) {
        ReconciliationState reconciliationState = this.reconciliationState.get();
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        if (reconciliationState == null) {
            setState(AggregatorState.Writing);
            return CompletableFuture.completedFuture(writerFlushResult);
        }
        if (this.hasDeletePending.get()) {
            setState(AggregatorState.Writing);
            return deleteSegment(timeoutTimer);
        }
        SegmentProperties storageInfo = reconciliationState.getStorageInfo();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "reconcile", new Object[]{reconciliationState});
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Supplier supplier = () -> {
            return Boolean.valueOf(this.operations.size() > 0 && !atomicBoolean.get());
        };
        Supplier supplier2 = () -> {
            StorageOperation first = this.operations.getFirst();
            return reconcileOperation(first, storageInfo, timeoutTimer).thenApply(writerFlushResult2 -> {
                if (first.getLastStreamSegmentOffset() >= storageInfo.getLength()) {
                    atomicBoolean.set(true);
                }
                log.info("{}: Reconciled {} ({}).", new Object[]{this.traceObjectId, first, writerFlushResult2});
                return writerFlushResult2;
            });
        };
        writerFlushResult.getClass();
        return Futures.loop(supplier, supplier2, writerFlushResult::withFlushResult, this.executor).thenApply(r15 -> {
            updateMetadata(storageInfo);
            this.reconciliationState.set(null);
            setState(AggregatorState.Writing);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "reconcile", traceEnterWithContext, new Object[]{writerFlushResult});
            return writerFlushResult;
        });
    }

    private CompletableFuture<WriterFlushResult> reconcileOperation(StorageOperation storageOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer) {
        CompletableFuture<WriterFlushResult> failedFuture;
        if (isAppendOperation(storageOperation)) {
            failedFuture = reconcileAppendOperation((AggregatedAppendOperation) storageOperation, segmentProperties, timeoutTimer);
        } else if (storageOperation instanceof MergeSegmentOperation) {
            failedFuture = reconcileMergeOperation((MergeSegmentOperation) storageOperation, segmentProperties, timeoutTimer);
        } else if (storageOperation instanceof StreamSegmentSealOperation) {
            failedFuture = reconcileSealOperation(segmentProperties, timeoutTimer.getRemaining());
        } else if (isTruncateOperation(storageOperation)) {
            updateStatePostTruncate();
            failedFuture = CompletableFuture.completedFuture(new WriterFlushResult());
        } else {
            failedFuture = Futures.failedFuture(new ReconciliationFailureException(String.format("Operation '%s' is not supported for reconciliation.", storageOperation), this.metadata, segmentProperties));
        }
        return failedFuture;
    }

    private CompletableFuture<WriterFlushResult> reconcileAppendOperation(AggregatedAppendOperation aggregatedAppendOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer) {
        WriterFlushResult writerFlushResult = new WriterFlushResult();
        return (aggregatedAppendOperation.getLength() > 0 ? reconcileData(aggregatedAppendOperation, segmentProperties, timeoutTimer).thenApply(num -> {
            writerFlushResult.withFlushedBytes(num.intValue());
            return Boolean.valueOf(((long) num.intValue()) >= aggregatedAppendOperation.getLength() && aggregatedAppendOperation.getLastStreamSegmentOffset() <= segmentProperties.getLength());
        }) : CompletableFuture.completedFuture(true)).thenApplyAsync(bool -> {
            if (bool.booleanValue()) {
                StorageOperation removeFirst = this.operations.removeFirst();
                if (!$assertionsDisabled && aggregatedAppendOperation != removeFirst) {
                    throw new AssertionError("Reconciled operation is not the same as removed operation");
                }
            }
            return writerFlushResult;
        });
    }

    private CompletableFuture<Integer> reconcileData(AggregatedAppendOperation aggregatedAppendOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer) {
        InputStream appendData = this.dataSource.getAppendData(aggregatedAppendOperation.getStreamSegmentId(), aggregatedAppendOperation.getStreamSegmentOffset(), (int) aggregatedAppendOperation.getLength());
        if (appendData == null) {
            return Futures.failedFuture(new ReconciliationFailureException(String.format("Unable to reconcile operation '%s' because no append data is associated with it.", aggregatedAppendOperation), this.metadata, segmentProperties));
        }
        long min = Math.min(aggregatedAppendOperation.getLastStreamSegmentOffset(), segmentProperties.getLength()) - aggregatedAppendOperation.getStreamSegmentOffset();
        if (!$assertionsDisabled && min <= 0) {
            throw new AssertionError("Append Operation to be reconciled is beyond the Segment's StorageLength (" + segmentProperties.getLength() + "): " + aggregatedAppendOperation);
        }
        byte[] bArr = new byte[(int) min];
        AtomicInteger atomicInteger = new AtomicInteger();
        return Futures.loop(() -> {
            return Boolean.valueOf(((long) atomicInteger.get()) < min);
        }, () -> {
            return this.storage.read(this.handle.get(), aggregatedAppendOperation.getStreamSegmentOffset() + atomicInteger.get(), bArr, atomicInteger.get(), ((int) min) - atomicInteger.get(), timeoutTimer.getRemaining());
        }, num -> {
            if (!$assertionsDisabled && num.intValue() <= 0) {
                throw new AssertionError(String.format("Unable to make any read progress when reconciling operation '%s' after reading %s bytes.", aggregatedAppendOperation, atomicInteger));
            }
            atomicInteger.addAndGet(num.intValue());
        }, this.executor).thenApplyAsync(r12 -> {
            verifySame(appendData, bArr, aggregatedAppendOperation, segmentProperties);
            return Integer.valueOf(atomicInteger.get());
        }, this.executor);
    }

    private void verifySame(InputStream inputStream, byte[] bArr, StorageOperation storageOperation, SegmentProperties segmentProperties) {
        for (int i = 0; i < bArr.length; i++) {
            if (((byte) inputStream.read()) != bArr[i]) {
                throw new ReconciliationFailureException(String.format("Unable to reconcile operation '%s' because of data differences at SegmentOffset %d.", storageOperation, Long.valueOf(storageOperation.getStreamSegmentOffset() + i)), this.metadata, segmentProperties);
            }
        }
    }

    private CompletableFuture<WriterFlushResult> reconcileMergeOperation(MergeSegmentOperation mergeSegmentOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer) {
        UpdateableSegmentMetadata streamSegmentMetadata = this.dataSource.getStreamSegmentMetadata(mergeSegmentOperation.getSourceSegmentId());
        return streamSegmentMetadata == null ? Futures.failedFuture(new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the source segment is missing from the metadata.", mergeSegmentOperation), this.metadata, segmentProperties)) : mergeSegmentOperation.getLastStreamSegmentOffset() > segmentProperties.getLength() ? Futures.failedFuture(new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the source segment is not fully merged into the target.", mergeSegmentOperation), this.metadata, segmentProperties)) : this.storage.exists(streamSegmentMetadata.getName(), timeoutTimer.getRemaining()).thenComposeAsync(bool -> {
            return bool.booleanValue() ? Futures.failedFuture(new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the transaction segment still exists in Storage.", mergeSegmentOperation), this.metadata, segmentProperties)) : this.dataSource.deleteAllAttributes(streamSegmentMetadata, timeoutTimer.getRemaining());
        }, this.executor).thenApplyAsync(r8 -> {
            StorageOperation removeFirst = this.operations.removeFirst();
            if (!$assertionsDisabled && (removeFirst == null || !(removeFirst instanceof MergeSegmentOperation))) {
                throw new AssertionError("First outstanding operation was not a MergeSegmentOperation");
            }
            int decrementAndGet = this.mergeTransactionCount.decrementAndGet();
            if (!$assertionsDisabled && decrementAndGet < 0) {
                throw new AssertionError("Negative value for mergeTransactionCount");
            }
            long lastStreamSegmentOffset = removeFirst.getLastStreamSegmentOffset();
            if (this.metadata.getStorageLength() < lastStreamSegmentOffset) {
                this.metadata.setStorageLength(lastStreamSegmentOffset);
            }
            updateMetadataForTransactionPostMerger(streamSegmentMetadata, removeFirst.getStreamSegmentId());
            return new WriterFlushResult().withMergedBytes(mergeSegmentOperation.getLength());
        }, this.executor);
    }

    private CompletableFuture<WriterFlushResult> reconcileSealOperation(SegmentProperties segmentProperties, Duration duration) {
        return (segmentProperties.isSealed() || segmentProperties.getLength() == 0) ? CompletableFuture.supplyAsync(() -> {
            updateStatePostSeal();
            return new WriterFlushResult();
        }, this.executor) : Futures.failedFuture(new ReconciliationFailureException("Segment was supposed to be sealed in storage but it is not.", this.metadata, segmentProperties));
    }

    private void checkValidOperation(StorageOperation storageOperation) throws DataCorruptionException {
        Preconditions.checkArgument(storageOperation.getStreamSegmentId() == this.metadata.getId(), "Operation '%s' refers to a different Segment than this one (%s).", storageOperation, this.metadata.getId());
        if (this.hasSealPending.get() && !isTruncateOperation(storageOperation) && !isDeleteOperation(storageOperation)) {
            throw new DataCorruptionException(String.format("Illegal operation for a sealed Segment; received '%s'.", storageOperation), new Object[0]);
        }
    }

    private void checkValidStorageOperation(StorageOperation storageOperation) throws DataCorruptionException {
        Preconditions.checkArgument(!(storageOperation instanceof StreamSegmentAppendOperation), "SegmentAggregator cannot process StreamSegmentAppendOperations.");
        long streamSegmentOffset = storageOperation.getStreamSegmentOffset();
        long length = storageOperation.getLength();
        Preconditions.checkArgument(streamSegmentOffset >= 0, "Operation '%s' has an invalid offset (%s).", storageOperation, storageOperation.getStreamSegmentOffset());
        Preconditions.checkArgument(length >= 0, "Operation '%s' has an invalid length (%s).", storageOperation, storageOperation.getLength());
        if (!isTruncateOperation(storageOperation)) {
            long j = this.lastAddedOffset.get();
            if (j >= 0 && streamSegmentOffset != j) {
                throw new DataCorruptionException(String.format("Wrong offset for Operation '%s'. Expected: %s, actual: %d.", storageOperation, this.lastAddedOffset, Long.valueOf(streamSegmentOffset)), new Object[0]);
            }
        } else if (this.metadata.getStartOffset() < storageOperation.getStreamSegmentOffset()) {
            throw new DataCorruptionException(String.format("StreamSegmentTruncateOperation '%s' has a truncation offset beyond the one in the Segment's Metadata. Expected: at most %d, actual: %d.", storageOperation, Long.valueOf(this.metadata.getStartOffset()), Long.valueOf(streamSegmentOffset)), new Object[0]);
        }
        if (streamSegmentOffset + length > this.metadata.getLength()) {
            throw new DataCorruptionException(String.format("Operation '%s' has at least one byte beyond its Length. Offset = %d, Length = %d, Length = %d.", storageOperation, Long.valueOf(streamSegmentOffset), Long.valueOf(length), Long.valueOf(this.metadata.getLength())), new Object[0]);
        }
        if (storageOperation instanceof StreamSegmentSealOperation) {
            if (this.metadata.getLength() != streamSegmentOffset) {
                throw new DataCorruptionException(String.format("Wrong offset for Operation '%s'. Expected: %d (Length), actual: %d.", storageOperation, Long.valueOf(this.metadata.getLength()), Long.valueOf(streamSegmentOffset)), new Object[0]);
            }
            if (!this.metadata.isSealed()) {
                throw new DataCorruptionException(String.format("Received Operation '%s' for a non-sealed segment.", storageOperation), new Object[0]);
            }
        }
    }

    private WriterFlushResult updateStatePostFlush(FlushArgs flushArgs) {
        long storageLength = this.metadata.getStorageLength();
        if (flushArgs.getLength() > 0) {
            storageLength += flushArgs.getLength();
            this.metadata.setStorageLength(storageLength);
        }
        boolean z = false;
        while (this.operations.size() > 0 && !z) {
            StorageOperation first = this.operations.getFirst();
            long lastStreamSegmentOffset = first.getLastStreamSegmentOffset();
            z = lastStreamSegmentOffset >= storageLength;
            if (!isAppendOperation(first)) {
                z = true;
            } else if (lastStreamSegmentOffset <= storageLength) {
                this.operations.removeFirst();
            }
        }
        this.lastFlush.set(this.timer.getElapsed());
        return new WriterFlushResult().withFlushedBytes(flushArgs.getLength());
    }

    private void updateStatePostSeal() {
        this.metadata.markSealedInStorage();
        this.operations.removeFirst();
        if (!$assertionsDisabled && this.operations.size() - this.truncateCount.get() != 0) {
            throw new AssertionError("there are outstanding non-truncate operations after a Seal");
        }
        this.hasSealPending.set(false);
    }

    private void updateStatePostTruncate() {
        this.operations.removeFirst();
        this.truncateCount.decrementAndGet();
    }

    private void updateMetadata(SegmentProperties segmentProperties) {
        this.metadata.setStorageLength(segmentProperties.getLength());
        if (!segmentProperties.isSealed() || this.metadata.isSealedInStorage()) {
            return;
        }
        this.metadata.markSealed();
        this.metadata.markSealedInStorage();
    }

    private void updateMetadataForTransactionPostMerger(UpdateableSegmentMetadata updateableSegmentMetadata, long j) {
        try {
            updateMetadataPostDeletion(updateableSegmentMetadata);
            this.dataSource.completeMerge(j, updateableSegmentMetadata.getId());
        } catch (StreamSegmentNotExistsException e) {
            throw e;
        }
    }

    private void updateMetadataPostDeletion(UpdateableSegmentMetadata updateableSegmentMetadata) {
        updateableSegmentMetadata.markDeleted();
        updateableSegmentMetadata.markDeletedInStorage();
    }

    private boolean isAppendOperation(StorageOperation storageOperation) {
        return storageOperation instanceof AggregatedAppendOperation;
    }

    private boolean isTruncateOperation(StorageOperation storageOperation) {
        return storageOperation instanceof StreamSegmentTruncateOperation;
    }

    private boolean isDeleteOperation(StorageOperation storageOperation) {
        return storageOperation instanceof DeleteSegmentOperation;
    }

    private void ensureInitializedAndNotClosed() {
        Exceptions.checkNotClosed(isClosed(), this);
        Preconditions.checkState(this.state.get() != AggregatorState.NotInitialized, "SegmentAggregator is not initialized. Cannot execute this operation.");
    }

    private void setState(AggregatorState aggregatorState) {
        AggregatorState aggregatorState2 = this.state.get();
        if (aggregatorState != aggregatorState2) {
            log.info("{}: State changed from {} to {}.", new Object[]{this.traceObjectId, aggregatorState2, aggregatorState});
        }
        this.state.set(aggregatorState);
    }

    private CompletableFuture<SegmentProperties> openWrite(String str, AtomicReference<SegmentHandle> atomicReference, Duration duration) {
        return this.storage.openWrite(str).thenComposeAsync(segmentHandle -> {
            atomicReference.set(segmentHandle);
            return this.storage.getStreamSegmentInfo(str, duration);
        }, this.executor);
    }

    static {
        $assertionsDisabled = !SegmentAggregator.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(SegmentAggregator.class);
    }
}
