package io.pravega.segmentstore.server.writer;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.AbstractTimer;
import io.pravega.common.ExceptionHelpers;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.logs.SerializationException;
import io.pravega.segmentstore.server.logs.operations.CachedStreamSegmentAppendOperation;
import io.pravega.segmentstore.server.logs.operations.MergeTransactionOperation;
import io.pravega.segmentstore.server.logs.operations.OperationType;
import io.pravega.segmentstore.server.logs.operations.StorageOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentAppendOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentSealOperation;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.Storage;
import java.beans.ConstructorProperties;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator.class */
public class SegmentAggregator implements OperationProcessor, AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private final UpdateableSegmentMetadata metadata;
    private final WriterConfig config;
    private final OperationQueue operations;
    private final AbstractTimer timer;
    private final String traceObjectId;
    private final Storage storage;
    private final AtomicReference<SegmentHandle> handle;
    private final WriterDataSource dataSource;
    private final AtomicInteger mergeTransactionCount;
    private final AtomicBoolean hasSealPending;
    private final AtomicLong lastAddedOffset;
    private final AtomicReference<Duration> lastFlush;
    private final AtomicReference<AggregatorState> state;
    private final AtomicReference<ReconciliationState> reconciliationState;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$AggregatedAppendOperation.class */
    public static class AggregatedAppendOperation extends StorageOperation {
        private final AtomicLong streamSegmentOffset;
        private final AtomicInteger length;
        private final AtomicBoolean sealed;

        AggregatedAppendOperation(long j, long j2, long j3) {
            super(j);
            this.streamSegmentOffset = new AtomicLong(j2);
            setSequenceNumber(j3);
            this.length = new AtomicInteger();
            this.sealed = new AtomicBoolean();
        }

        void increaseLength(int i) {
            Preconditions.checkArgument(i > 0, "amount must be a positive integer.");
            this.length.addAndGet(i);
        }

        void seal() {
            this.sealed.set(true);
        }

        boolean isSealed() {
            return this.sealed.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation
        public long getStreamSegmentOffset() {
            return this.streamSegmentOffset.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation
        public long getLength() {
            return this.length.get();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.Operation
        protected OperationType getOperationType() {
            throw new UnsupportedOperationException();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.Operation
        protected void serializeContent(DataOutputStream dataOutputStream) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.Operation
        protected void deserializeContent(DataInputStream dataInputStream) throws IOException, SerializationException {
            throw new UnsupportedOperationException();
        }

        @Override // io.pravega.segmentstore.server.logs.operations.StorageOperation, io.pravega.segmentstore.server.logs.operations.Operation
        public String toString() {
            return String.format("AggregatedAppend: SegmentId = %s, Offsets = [%s-%s), SeqNo = %s", Long.valueOf(getStreamSegmentId()), Long.valueOf(getStreamSegmentOffset()), Long.valueOf(getStreamSegmentOffset() + getLength()), Long.valueOf(getSequenceNumber()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$FlushArgs.class */
    public static class FlushArgs {
        private final InputStream stream;
        private final int length;

        public String toString() {
            return String.format("TotalSize = %d", Integer.valueOf(this.length));
        }

        @SuppressFBWarnings(justification = "generated code")
        public InputStream getStream() {
            return this.stream;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int getLength() {
            return this.length;
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"stream", "length"})
        public FlushArgs(InputStream inputStream, int i) {
            this.stream = inputStream;
            this.length = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$OperationQueue.class */
    public static class OperationQueue {

        @GuardedBy("this")
        private final ArrayDeque<StorageOperation> queue;

        private OperationQueue() {
            this.queue = new ArrayDeque<>();
        }

        synchronized boolean add(StorageOperation storageOperation) {
            return this.queue.add(storageOperation);
        }

        synchronized StorageOperation getLast() {
            return this.queue.peekLast();
        }

        synchronized StorageOperation getFirst() {
            return this.queue.peekFirst();
        }

        synchronized StorageOperation removeFirst() {
            return this.queue.pollFirst();
        }

        synchronized int size() {
            return this.queue.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/SegmentAggregator$ReconciliationState.class */
    public static class ReconciliationState {
        private final SegmentProperties storageInfo;
        private final long initialStorageLength;

        ReconciliationState(SegmentMetadata segmentMetadata, SegmentProperties segmentProperties) {
            Preconditions.checkNotNull(segmentProperties, "storageInfo");
            this.storageInfo = segmentProperties;
            this.initialStorageLength = segmentMetadata.getStorageLength();
        }

        public String toString() {
            return String.format("Metadata.StorageLength = %d, Storage.Length = %d", Long.valueOf(this.initialStorageLength), Long.valueOf(this.storageInfo.getLength()));
        }

        @SuppressFBWarnings(justification = "generated code")
        public SegmentProperties getStorageInfo() {
            return this.storageInfo;
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getInitialStorageLength() {
            return this.initialStorageLength;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentAggregator(UpdateableSegmentMetadata updateableSegmentMetadata, WriterDataSource writerDataSource, Storage storage, WriterConfig writerConfig, AbstractTimer abstractTimer) {
        Preconditions.checkNotNull(updateableSegmentMetadata, "segmentMetadata");
        Preconditions.checkNotNull(writerDataSource, "dataSource");
        Preconditions.checkNotNull(storage, "storage");
        Preconditions.checkNotNull(writerConfig, "config");
        Preconditions.checkNotNull(abstractTimer, "timer");
        this.metadata = updateableSegmentMetadata;
        Preconditions.checkArgument(this.metadata.getContainerId() == writerDataSource.getId(), "SegmentMetadata.ContainerId is different from WriterDataSource.Id");
        this.traceObjectId = String.format("StorageWriter[%d-%d]", Integer.valueOf(this.metadata.getContainerId()), Long.valueOf(this.metadata.getId()));
        this.config = writerConfig;
        this.storage = storage;
        this.dataSource = writerDataSource;
        this.timer = abstractTimer;
        this.lastFlush = new AtomicReference<>(abstractTimer.getElapsed());
        this.lastAddedOffset = new AtomicLong(-1L);
        this.mergeTransactionCount = new AtomicInteger();
        this.hasSealPending = new AtomicBoolean();
        this.operations = new OperationQueue();
        this.state = new AtomicReference<>(AggregatorState.NotInitialized);
        this.reconciliationState = new AtomicReference<>();
        this.handle = new AtomicReference<>();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (isClosed()) {
            return;
        }
        setState(AggregatorState.Closed);
    }

    @Override // io.pravega.segmentstore.server.writer.OperationProcessor
    public long getLowestUncommittedSequenceNumber() {
        StorageOperation first = this.operations.getFirst();
        if (first == null) {
            return Long.MIN_VALUE;
        }
        return first.getSequenceNumber();
    }

    @Override // io.pravega.segmentstore.server.writer.OperationProcessor
    public boolean isClosed() {
        return this.state.get() == AggregatorState.Closed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentMetadata getMetadata() {
        return this.metadata;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Duration getElapsedSinceLastFlush() {
        return this.timer.getElapsed().minus(this.lastFlush.get());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean mustFlush() {
        if (this.metadata.isDeleted()) {
            return false;
        }
        return exceedsThresholds() || this.hasSealPending.get() || this.mergeTransactionCount.get() > 0 || (this.operations.size() > 0 && isReconciling());
    }

    private boolean exceedsThresholds() {
        long length = (this.operations.size() <= 0 || !isAppendOperation(this.operations.getFirst())) ? 0L : this.operations.getFirst().getLength();
        return length >= ((long) this.config.getFlushThresholdBytes()) || (length > 0 && getElapsedSinceLastFlush().compareTo(this.config.getFlushThresholdTime()) >= 0);
    }

    private boolean isReconciling() {
        AggregatorState aggregatorState = this.state.get();
        return aggregatorState == AggregatorState.ReconciliationNeeded || aggregatorState == AggregatorState.Reconciling;
    }

    public String toString() {
        return String.format("[%d: %s] Count = %d, LastOffset = %s, LUSN = %d LastFlush = %ds", Long.valueOf(this.metadata.getId()), this.metadata.getName(), Integer.valueOf(this.operations.size()), this.lastAddedOffset, Long.valueOf(getLowestUncommittedSequenceNumber()), Long.valueOf(getElapsedSinceLastFlush().toMillis() / 1000));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> initialize(Duration duration, Executor executor) {
        Exceptions.checkNotClosed(isClosed(), this);
        Preconditions.checkState(this.state.get() == AggregatorState.NotInitialized, "SegmentAggregator has already been initialized.");
        if (!$assertionsDisabled && this.handle.get() != null) {
            throw new AssertionError("non-null handle but state == " + this.state.get());
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "initialize", new Object[0]);
        return openWrite(this.metadata.getName(), this.handle, executor, duration).thenAccept(segmentProperties -> {
            if (this.metadata.getStorageLength() != segmentProperties.getLength()) {
                if (this.metadata.getStorageLength() >= 0) {
                    log.warn("{}: SegmentMetadata has a StorageLength ({}) that is different than the actual one ({}) - updating metadata.", new Object[]{this.traceObjectId, Long.valueOf(this.metadata.getStorageLength()), Long.valueOf(segmentProperties.getLength())});
                }
                this.metadata.setStorageLength(segmentProperties.getLength());
            }
            if (segmentProperties.isSealed()) {
                if (!this.metadata.isSealed()) {
                    throw new CompletionException((Throwable) new DataCorruptionException(String.format("Segment '%s' is sealed in Storage but not in the metadata.", this.metadata.getName())));
                }
                if (!this.metadata.isSealedInStorage()) {
                    this.metadata.markSealedInStorage();
                    log.warn("{}: Segment is sealed in Storage but metadata does not reflect that - updating metadata.", this.traceObjectId, Long.valueOf(segmentProperties.getLength()));
                }
            }
            log.info("{}: Initialized. StorageLength = {}, Sealed = {}.", new Object[]{this.traceObjectId, Long.valueOf(segmentProperties.getLength()), Boolean.valueOf(segmentProperties.isSealed())});
            LoggerHelpers.traceLeave(log, this.traceObjectId, "initialize", traceEnterWithContext, new Object[0]);
            setState(AggregatorState.Writing);
        }).exceptionally(th -> {
            Throwable realException = ExceptionHelpers.getRealException(th);
            if (!(realException instanceof StreamSegmentNotExistsException)) {
                throw new CompletionException(realException);
            }
            this.metadata.markDeleted();
            log.warn("{}: Segment does not exist in Storage. Ignoring all further operations on it.", this.traceObjectId, realException);
            setState(AggregatorState.Writing);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "initialize", traceEnterWithContext, new Object[0]);
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(StorageOperation storageOperation) throws DataCorruptionException {
        ensureInitializedAndNotClosed();
        checkSegmentId(storageOperation);
        if (this.metadata.isDeleted()) {
            return;
        }
        checkValidOperation(storageOperation);
        long lastStreamSegmentOffset = storageOperation.getLastStreamSegmentOffset();
        if (lastStreamSegmentOffset > this.metadata.getStorageLength() || (!this.metadata.isSealedInStorage() && (storageOperation instanceof StreamSegmentSealOperation))) {
            processNewOperation(storageOperation);
        } else {
            acknowledgeAlreadyProcessedOperation(storageOperation);
        }
        this.lastAddedOffset.set(lastStreamSegmentOffset);
        log.debug("{}: Add {}; OpCount={}, MergeCount={}, Seal={}.", new Object[]{this.traceObjectId, storageOperation, Integer.valueOf(this.operations.size()), this.mergeTransactionCount, this.hasSealPending});
    }

    private void processNewOperation(StorageOperation storageOperation) {
        if (storageOperation instanceof MergeTransactionOperation) {
            this.operations.add(storageOperation);
            this.mergeTransactionCount.incrementAndGet();
        } else if (storageOperation instanceof StreamSegmentSealOperation) {
            this.operations.add(storageOperation);
            this.hasSealPending.set(true);
        } else if (storageOperation instanceof CachedStreamSegmentAppendOperation) {
            int maxFlushSizeBytes = this.config.getMaxFlushSizeBytes();
            aggregateAppendOperation(storageOperation, getOrCreateAggregatedAppend(storageOperation.getStreamSegmentOffset(), storageOperation.getSequenceNumber(), maxFlushSizeBytes), maxFlushSizeBytes);
        }
    }

    private void acknowledgeAlreadyProcessedOperation(StorageOperation storageOperation) {
        try {
            if (storageOperation instanceof MergeTransactionOperation) {
                MergeTransactionOperation mergeTransactionOperation = (MergeTransactionOperation) storageOperation;
                this.dataSource.completeMerge(mergeTransactionOperation.getStreamSegmentId(), mergeTransactionOperation.getTransactionSegmentId());
            }
        } catch (Exception e) {
            log.warn("Unable to acknowledge already processed operation '{}'.", storageOperation, e);
        }
    }

    private void aggregateAppendOperation(StorageOperation storageOperation, AggregatedAppendOperation aggregatedAppendOperation, int i) {
        long length = storageOperation.getLength();
        while (length > 0) {
            int min = (int) Math.min(i - aggregatedAppendOperation.getLength(), length);
            aggregatedAppendOperation.increaseLength(min);
            length -= min;
            if (length > 0) {
                if (!$assertionsDisabled && aggregatedAppendOperation.getLength() != i) {
                    throw new AssertionError("Unexpected AggregatedAppend.length when there is data to add");
                }
                aggregatedAppendOperation = new AggregatedAppendOperation(this.metadata.getId(), aggregatedAppendOperation.getLastStreamSegmentOffset(), storageOperation.getSequenceNumber());
                this.operations.add(aggregatedAppendOperation);
            }
        }
    }

    private AggregatedAppendOperation getOrCreateAggregatedAppend(long j, long j2, int i) {
        AggregatedAppendOperation aggregatedAppendOperation = null;
        if (this.operations.size() > 0) {
            StorageOperation last = this.operations.getLast();
            if (last.getLength() < i && isAppendOperation(last)) {
                aggregatedAppendOperation = (AggregatedAppendOperation) last;
                if (aggregatedAppendOperation.isSealed()) {
                    aggregatedAppendOperation = null;
                }
            }
        }
        if (aggregatedAppendOperation == null) {
            aggregatedAppendOperation = new AggregatedAppendOperation(this.metadata.getId(), j, j2);
            this.operations.add(aggregatedAppendOperation);
        }
        return aggregatedAppendOperation;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<FlushResult> flush(Duration duration, Executor executor) {
        CompletableFuture<FlushResult> failedFuture;
        ensureInitializedAndNotClosed();
        if (this.metadata.isDeleted()) {
            return CompletableFuture.completedFuture(new FlushResult());
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flush", new Object[0]);
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        try {
            switch (this.state.get()) {
                case Writing:
                    failedFuture = flushNormally(timeoutTimer, executor);
                    break;
                case ReconciliationNeeded:
                    failedFuture = beginReconciliation(timeoutTimer).thenComposeAsync(r7 -> {
                        return reconcile(timeoutTimer, executor);
                    }, executor);
                    break;
                case Reconciling:
                    failedFuture = reconcile(timeoutTimer, executor);
                    break;
                default:
                    failedFuture = FutureHelpers.failedFuture(new IllegalStateException(String.format("Unexpected state for SegmentAggregator (%s) for segment '%s'.", this.state, this.metadata.getName())));
                    break;
            }
        } catch (Exception e) {
            failedFuture = FutureHelpers.failedFuture(e);
        }
        return failedFuture.thenApply(flushResult -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flush", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<FlushResult> flushNormally(TimeoutTimer timeoutTimer, Executor executor) {
        if (!$assertionsDisabled && this.state.get() != AggregatorState.Writing) {
            throw new AssertionError("flushNormally cannot be called if state == " + this.state);
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushNormally", new Object[]{Integer.valueOf(this.operations.size())});
        FlushResult flushResult = new FlushResult();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        atomicBoolean.getClass();
        return FutureHelpers.loop(atomicBoolean::get, () -> {
            return flushOnce(timeoutTimer, executor);
        }, flushResult2 -> {
            atomicBoolean.set(flushResult2.getFlushedBytes() + flushResult2.getMergedBytes() > 0);
            flushResult.withFlushResult(flushResult2);
        }, executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushNormally", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v6, types: [java.util.function.Function, java.util.concurrent.CompletableFuture<io.pravega.segmentstore.server.writer.FlushResult>] */
    /* JADX WARN: Type inference failed for: r1v8, types: [java.util.function.Function, java.util.concurrent.CompletableFuture<io.pravega.segmentstore.server.writer.FlushResult>] */
    private CompletableFuture<FlushResult> flushOnce(TimeoutTimer timeoutTimer, Executor executor) {
        CompletableFuture flushFully;
        boolean z = this.mergeTransactionCount.get() > 0;
        boolean z2 = this.hasSealPending.get();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushOnce", new Object[]{Integer.valueOf(this.operations.size()), this.mergeTransactionCount, Boolean.valueOf(z2)});
        if (z2 || z) {
            flushFully = flushFully(timeoutTimer, executor);
            if (z) {
                flushFully = flushFully.thenComposeAsync((Function) flushResult -> {
                    return mergeIfNecessary(flushResult, timeoutTimer, executor);
                }, executor);
            }
            if (z2) {
                flushFully = flushFully.thenComposeAsync(flushResult2 -> {
                    return sealIfNecessary(flushResult2, timeoutTimer);
                }, executor);
            }
        } else {
            flushFully = flushExcess(timeoutTimer, executor);
        }
        return flushFully.thenApply((Function) flushResult3 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushOnce", traceEnterWithContext, new Object[]{flushResult3});
            return flushResult3;
        });
    }

    private CompletableFuture<FlushResult> flushFully(TimeoutTimer timeoutTimer, Executor executor) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushFully", new Object[0]);
        FlushResult flushResult = new FlushResult();
        Supplier supplier = () -> {
            return Boolean.valueOf(isAppendOperation(this.operations.getFirst()));
        };
        Supplier supplier2 = () -> {
            return flushPendingAppends(timeoutTimer.getRemaining());
        };
        flushResult.getClass();
        return FutureHelpers.loop(supplier, supplier2, flushResult::withFlushResult, executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushFully", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<FlushResult> flushExcess(TimeoutTimer timeoutTimer, Executor executor) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushExcess", new Object[0]);
        FlushResult flushResult = new FlushResult();
        Supplier supplier = () -> {
            return Boolean.valueOf(!this.metadata.isDeleted() && exceedsThresholds());
        };
        Supplier supplier2 = () -> {
            return flushPendingAppends(timeoutTimer.getRemaining());
        };
        flushResult.getClass();
        return FutureHelpers.loop(supplier, supplier2, flushResult::withFlushResult, executor).thenApply(r14 -> {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushExcess", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<FlushResult> flushPendingAppends(Duration duration) {
        try {
            FlushArgs flushArgs = getFlushArgs();
            long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flushPendingAppends", new Object[0]);
            if (flushArgs.getLength() != 0) {
                return this.storage.write(this.handle.get(), this.metadata.getStorageLength(), flushArgs.getStream(), flushArgs.getLength(), duration).thenApply(r14 -> {
                    FlushResult updateStatePostFlush = updateStatePostFlush(flushArgs);
                    LoggerHelpers.traceLeave(log, this.traceObjectId, "flushPendingAppends", traceEnterWithContext, new Object[]{updateStatePostFlush});
                    return updateStatePostFlush;
                }).exceptionally(th -> {
                    if (ExceptionHelpers.getRealException(th) instanceof BadOffsetException) {
                        setState(AggregatorState.ReconciliationNeeded);
                    }
                    throw new CompletionException(th);
                });
            }
            FlushResult flushResult = new FlushResult();
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flushPendingAppends", traceEnterWithContext, new Object[]{flushResult});
            return CompletableFuture.completedFuture(flushResult);
        } catch (DataCorruptionException e) {
            return FutureHelpers.failedFuture(e);
        }
    }

    private FlushArgs getFlushArgs() throws DataCorruptionException {
        StorageOperation first = this.operations.getFirst();
        if (!(first instanceof AggregatedAppendOperation)) {
            return new FlushArgs(null, 0);
        }
        AggregatedAppendOperation aggregatedAppendOperation = (AggregatedAppendOperation) first;
        int length = (int) aggregatedAppendOperation.getLength();
        InputStream appendData = this.dataSource.getAppendData(aggregatedAppendOperation.getStreamSegmentId(), aggregatedAppendOperation.getStreamSegmentOffset(), length);
        if (appendData != null) {
            aggregatedAppendOperation.seal();
            return new FlushArgs(appendData, length);
        }
        if (this.metadata.isDeleted()) {
            return new FlushArgs(null, 0);
        }
        throw new DataCorruptionException(String.format("Unable to retrieve CacheContents for '%s'.", aggregatedAppendOperation));
    }

    private CompletableFuture<FlushResult> mergeIfNecessary(FlushResult flushResult, TimeoutTimer timeoutTimer, Executor executor) {
        ensureInitializedAndNotClosed();
        if (!$assertionsDisabled && this.metadata.isTransaction()) {
            throw new AssertionError("Cannot merge into a Transaction StreamSegment.");
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "mergeIfNecessary", new Object[0]);
        StorageOperation first = this.operations.getFirst();
        if (first == null || !(first instanceof MergeTransactionOperation)) {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeIfNecessary", traceEnterWithContext, new Object[]{flushResult});
            return CompletableFuture.completedFuture(flushResult);
        }
        MergeTransactionOperation mergeTransactionOperation = (MergeTransactionOperation) first;
        return mergeWith(this.dataSource.getStreamSegmentMetadata(mergeTransactionOperation.getTransactionSegmentId()), mergeTransactionOperation, timeoutTimer, executor).thenApply(flushResult2 -> {
            flushResult.withFlushResult(flushResult2);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeIfNecessary", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<FlushResult> mergeWith(UpdateableSegmentMetadata updateableSegmentMetadata, MergeTransactionOperation mergeTransactionOperation, TimeoutTimer timeoutTimer, Executor executor) {
        if (updateableSegmentMetadata.isDeleted()) {
            return FutureHelpers.failedFuture(new DataCorruptionException(String.format("Attempted to merge with deleted Transaction segment '%s'.", updateableSegmentMetadata.getName())));
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "mergeWith", new Object[]{Long.valueOf(updateableSegmentMetadata.getId()), updateableSegmentMetadata.getName(), Boolean.valueOf(updateableSegmentMetadata.isSealedInStorage())});
        FlushResult flushResult = new FlushResult();
        if (!updateableSegmentMetadata.isSealedInStorage() || updateableSegmentMetadata.getDurableLogLength() > updateableSegmentMetadata.getStorageLength()) {
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeWith", traceEnterWithContext, new Object[]{flushResult});
            return CompletableFuture.completedFuture(flushResult);
        }
        AtomicLong atomicLong = new AtomicLong();
        return this.storage.getStreamSegmentInfo(updateableSegmentMetadata.getName(), timeoutTimer.getRemaining()).thenAcceptAsync(segmentProperties -> {
            if (segmentProperties.getLength() != updateableSegmentMetadata.getStorageLength()) {
                throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' cannot be merged into parent '%s' because its metadata disagrees with the Storage. Metadata.StorageLength=%d, Storage.StorageLength=%d", updateableSegmentMetadata.getName(), this.metadata.getName(), Long.valueOf(updateableSegmentMetadata.getStorageLength()), Long.valueOf(segmentProperties.getLength()))));
            }
            if (segmentProperties.getLength() != mergeTransactionOperation.getLength()) {
                throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' cannot be merged into parent '%s' because the declared length in the operation disagrees with the Storage. Operation.Length=%d, Storage.StorageLength=%d", updateableSegmentMetadata.getName(), this.metadata.getName(), Long.valueOf(mergeTransactionOperation.getLength()), Long.valueOf(segmentProperties.getLength()))));
            }
            atomicLong.set(segmentProperties.getLength());
        }, executor).thenComposeAsync(r11 -> {
            return this.storage.concat(this.handle.get(), mergeTransactionOperation.getStreamSegmentOffset(), updateableSegmentMetadata.getName(), timeoutTimer.getRemaining());
        }, executor).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) r6 -> {
            return this.storage.getStreamSegmentInfo(this.metadata.getName(), timeoutTimer.getRemaining());
        }, executor).thenApplyAsync(segmentProperties2 -> {
            StorageOperation removeFirst = this.operations.removeFirst();
            if (!$assertionsDisabled && (removeFirst == null || !(removeFirst instanceof MergeTransactionOperation))) {
                throw new AssertionError("First outstanding operation was not a MergeTransactionOperation");
            }
            if (!$assertionsDisabled && ((MergeTransactionOperation) removeFirst).getTransactionSegmentId() != updateableSegmentMetadata.getId()) {
                throw new AssertionError("First outstanding operation was a MergeTransactionOperation for the wrong Transaction id.");
            }
            int decrementAndGet = this.mergeTransactionCount.decrementAndGet();
            if (!$assertionsDisabled && decrementAndGet < 0) {
                throw new AssertionError("Negative value for mergeTransactionCount");
            }
            long storageLength = this.metadata.getStorageLength() + atomicLong.get();
            if (segmentProperties2.getLength() != storageLength) {
                throw new CompletionException((Throwable) new DataCorruptionException(String.format("Transaction Segment '%s' was merged into parent '%s' but the parent segment has an unexpected StorageLength after the merger. Previous=%d, MergeLength=%d, Expected=%d, Actual=%d", updateableSegmentMetadata.getName(), this.metadata.getName(), Long.valueOf(segmentProperties2.getLength()), Long.valueOf(atomicLong.get()), Long.valueOf(storageLength), Long.valueOf(segmentProperties2.getLength()))));
            }
            updateMetadata(segmentProperties2);
            updateMetadataForTransactionPostMerger(updateableSegmentMetadata);
            this.lastFlush.set(this.timer.getElapsed());
            flushResult.withMergedBytes(atomicLong.get());
            LoggerHelpers.traceLeave(log, this.traceObjectId, "mergeWith", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        }, executor).exceptionally(th -> {
            Throwable realException = ExceptionHelpers.getRealException(th);
            if ((realException instanceof BadOffsetException) || (realException instanceof StreamSegmentNotExistsException)) {
                setState(AggregatorState.ReconciliationNeeded);
            }
            throw new CompletionException(th);
        });
    }

    private CompletableFuture<FlushResult> sealIfNecessary(FlushResult flushResult, TimeoutTimer timeoutTimer) {
        if (!this.hasSealPending.get() || !(this.operations.getFirst() instanceof StreamSegmentSealOperation)) {
            return CompletableFuture.completedFuture(flushResult);
        }
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "sealIfNecessary", new Object[0]);
        return this.storage.seal(this.handle.get(), timeoutTimer.getRemaining()).handle((r14, th) -> {
            if (th != null && !(ExceptionHelpers.getRealException(th) instanceof StreamSegmentSealedException)) {
                throw new CompletionException(th);
            }
            updateStatePostSeal();
            LoggerHelpers.traceLeave(log, this.traceObjectId, "sealIfNecessary", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<Void> beginReconciliation(TimeoutTimer timeoutTimer) {
        if ($assertionsDisabled || this.state.get() == AggregatorState.ReconciliationNeeded) {
            return this.storage.getStreamSegmentInfo(this.metadata.getName(), timeoutTimer.getRemaining()).thenAccept(segmentProperties -> {
                if (segmentProperties.getLength() > this.metadata.getDurableLogLength()) {
                    throw new CompletionException((Throwable) new ReconciliationFailureException("Actual Segment length in Storage is larger than the Metadata DurableLogLength.", this.metadata, segmentProperties));
                }
                if (segmentProperties.getLength() < this.metadata.getStorageLength()) {
                    throw new CompletionException((Throwable) new ReconciliationFailureException("Actual Segment length in Storage is smaller than the Metadata StorageLength.", this.metadata, segmentProperties));
                }
                if (segmentProperties.getLength() == this.metadata.getStorageLength()) {
                    return;
                }
                this.reconciliationState.set(new ReconciliationState(this.metadata, segmentProperties));
                setState(AggregatorState.Reconciling);
            });
        }
        throw new AssertionError("beginReconciliation cannot be called if state == " + this.state);
    }

    private CompletableFuture<FlushResult> reconcile(TimeoutTimer timeoutTimer, Executor executor) {
        if (!$assertionsDisabled && this.state.get() != AggregatorState.Reconciling) {
            throw new AssertionError("reconcile cannot be called if state == " + this.state);
        }
        ReconciliationState reconciliationState = this.reconciliationState.get();
        if (!$assertionsDisabled && reconciliationState == null) {
            throw new AssertionError("reconciliationState is null");
        }
        SegmentProperties storageInfo = reconciliationState.getStorageInfo();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "reconcile", new Object[]{reconciliationState});
        FlushResult flushResult = new FlushResult();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Supplier supplier = () -> {
            return Boolean.valueOf(this.operations.size() > 0 && !atomicBoolean.get());
        };
        Supplier supplier2 = () -> {
            StorageOperation first = this.operations.getFirst();
            return reconcileOperation(first, storageInfo, timeoutTimer, executor).thenApply(flushResult2 -> {
                if (first.getLastStreamSegmentOffset() >= storageInfo.getLength()) {
                    atomicBoolean.set(true);
                }
                log.info("{}: Reconciled {} ({}).", new Object[]{this.traceObjectId, first, flushResult2});
                return flushResult2;
            });
        };
        flushResult.getClass();
        return FutureHelpers.loop(supplier, supplier2, flushResult::withFlushResult, executor).thenApply(r15 -> {
            updateMetadata(storageInfo);
            this.reconciliationState.set(null);
            setState(AggregatorState.Writing);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "reconcile", traceEnterWithContext, new Object[]{flushResult});
            return flushResult;
        });
    }

    private CompletableFuture<FlushResult> reconcileOperation(StorageOperation storageOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer, Executor executor) {
        return isAppendOperation(storageOperation) ? reconcileAppendOperation(storageOperation, segmentProperties, timeoutTimer, executor) : storageOperation instanceof MergeTransactionOperation ? reconcileMergeOperation((MergeTransactionOperation) storageOperation, segmentProperties, timeoutTimer) : storageOperation instanceof StreamSegmentSealOperation ? reconcileSealOperation(segmentProperties) : FutureHelpers.failedFuture(new ReconciliationFailureException(String.format("Operation '%s' is not supported for reconciliation.", storageOperation), this.metadata, segmentProperties));
    }

    private CompletableFuture<FlushResult> reconcileAppendOperation(StorageOperation storageOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer, Executor executor) {
        Preconditions.checkArgument(storageOperation instanceof AggregatedAppendOperation, "Not given an append operation.");
        InputStream appendData = this.dataSource.getAppendData(storageOperation.getStreamSegmentId(), storageOperation.getStreamSegmentOffset(), (int) storageOperation.getLength());
        if (appendData == null) {
            return FutureHelpers.failedFuture(new ReconciliationFailureException(String.format("Unable to reconcile operation '%s' because no append data is associated with it.", storageOperation), this.metadata, segmentProperties));
        }
        long min = Math.min(storageOperation.getLastStreamSegmentOffset(), segmentProperties.getLength()) - storageOperation.getStreamSegmentOffset();
        if (!$assertionsDisabled && min <= 0) {
            throw new AssertionError("Append Operation to be reconciled is beyond the Segment's StorageLength " + storageOperation);
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        byte[] bArr = new byte[(int) min];
        return FutureHelpers.loop(() -> {
            return Boolean.valueOf(((long) atomicInteger.get()) < min);
        }, () -> {
            return this.storage.read(this.handle.get(), storageOperation.getStreamSegmentOffset() + atomicInteger.get(), bArr, atomicInteger.get(), ((int) min) - atomicInteger.get(), timeoutTimer.getRemaining());
        }, num -> {
            if (!$assertionsDisabled && num.intValue() <= 0) {
                throw new AssertionError(String.format("Unable to make any read progress when reconciling operation '%s' after reading %s bytes.", storageOperation, atomicInteger));
            }
            atomicInteger.addAndGet(num.intValue());
        }, executor).thenApply(r13 -> {
            verifySame(appendData, bArr, storageOperation, segmentProperties);
            if (min >= storageOperation.getLength() && storageOperation.getLastStreamSegmentOffset() <= segmentProperties.getLength()) {
                StorageOperation removeFirst = this.operations.removeFirst();
                if (!$assertionsDisabled && storageOperation != removeFirst) {
                    throw new AssertionError("Reconciled operation is not the same as removed operation");
                }
            }
            return new FlushResult().withFlushedBytes(min);
        });
    }

    private void verifySame(InputStream inputStream, byte[] bArr, StorageOperation storageOperation, SegmentProperties segmentProperties) {
        for (int i = 0; i < bArr.length; i++) {
            if (((byte) inputStream.read()) != bArr[i]) {
                throw new ReconciliationFailureException(String.format("Unable to reconcile operation '%s' because of data differences at SegmentOffset %d.", storageOperation, Long.valueOf(storageOperation.getStreamSegmentOffset() + i)), this.metadata, segmentProperties);
            }
        }
    }

    private CompletableFuture<FlushResult> reconcileMergeOperation(MergeTransactionOperation mergeTransactionOperation, SegmentProperties segmentProperties, TimeoutTimer timeoutTimer) {
        UpdateableSegmentMetadata streamSegmentMetadata = this.dataSource.getStreamSegmentMetadata(mergeTransactionOperation.getTransactionSegmentId());
        return (streamSegmentMetadata == null || streamSegmentMetadata.isDeleted()) ? FutureHelpers.failedFuture(new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the transaction segment is deleted or missing from the metadata.", mergeTransactionOperation), this.metadata, segmentProperties)) : mergeTransactionOperation.getLastStreamSegmentOffset() > segmentProperties.getLength() ? FutureHelpers.failedFuture(new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the transaction segment is not fully merged into the parent.", mergeTransactionOperation), this.metadata, segmentProperties)) : this.storage.exists(streamSegmentMetadata.getName(), timeoutTimer.getRemaining()).thenApply(bool -> {
            if (bool.booleanValue()) {
                throw new CompletionException((Throwable) new ReconciliationFailureException(String.format("Cannot reconcile operation '%s' because the transaction segment still exists in Storage.", mergeTransactionOperation), this.metadata, segmentProperties));
            }
            StorageOperation removeFirst = this.operations.removeFirst();
            if (!$assertionsDisabled && (removeFirst == null || !(removeFirst instanceof MergeTransactionOperation))) {
                throw new AssertionError("First outstanding operation was not a MergeTransactionOperation");
            }
            int decrementAndGet = this.mergeTransactionCount.decrementAndGet();
            if (!$assertionsDisabled && decrementAndGet < 0) {
                throw new AssertionError("Negative value for mergeTransactionCount");
            }
            updateMetadataForTransactionPostMerger(streamSegmentMetadata);
            return new FlushResult().withMergedBytes(mergeTransactionOperation.getLength());
        });
    }

    private CompletableFuture<FlushResult> reconcileSealOperation(SegmentProperties segmentProperties) {
        if (!segmentProperties.isSealed()) {
            return FutureHelpers.failedFuture(new ReconciliationFailureException("Segment was supposed to be sealed in storage but it is not.", this.metadata, segmentProperties));
        }
        updateStatePostSeal();
        return CompletableFuture.completedFuture(new FlushResult());
    }

    private void checkSegmentId(StorageOperation storageOperation) {
        if (!(storageOperation instanceof MergeTransactionOperation)) {
            Preconditions.checkArgument(storageOperation.getStreamSegmentId() == this.metadata.getId(), "Operation '%s' refers to a different StreamSegment than this one (%s).", storageOperation, this.metadata.getId());
        } else {
            Preconditions.checkArgument(!this.metadata.isTransaction(), "MergeTransactionOperations can only be added to the parent StreamSegment; received '%s'.", storageOperation);
            Preconditions.checkArgument(storageOperation.getStreamSegmentId() == this.metadata.getId(), "Operation '%s' refers to a different StreamSegment as a target (parent) than this one (%s).", storageOperation, this.metadata.getId());
        }
    }

    private void checkValidOperation(StorageOperation storageOperation) throws DataCorruptionException {
        if (this.hasSealPending.get()) {
            throw new DataCorruptionException(String.format("No operation is allowed for a sealed segment; received '%s' .", storageOperation));
        }
        Preconditions.checkArgument(!(storageOperation instanceof StreamSegmentAppendOperation), "SegmentAggregator cannot process StreamSegmentAppendOperations.");
        long streamSegmentOffset = storageOperation.getStreamSegmentOffset();
        long length = storageOperation.getLength();
        Preconditions.checkArgument(streamSegmentOffset >= 0, "Operation '%s' has an invalid offset (%s).", storageOperation, storageOperation.getStreamSegmentOffset());
        Preconditions.checkArgument(length >= 0, "Operation '%s' has an invalid length (%s).", storageOperation, storageOperation.getLength());
        long j = this.lastAddedOffset.get();
        if (j >= 0 && streamSegmentOffset != j) {
            throw new DataCorruptionException(String.format("Wrong offset for Operation '%s'. Expected: %s, actual: %d.", storageOperation, this.lastAddedOffset, Long.valueOf(streamSegmentOffset)));
        }
        if (streamSegmentOffset + length > this.metadata.getDurableLogLength()) {
            throw new DataCorruptionException(String.format("Operation '%s' has at least one byte beyond its DurableLogLength. Offset = %d, Length = %d, DurableLogLength = %d.", storageOperation, Long.valueOf(streamSegmentOffset), Long.valueOf(length), Long.valueOf(this.metadata.getDurableLogLength())));
        }
        if (storageOperation instanceof StreamSegmentSealOperation) {
            if (this.metadata.getDurableLogLength() != streamSegmentOffset) {
                throw new DataCorruptionException(String.format("Wrong offset for Operation '%s'. Expected: %d (DurableLogLength), actual: %d.", storageOperation, Long.valueOf(this.metadata.getDurableLogLength()), Long.valueOf(streamSegmentOffset)));
            }
            if (!this.metadata.isSealed()) {
                throw new DataCorruptionException(String.format("Received Operation '%s' for a non-sealed segment.", storageOperation));
            }
        }
    }

    private FlushResult updateStatePostFlush(FlushArgs flushArgs) {
        long storageLength = this.metadata.getStorageLength() + flushArgs.getLength();
        this.metadata.setStorageLength(storageLength);
        boolean z = false;
        while (this.operations.size() > 0 && !z) {
            StorageOperation first = this.operations.getFirst();
            long lastStreamSegmentOffset = first.getLastStreamSegmentOffset();
            z = lastStreamSegmentOffset >= storageLength;
            if (!$assertionsDisabled && !z && !isAppendOperation(first)) {
                throw new AssertionError("Flushed operation was not an Append.");
            }
            if (lastStreamSegmentOffset <= storageLength) {
                this.operations.removeFirst();
            }
        }
        this.lastFlush.set(this.timer.getElapsed());
        return new FlushResult().withFlushedBytes(flushArgs.getLength());
    }

    private void updateStatePostSeal() {
        this.metadata.markSealedInStorage();
        this.operations.removeFirst();
        if (!$assertionsDisabled && this.operations.size() != 0) {
            throw new AssertionError("Processed StreamSegmentSeal operation but more operations are outstanding.");
        }
        this.hasSealPending.set(false);
        close();
    }

    private void updateMetadata(SegmentProperties segmentProperties) {
        this.metadata.setStorageLength(segmentProperties.getLength());
        if (!segmentProperties.isSealed() || this.metadata.isSealedInStorage()) {
            return;
        }
        this.metadata.markSealed();
        this.metadata.markSealedInStorage();
    }

    private void updateMetadataForTransactionPostMerger(UpdateableSegmentMetadata updateableSegmentMetadata) {
        updateableSegmentMetadata.markDeleted();
        this.dataSource.deleteStreamSegment(updateableSegmentMetadata.getName());
        this.dataSource.completeMerge(updateableSegmentMetadata.getParentId(), updateableSegmentMetadata.getId());
    }

    private boolean isAppendOperation(StorageOperation storageOperation) {
        return storageOperation != null && (storageOperation instanceof AggregatedAppendOperation);
    }

    private void ensureInitializedAndNotClosed() {
        Exceptions.checkNotClosed(isClosed(), this);
        Preconditions.checkState(this.state.get() != AggregatorState.NotInitialized, "SegmentAggregator is not initialized. Cannot execute this operation.");
    }

    private void setState(AggregatorState aggregatorState) {
        AggregatorState aggregatorState2 = this.state.get();
        if (aggregatorState != aggregatorState2) {
            log.info("{}: State changed from {} to {}.", new Object[]{this.traceObjectId, aggregatorState2, aggregatorState});
        }
        this.state.set(aggregatorState);
    }

    private CompletableFuture<SegmentProperties> openWrite(String str, AtomicReference<SegmentHandle> atomicReference, Executor executor, Duration duration) {
        return this.storage.openWrite(str).thenComposeAsync(segmentHandle -> {
            atomicReference.set(segmentHandle);
            return this.storage.getStreamSegmentInfo(str, duration);
        }, executor);
    }

    static {
        $assertionsDisabled = !SegmentAggregator.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(SegmentAggregator.class);
    }
}
