package io.pravega.segmentstore.server.logs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.Services;
import io.pravega.common.util.SequencedItemList;
import io.pravega.segmentstore.contracts.ContainerException;
import io.pravega.segmentstore.contracts.StreamSegmentException;
import io.pravega.segmentstore.contracts.StreamingException;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.LogItemFactory;
import io.pravega.segmentstore.server.OperationLog;
import io.pravega.segmentstore.server.ReadIndex;
import io.pravega.segmentstore.server.UpdateableContainerMetadata;
import io.pravega.segmentstore.server.logs.DataFrameReader;
import io.pravega.segmentstore.server.logs.operations.MetadataCheckpointOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.OperationFactory;
import io.pravega.segmentstore.server.logs.operations.ProbeOperation;
import io.pravega.segmentstore.server.logs.operations.StorageMetadataCheckpointOperation;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.DurableDataLogFactory;
import io.pravega.segmentstore.storage.LogAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/server/logs/DurableLog.class */
public class DurableLog extends AbstractService implements OperationLog {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private static final Duration RECOVERY_TIMEOUT;
    private final String traceObjectId;
    private final LogItemFactory<Operation> operationFactory;
    private final SequencedItemList<Operation> inMemoryOperationLog;
    private final DurableDataLog durableDataLog;
    private final MemoryStateUpdater memoryStateUpdater;
    private final OperationProcessor operationProcessor;
    private final UpdateableContainerMetadata metadata;

    @GuardedBy("tailReads")
    private final Set<TailRead> tailReads;
    private final ScheduledExecutorService executor;
    private final AtomicReference<Throwable> stopException = new AtomicReference<>();
    private final AtomicBoolean closed;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/logs/DurableLog$TailRead.class */
    public static class TailRead {
        final long afterSequenceNumber;
        final int maxCount;
        final CompletableFuture<Iterator<Operation>> future;

        TailRead(long j, int i, Duration duration, ScheduledExecutorService scheduledExecutorService) {
            this.afterSequenceNumber = j;
            this.maxCount = i;
            this.future = Futures.futureWithTimeout(duration, scheduledExecutorService);
        }

        public String toString() {
            return String.format("SeqNo = %d, Count = %d", Long.valueOf(this.afterSequenceNumber), Integer.valueOf(this.maxCount));
        }
    }

    public DurableLog(DurableLogConfig durableLogConfig, UpdateableContainerMetadata updateableContainerMetadata, DurableDataLogFactory durableDataLogFactory, ReadIndex readIndex, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(durableLogConfig, "config");
        Preconditions.checkNotNull(updateableContainerMetadata, "metadata");
        Preconditions.checkNotNull(durableDataLogFactory, "dataFrameLogFactory");
        Preconditions.checkNotNull(readIndex, "readIndex");
        Preconditions.checkNotNull(scheduledExecutorService, "executor");
        this.durableDataLog = durableDataLogFactory.createDurableDataLog(updateableContainerMetadata.getContainerId());
        if (!$assertionsDisabled && this.durableDataLog == null) {
            throw new AssertionError("dataFrameLogFactory created null durableDataLog.");
        }
        this.traceObjectId = String.format("DurableLog[%s]", Integer.valueOf(updateableContainerMetadata.getContainerId()));
        this.metadata = updateableContainerMetadata;
        this.executor = scheduledExecutorService;
        this.operationFactory = new OperationFactory();
        this.inMemoryOperationLog = createInMemoryLog();
        this.memoryStateUpdater = new MemoryStateUpdater(this.inMemoryOperationLog, readIndex, this::triggerTailReads);
        this.operationProcessor = new OperationProcessor(this.metadata, this.memoryStateUpdater, this.durableDataLog, new MetadataCheckpointPolicy(durableLogConfig, this::queueMetadataCheckpoint, this.executor), scheduledExecutorService);
        Services.onStop(this.operationProcessor, this::queueStoppedHandler, this::queueFailedHandler, this.executor);
        this.tailReads = new HashSet();
        this.closed = new AtomicBoolean();
    }

    @VisibleForTesting
    protected SequencedItemList<Operation> createInMemoryLog() {
        return new SequencedItemList<>();
    }

    @Override // io.pravega.segmentstore.server.Container, java.lang.AutoCloseable
    public void close() {
        if (this.closed.get()) {
            return;
        }
        Futures.await(Services.stopAsync(this, this.executor));
        this.operationProcessor.close();
        this.durableDataLog.close();
        log.info("{}: Closed.", this.traceObjectId);
        this.closed.set(true);
    }

