package io.pravega.segmentstore.server.writer;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.MathHelpers;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.Futures;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.SegmentOperation;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.Writer;
import io.pravega.segmentstore.server.WriterFactory;
import io.pravega.segmentstore.server.WriterFlushResult;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.logs.operations.MetadataCheckpointOperation;
import io.pravega.segmentstore.server.logs.operations.MetadataOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.StorageOperation;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.segmentstore.storage.StorageNotPrimaryException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/writer/StorageWriter.class */
class StorageWriter extends AbstractThreadPoolService implements Writer {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(StorageWriter.class);
    private final WriterConfig config;
    private final WriterDataSource dataSource;
    private final Storage storage;
    private final HashMap<Long, ProcessorCollection> processors;
    private final WriterState state;
    private final Timer timer;
    private final AckCalculator ackCalculator;
    private final WriterFactory.CreateProcessors createProcessors;

    /* loaded from: input_file:io/pravega/segmentstore/server/writer/StorageWriter$FlushStageResult.class */
    private static class FlushStageResult extends WriterFlushResult {
        int count;

        private FlushStageResult() {
        }

        @Override // io.pravega.segmentstore.server.WriterFlushResult
        public FlushStageResult withFlushResult(WriterFlushResult writerFlushResult) {
            this.count++;
            return (FlushStageResult) super.withFlushResult(writerFlushResult);
        }

        @Override // io.pravega.segmentstore.server.WriterFlushResult
        public String toString() {
            return String.format("Count=%d, %s", Integer.valueOf(this.count), super.toString());
        }
    }

    /* loaded from: input_file:io/pravega/segmentstore/server/writer/StorageWriter$InputReadStageResult.class */
    private static class InputReadStageResult {
        int count;
        long bytes;
        private final WriterState state;

        InputReadStageResult(WriterState writerState) {
            this.state = writerState;
        }

        void operationProcessed(Operation operation) {
            this.count++;
            if (operation instanceof StorageOperation) {
                this.bytes += ((StorageOperation) operation).getLength();
            }
        }

        public String toString() {
            return String.format("Count=%d, Bytes=%d, LastReadSN=%d", Integer.valueOf(this.count), Long.valueOf(this.bytes), Long.valueOf(this.state.getLastReadSequenceNumber()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/StorageWriter$ProcessorCollection.class */
    public class ProcessorCollection implements WriterSegmentProcessor {
        private final SegmentAggregator aggregator;
        private final List<WriterSegmentProcessor> processors;

        ProcessorCollection(SegmentAggregator segmentAggregator, Collection<WriterSegmentProcessor> collection) {
            this.aggregator = segmentAggregator;
            this.processors = ImmutableList.builder().add(segmentAggregator).addAll(collection).build();
        }

        Duration getElapsedSinceLastFlush() {
            return this.aggregator.getElapsedSinceLastFlush();
        }

        long getId() {
            return this.aggregator.getMetadata().getId();
        }

        boolean shouldClose() {
            return this.aggregator.getMetadata().isDeletedInStorage() || !this.aggregator.getMetadata().isActive();
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor, java.lang.AutoCloseable
        public void close() {
            this.processors.forEach((v0) -> {
                v0.close();
            });
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
        public boolean isClosed() {
            return this.processors.stream().allMatch((v0) -> {
                return v0.isClosed();
            });
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
        public long getLowestUncommittedSequenceNumber() {
            return this.processors.size() == 1 ? this.processors.get(0).getLowestUncommittedSequenceNumber() : StorageWriter.this.ackCalculator.getLowestUncommittedSequenceNumber(this.processors);
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
        public boolean mustFlush() {
            return this.processors.stream().anyMatch((v0) -> {
                return v0.mustFlush();
            });
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
        public void add(SegmentOperation segmentOperation) throws DataCorruptionException {
            Iterator<WriterSegmentProcessor> it = this.processors.iterator();
            while (it.hasNext()) {
                it.next().add(segmentOperation);
            }
        }

        @Override // io.pravega.segmentstore.server.WriterSegmentProcessor
        public CompletableFuture<WriterFlushResult> flush(Duration duration) {
            return Futures.allOfWithResults((List) this.processors.stream().map(writerSegmentProcessor -> {
                return writerSegmentProcessor.flush(duration);
            }).collect(Collectors.toList())).thenApply(list -> {
                WriterFlushResult writerFlushResult = (WriterFlushResult) list.get(0);
                for (int i = 1; i < list.size(); i++) {
                    writerFlushResult.withFlushResult((WriterFlushResult) list.get(i));
                }
                return writerFlushResult;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StorageWriter(WriterConfig writerConfig, WriterDataSource writerDataSource, Storage storage, WriterFactory.CreateProcessors createProcessors, ScheduledExecutorService scheduledExecutorService) {
        super(String.format("StorageWriter[%d]", Integer.valueOf(writerDataSource.getId())), scheduledExecutorService);
        this.config = (WriterConfig) Preconditions.checkNotNull(writerConfig, "config");
        this.dataSource = writerDataSource;
        this.storage = (Storage) Preconditions.checkNotNull(storage, "storage");
        this.createProcessors = (WriterFactory.CreateProcessors) Preconditions.checkNotNull(createProcessors, "createProcessors");
        this.processors = new HashMap<>();
        this.state = new WriterState();
        this.timer = new Timer();
        this.ackCalculator = new AckCalculator(this.state);
    }

    protected Duration getShutdownTimeout() {
        return this.config.getShutdownTimeout();
    }

    protected CompletableFuture<Void> doRun() {
        return Futures.loop(this::canRun, () -> {
            return Futures.delayedFuture(getIterationStartDelay(), this.executor).thenRun(this::beginIteration).thenComposeAsync(this::readData, (Executor) this.executor).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) this::processReadResult, (Executor) this.executor).thenComposeAsync(this::flush, (Executor) this.executor).thenComposeAsync(this::acknowledge, (Executor) this.executor).exceptionally(this::iterationErrorHandler).thenRunAsync(this::endIteration, (Executor) this.executor);
        }, this.executor).thenRun(this::closeProcessors);
    }

    private boolean canRun() {
        return isRunning() && getStopException() == null;
    }

    private void beginIteration() {
        this.state.recordIterationStarted(this.timer);
        logStageEvent("Start", null);
    }

    private void endIteration() {
        cleanup();
        logStageEvent("Finish", "Elapsed " + this.state.getElapsedSinceIterationStart(this.timer).toMillis() + "ms");
    }

    private Void iterationErrorHandler(Throwable th) {
        if (isShutdownException(th) && !canRun()) {
            log.info("{}: StorageWriter intercepted {} while shutting down.", this.traceObjectId, Exceptions.unwrap(th).getClass().getSimpleName());
            return null;
        }
        boolean isCriticalError = isCriticalError(th);
        logError(th, isCriticalError);
        if (!isCriticalError) {
            this.state.recordIterationError();
            return null;
        }
        super.errorHandler(th);
        stopAsync();
        return null;
    }

    private void closeProcessors() {
        this.processors.values().forEach((v0) -> {
            v0.close();
        });
        this.processors.clear();
    }

    private CompletableFuture<Iterator<Operation>> readData(Void r7) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "readData", new Object[0]);
        try {
            Duration readTimeout = getReadTimeout();
            return this.dataSource.read(this.state.getLastReadSequenceNumber(), this.config.getMaxItemsToReadAtOnce(), readTimeout).thenApply(it -> {
                LoggerHelpers.traceLeave(log, this.traceObjectId, "readData", traceEnterWithContext, new Object[0]);
                return it;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                Throwable unwrap = Exceptions.unwrap(th);
                if (!(unwrap instanceof TimeoutException)) {
                    throw new CompletionException(unwrap);
                }
                log.debug("{}: Iteration[{}] No items were read during allotted timeout of {}ms", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), Long.valueOf(readTimeout.toMillis())});
                return null;
            });
        } catch (Throwable th2) {
            Throwable unwrap = Exceptions.unwrap(th2);
            if (!(unwrap instanceof TimeoutException)) {
                return Futures.failedFuture(th2);
            }
            logErrorHandled(unwrap);
            return CompletableFuture.completedFuture(null);
        }
    }

    private CompletableFuture<Void> processReadResult(Iterator<Operation> it) {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "processReadResult", new Object[0]);
        InputReadStageResult inputReadStageResult = new InputReadStageResult(this.state);
        if (it != null) {
            return Futures.loop(() -> {
                return Boolean.valueOf(canRun() && it.hasNext());
            }, () -> {
                Operation operation = (Operation) it.next();
                return processOperation(operation).thenRun(() -> {
                    this.state.setLastReadSequenceNumber(operation.getSequenceNumber());
                    inputReadStageResult.operationProcessed(operation);
                });
            }, this.executor).thenRun(() -> {
                logStageEvent("InputRead", inputReadStageResult);
                LoggerHelpers.traceLeave(log, this.traceObjectId, "processReadResult", traceEnterWithContext, new Object[0]);
            });
        }
        logStageEvent("InputRead", inputReadStageResult);
        LoggerHelpers.traceLeave(log, this.traceObjectId, "processReadResult", traceEnterWithContext, new Object[0]);
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> processOperation(Operation operation) {
        return operation.getSequenceNumber() <= this.state.getLastReadSequenceNumber() ? Futures.failedFuture(new DataCorruptionException(String.format("Operation '%s' has a sequence number that is lower than the previous one (%d).", operation, Long.valueOf(this.state.getLastReadSequenceNumber())), new Object[0])) : operation instanceof SegmentOperation ? processSegmentOperation((SegmentOperation) operation) : operation instanceof MetadataOperation ? processMetadataOperation((MetadataOperation) operation) : Futures.failedFuture(new DataCorruptionException(String.format("Unsupported operation %s.", operation), new Object[0]));
    }

    private CompletableFuture<Void> processMetadataOperation(MetadataOperation metadataOperation) {
        return (!(metadataOperation instanceof MetadataCheckpointOperation) || this.dataSource.isValidTruncationPoint(metadataOperation.getSequenceNumber())) ? CompletableFuture.completedFuture(null) : Futures.failedFuture(new DataCorruptionException(String.format("Operation '%s' does not correspond to a valid Truncation Point in the metadata.", metadataOperation), new Object[0]));
    }

    private CompletableFuture<Void> processSegmentOperation(SegmentOperation segmentOperation) {
        return getProcessor(segmentOperation.getStreamSegmentId()).thenAccept(processorCollection -> {
            try {
                processorCollection.add(segmentOperation);
            } catch (DataCorruptionException e) {
                throw new CompletionException((Throwable) e);
            }
        });
    }

    private CompletableFuture<Void> flush(Void r6) {
        checkRunning();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "flush", new Object[0]);
        return Futures.allOfWithResults((List) this.processors.values().stream().filter((v0) -> {
            return v0.mustFlush();
        }).map(processorCollection -> {
            return processorCollection.flush(this.config.getFlushTimeout());
        }).collect(Collectors.toList())).thenAcceptAsync(list -> {
            FlushStageResult flushStageResult = new FlushStageResult();
            flushStageResult.getClass();
            list.forEach(flushStageResult::withFlushResult);
            if (flushStageResult.getFlushedBytes() + flushStageResult.getMergedBytes() + flushStageResult.count > 0) {
                logStageEvent("Flush", flushStageResult);
            }
            LoggerHelpers.traceLeave(log, this.traceObjectId, "flush", traceEnterWithContext, new Object[0]);
        }, (Executor) this.executor);
    }

    private void cleanup() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "cleanup", new Object[0]);
        List list = (List) this.processors.values().stream().map(this::closeIfNecessary).filter((v0) -> {
            return v0.isClosed();
        }).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        HashMap<Long, ProcessorCollection> hashMap = this.processors;
        hashMap.getClass();
        list.forEach((v1) -> {
            r1.remove(v1);
        });
        LoggerHelpers.traceLeave(log, this.traceObjectId, "cleanup", traceEnterWithContext, new Object[]{Integer.valueOf(list.size())});
    }

    private ProcessorCollection closeIfNecessary(ProcessorCollection processorCollection) {
        if (processorCollection.shouldClose()) {
            processorCollection.close();
        }
        return processorCollection;
    }

    private CompletableFuture<Void> acknowledge(Void r12) {
        checkRunning();
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "acknowledge", new Object[0]);
        long closestValidTruncationPoint = this.dataSource.getClosestValidTruncationPoint(this.ackCalculator.getHighestCommittedSequenceNumber(this.processors.values()));
        if (closestValidTruncationPoint > this.state.getLastTruncatedSequenceNumber()) {
            return this.dataSource.acknowledge(closestValidTruncationPoint, this.config.getAckTimeout()).thenRun(() -> {
                this.state.setLastTruncatedSequenceNumber(closestValidTruncationPoint);
                logStageEvent("Acknowledged", "SeqNo=" + closestValidTruncationPoint);
                LoggerHelpers.traceLeave(log, this.traceObjectId, "acknowledge", traceEnterWithContext, new Object[]{Long.valueOf(closestValidTruncationPoint)});
            });
        }
        LoggerHelpers.traceLeave(log, this.traceObjectId, "acknowledge", traceEnterWithContext, new Object[]{Long.MIN_VALUE});
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<ProcessorCollection> getProcessor(long j) {
        ProcessorCollection orDefault = this.processors.getOrDefault(Long.valueOf(j), null);
        if (orDefault != null) {
            if (!closeIfNecessary(orDefault).isClosed()) {
                return CompletableFuture.completedFuture(orDefault);
            }
            this.processors.remove(Long.valueOf(j));
        }
        UpdateableSegmentMetadata streamSegmentMetadata = this.dataSource.getStreamSegmentMetadata(j);
        if (streamSegmentMetadata == null) {
            return Futures.failedFuture(new DataCorruptionException(String.format("No StreamSegment with id '%d' is registered in the metadata.", Long.valueOf(j)), new Object[0]));
        }
        SegmentAggregator segmentAggregator = new SegmentAggregator(streamSegmentMetadata, this.dataSource, this.storage, this.config, this.timer, this.executor);
        ProcessorCollection processorCollection = new ProcessorCollection(segmentAggregator, this.createProcessors.apply(streamSegmentMetadata));
        try {
            CompletableFuture<Void> initialize = segmentAggregator.initialize(this.config.getFlushTimeout());
            Futures.exceptionListener(initialize, th -> {
                segmentAggregator.close();
            });
            return initialize.thenApply(r8 -> {
                this.processors.put(Long.valueOf(j), processorCollection);
                return processorCollection;
            });
        } catch (Exception e) {
            processorCollection.close();
            throw e;
        }
    }

    private boolean isCriticalError(Throwable th) {
        Throwable unwrap = Exceptions.unwrap(th);
        return Exceptions.mustRethrow(unwrap) || (unwrap instanceof DataCorruptionException) || (unwrap instanceof StorageNotPrimaryException);
    }

    private boolean isShutdownException(Throwable th) {
        Throwable unwrap = Exceptions.unwrap(th);
        return (unwrap instanceof ObjectClosedException) || (unwrap instanceof CancellationException);
    }

    private Duration getReadTimeout() {
        long millis = this.config.getMaxReadTimeout().toMillis();
        long millis2 = this.config.getMinReadTimeout().toMillis();
        long j = millis;
        Iterator<ProcessorCollection> it = this.processors.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ProcessorCollection next = it.next();
            if (next.mustFlush()) {
                j = 0;
                break;
            }
            j = MathHelpers.minMax(this.config.getFlushThresholdTime().minus(next.getElapsedSinceLastFlush()).toMillis(), millis2, j);
        }
        return Duration.ofMillis(j);
    }

    private Duration getIterationStartDelay() {
        return this.state.getLastIterationError() ? this.config.getErrorSleepDuration() : Duration.ZERO;
    }

    private void logStageEvent(String str, Object obj) {
        if (obj == null) {
            log.debug("{}: Iteration[{}].{}.", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), str});
        } else {
            log.debug("{}: Iteration[{}].{} ({}).", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), str, obj});
        }
    }

    private void logError(Throwable th, boolean z) {
        Throwable unwrap = Exceptions.unwrap(th);
        if (z) {
            log.error("{}: Iteration[{}].CriticalError.", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), unwrap});
        } else {
            log.error("{}: Iteration[{}].Error.", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), unwrap});
        }
    }

    private void logErrorHandled(Throwable th) {
        log.warn("{}: Iteration[{}].HandledError {}", new Object[]{this.traceObjectId, Long.valueOf(this.state.getIterationId()), Exceptions.unwrap(th).toString()});
    }

    private void checkRunning() {
        if (!canRun()) {
            throw new CancellationException("StorageWriter has been stopped.");
        }
    }
}