    protected void doStart() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "doStart", new Object[0]);
        log.info("{}: Starting.", this.traceObjectId);
        CompletableFuture.supplyAsync(this::performRecovery, this.executor).thenCompose(bool -> {
            return Services.startAsync(this.operationProcessor, this.executor).thenComposeAsync(r4 -> {
                return bool.booleanValue() ? CompletableFuture.completedFuture(null) : queueMetadataCheckpoint();
            }, (Executor) this.executor);
        }).thenRunAsync(() -> {
            log.info("{}: Started.", this.traceObjectId);
            notifyStarted();
            LoggerHelpers.traceLeave(log, this.traceObjectId, "doStart", traceEnterWithContext, new Object[0]);
        }, (Executor) this.executor).exceptionally(th -> {
            if (this.operationProcessor.isRunning()) {
                this.operationProcessor.stopAsync();
            }
            notifyFailed(Exceptions.unwrap(th));
            return null;
        });
    }

    protected void doStop() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "doStop", new Object[0]);
        log.info("{}: Stopping.", this.traceObjectId);
        Services.stopAsync(this.operationProcessor, this.executor).whenCompleteAsync((r10, th) -> {
            cancelTailReads();
            this.durableDataLog.close();
            Throwable th = this.stopException.get();
            if (th == null && this.operationProcessor.state() == Service.State.FAILED) {
                th = this.operationProcessor.failureCause();
            }
            if (th == null) {
                notifyStopped();
            } else {
                notifyFailed(th);
            }
            log.info("{}: Stopped.", this.traceObjectId);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "doStop", traceEnterWithContext, new Object[0]);
        }, (Executor) this.executor).exceptionally(th2 -> {
            notifyFailed(th2);
            return null;
        });
    }

    @Override // io.pravega.segmentstore.server.Container
    public int getId() {
        return this.metadata.getContainerId();
    }

    @Override // io.pravega.segmentstore.server.OperationLog
    public CompletableFuture<Void> add(Operation operation, Duration duration) {
        ensureRunning();
        return this.operationProcessor.process(operation);
    }

    @Override // io.pravega.segmentstore.server.OperationLog
    public CompletableFuture<Void> truncate(long j, Duration duration) {
        ensureRunning();
        Preconditions.checkArgument(this.metadata.isValidTruncationPoint(j), "Invalid Truncation Point. Must refer to a MetadataCheckpointOperation.");
        long j2 = j - 1;
        LogAddress closestTruncationMarker = this.metadata.getClosestTruncationMarker(j2);
        if (closestTruncationMarker == null) {
            return CompletableFuture.completedFuture(null);
        }
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        log.info("{}: Truncate (OperationSequenceNumber = {}, DataFrameAddress = {}).", new Object[]{this.traceObjectId, Long.valueOf(j), closestTruncationMarker});
        return add(new StorageMetadataCheckpointOperation(), timeoutTimer.getRemaining()).thenComposeAsync(r7 -> {
            return this.durableDataLog.truncate(closestTruncationMarker, timeoutTimer.getRemaining());
        }, (Executor) this.executor).thenRunAsync(() -> {
            int truncate = this.inMemoryOperationLog.truncate(j2);
            this.metadata.removeTruncationMarkers(j2);
            this.operationProcessor.getMetrics().operationLogTruncate(truncate);
        }, (Executor) this.executor);
    }

    @Override // io.pravega.segmentstore.server.OperationLog
    public CompletableFuture<Iterator<Operation>> read(long j, int i, Duration duration) {
        Operation operation;
        ensureRunning();
        log.debug("{}: Read (AfterSequenceNumber = {}, MaxCount = {}).", new Object[]{this.traceObjectId, Long.valueOf(j), Integer.valueOf(i)});
        Iterator read = this.inMemoryOperationLog.read(j, i);
        if (read.hasNext()) {
            return CompletableFuture.completedFuture(read);
        }
        CompletableFuture<Iterator<Operation>> completableFuture = null;
        synchronized (this.tailReads) {
            operation = (Operation) this.inMemoryOperationLog.getLast();
            if (operation == null || operation.getSequenceNumber() <= j) {
                TailRead tailRead = new TailRead(j, i, duration, this.executor);
                completableFuture = tailRead.future;
                this.tailReads.add(tailRead);
                completableFuture.whenComplete((it, th) -> {
                    unregisterTailRead(tailRead);
                });
            }
        }
        if (completableFuture == null) {
            Iterator read2 = this.inMemoryOperationLog.read(j, i);
            if (!$assertionsDisabled && !read2.hasNext()) {
                Object[] objArr = new Object[2];
                objArr[0] = Long.valueOf(j);
                objArr[1] = Long.valueOf(operation == null ? -1L : operation.getSequenceNumber());
                throw new AssertionError(String.format("Unable to read anything after SeqNo %d, even though last operation SeqNo == %d", objArr));
            }
            completableFuture = CompletableFuture.completedFuture(read2);
        }
        return completableFuture;
    }

    @Override // io.pravega.segmentstore.server.OperationLog
    public CompletableFuture<Void> operationProcessingBarrier(Duration duration) {
        return add(new ProbeOperation(), duration).whenComplete((r8, th) -> {
            if (th != null) {
                log.warn("{}: Error caught while waiting for {}: {}.", new Object[]{this.traceObjectId, ProbeOperation.class.getSimpleName(), th});
            }
        });
    }

    private boolean performRecovery() {
        Preconditions.checkState(state() == Service.State.STARTING, "Cannot perform recovery if the DurableLog is not in a '%s' state.", Service.State.STARTING);
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "performRecovery", new Object[0]);
        Timer timer = new Timer();
        log.info("{} Recovery started.", this.traceObjectId);
        this.metadata.enterRecoveryMode();
        this.metadata.reset();
        this.operationProcessor.getMetrics().operationLogInit();
        OperationMetadataUpdater operationMetadataUpdater = new OperationMetadataUpdater(this.metadata);
        this.memoryStateUpdater.enterRecoveryMode(operationMetadataUpdater);
        boolean z = false;
        try {
            try {
                this.durableDataLog.initialize(RECOVERY_TIMEOUT);
                int recoverFromDataFrameLog = recoverFromDataFrameLog(operationMetadataUpdater);
                this.metadata.setContainerEpoch(this.durableDataLog.getEpoch());
                log.info("{} Recovery completed. Epoch = {}, Items Recovered = {}, Time = {}ms.", new Object[]{this.traceObjectId, Long.valueOf(this.metadata.getContainerEpoch()), Integer.valueOf(recoverFromDataFrameLog), Long.valueOf(timer.getElapsedMillis())});
                z = true;
                this.metadata.exitRecoveryMode();
                this.memoryStateUpdater.exitRecoveryMode(true);
                this.operationProcessor.getMetrics().operationsCompleted(recoverFromDataFrameLog, timer.getElapsed());
                LoggerHelpers.traceLeave(log, this.traceObjectId, "performRecovery", traceEnterWithContext, new Object[0]);
                return recoverFromDataFrameLog > 0;
            } catch (Throwable th) {
                this.metadata.exitRecoveryMode();
                this.memoryStateUpdater.exitRecoveryMode(z);
                throw th;
            }
        } catch (Exception e) {
            log.error("{} Recovery FAILED.", this.traceObjectId, e);
            throw new CompletionException(e);
        }
    }

    private int recoverFromDataFrameLog(OperationMetadataUpdater operationMetadataUpdater) throws Exception {
        DataFrameReader.ReadResult<Operation> m17getNext;
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "recoverFromDataFrameLog", new Object[0]);
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        DataFrameReader dataFrameReader = new DataFrameReader(this.durableDataLog, this.operationFactory, getId());
        Throwable th = null;
        while (true) {
            try {
                m17getNext = dataFrameReader.m17getNext();
                if (m17getNext == null) {
                    log.warn("{}: Reached the end of the DataFrameLog and could not find any MetadataCheckpointOperations after reading {} Operations and {} Data Frames.", new Object[]{this.traceObjectId, Integer.valueOf(i), Integer.valueOf(i2)});
                    break;
                }
                if (m17getNext.getItem() instanceof MetadataCheckpointOperation) {
                    log.info("{}: Starting recovery from Sequence Number {} (skipped {} Operations and {} Data Frames).", new Object[]{this.traceObjectId, Long.valueOf(m17getNext.getItem().getSequenceNumber()), Integer.valueOf(i), Integer.valueOf(i2)});
                    break;
                }
                if (m17getNext.isLastFrameEntry()) {
                    i2++;
                }
                i++;
                log.debug("{}: Not recovering operation because no MetadataCheckpointOperation encountered so far ({}).", this.traceObjectId, m17getNext.getItem());
            } finally {
                if (dataFrameReader != null) {
                    if (0 != 0) {
                        try {
                            dataFrameReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        dataFrameReader.close();
                    }
                }
            }
        }
        while (m17getNext != null) {
            recordTruncationMarker(m17getNext);
            recoverOperation(m17getNext.getItem(), operationMetadataUpdater);
            i3++;
            m17getNext = dataFrameReader.m17getNext();
        }
        operationMetadataUpdater.commitAll();
        LoggerHelpers.traceLeave(log, this.traceObjectId, "recoverFromDataFrameLog", traceEnterWithContext, new Object[]{Integer.valueOf(i3)});
        return i3;
    }

    private void recoverOperation(Operation operation, OperationMetadataUpdater operationMetadataUpdater) throws DataCorruptionException {
        operationMetadataUpdater.setOperationSequenceNumber(operation.getSequenceNumber());
        try {
            log.debug("{} Recovering {}.", this.traceObjectId, operation);
            operationMetadataUpdater.preProcessOperation(operation);
            operationMetadataUpdater.acceptOperation(operation);
            this.memoryStateUpdater.process(operation);
        } catch (StreamSegmentException | ContainerException e) {
            throw new DataCorruptionException(String.format("Unable to update metadata for Log Operation %s", operation), e);
        }
    }

    private void recordTruncationMarker(DataFrameReader.ReadResult<Operation> readResult) {
        LogAddress lastFullDataFrameAddress = readResult.getLastFullDataFrameAddress();
        LogAddress lastUsedDataFrameAddress = readResult.getLastUsedDataFrameAddress();
        if (lastFullDataFrameAddress != null && lastFullDataFrameAddress.getSequence() != lastUsedDataFrameAddress.getSequence()) {
            this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastFullDataFrameAddress);
        } else if (readResult.isLastFrameEntry()) {
            this.metadata.recordTruncationMarker(readResult.getItem().getSequenceNumber(), lastUsedDataFrameAddress);
        }
    }

    private void ensureRunning() {
        Exceptions.checkNotClosed(this.closed.get(), this);
        if (state() != Service.State.RUNNING) {
            throw new IllegalContainerStateException(getId(), state(), Service.State.RUNNING);
        }
    }

    private void queueFailedHandler(Throwable th) {
        log.warn("{}: QueueProcessor failed with exception {}", this.traceObjectId, th);
        this.stopException.set(th);
        stopAsync();
    }

    private void queueStoppedHandler() {
        if (state() == Service.State.STOPPING || state() == Service.State.FAILED) {
            return;
        }
        log.warn("{}: QueueProcessor stopped unexpectedly (no error) but DurableLog was not currently stopping. Shutting down DurableLog.", this.traceObjectId);
        this.stopException.set(new StreamingException("QueueProcessor stopped unexpectedly (no error) but DurableLog was not currently stopping."));
        stopAsync();
    }

    private CompletableFuture<Void> queueMetadataCheckpoint() {
        log.info("{}: MetadataCheckpointOperation queued.", this.traceObjectId);
        return this.operationProcessor.process(new MetadataCheckpointOperation()).thenAccept(r5 -> {
            log.info("{}: MetadataCheckpointOperation durably stored.", this.traceObjectId);
        });
    }

    private void unregisterTailRead(TailRead tailRead) {
        synchronized (this.tailReads) {
            this.tailReads.remove(tailRead);
        }
        if (tailRead.future == null || tailRead.future.isDone()) {
            return;
        }
        tailRead.future.cancel(true);
    }

    private void triggerTailReads() {
        this.executor.execute(() -> {
            List<TailRead> emptyList;
            synchronized (this.tailReads) {
                Operation operation = (Operation) this.inMemoryOperationLog.getLast();
                if (operation != null) {
                    long sequenceNumber = operation.getSequenceNumber();
                    emptyList = (List) this.tailReads.stream().filter(tailRead -> {
                        return tailRead.afterSequenceNumber < sequenceNumber;
                    }).collect(Collectors.toList());
                } else {
                    emptyList = Collections.emptyList();
                }
            }
            for (TailRead tailRead2 : emptyList) {
                tailRead2.future.complete(Futures.runOrFail(() -> {
                    return this.inMemoryOperationLog.read(tailRead2.afterSequenceNumber, tailRead2.maxCount);
                }, tailRead2.future));
            }
        });
    }

    private void cancelTailReads() {
        ArrayList arrayList;
        synchronized (this.tailReads) {
            arrayList = new ArrayList(this.tailReads);
        }
        arrayList.forEach(this::unregisterTailRead);
    }

    static {
        $assertionsDisabled = !DurableLog.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(DurableLog.class);
        RECOVERY_TIMEOUT = Duration.ofSeconds(30L);
    }
}
