package io.pravega.segmentstore.server.logs;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.util.SortedIndex;
import io.pravega.segmentstore.server.LogItem;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.LogAddress;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/server/logs/DataFrameBuilder.class */
class DataFrameBuilder<T extends LogItem> implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(DataFrameBuilder.class);
    private final DataFrameOutputStream outputStream;
    private final DurableDataLog targetLog;
    private final Args args;
    private final AtomicBoolean closed;
    private long lastSerializedSequenceNumber;
    private long lastStartedSequenceNumber;
    private final AtomicReference<Throwable> failureCause;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/segmentstore/server/logs/DataFrameBuilder$Args.class */
    /**
     * Bundles the callbacks and execution settings a {@link DataFrameBuilder} uses while committing DataFrames.
     */
    public static class Args {
        /** Called synchronously with a frame's {@link CommitArgs} just before that frame is appended to the target log. */
        final Consumer<CommitArgs> beforeCommit;
        /** Called on {@link #executor} after the target log append completes and the log address has been recorded. */
        final Consumer<CommitArgs> commitSuccess;
        /** Called with the unwrapped failure and the frame's {@link CommitArgs} when committing a frame fails. */
        final BiConsumer<Throwable, CommitArgs> commitFailure;
        /** Executor on which the success continuation of a target log append is run. */
        final Executor executor;
        /** Timeout handed to every append on the target log; fixed at 30 seconds. */
        final Duration writeTimeout = Duration.ofSeconds(30);

        /**
         * Creates a new set of arguments.
         *
         * @param beforeCommit  callback invoked before a frame is appended to the target log
         * @param commitSuccess callback invoked after a frame was appended successfully
         * @param commitFailure callback invoked when appending a frame failed
         * @param executor      executor used for the asynchronous success continuation
         */
        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"beforeCommit", "commitSuccess", "commitFailure", "executor"})
        public Args(Consumer<CommitArgs> beforeCommit, Consumer<CommitArgs> commitSuccess, BiConsumer<Throwable, CommitArgs> commitFailure, Executor executor) {
            this.beforeCommit = beforeCommit;
            this.commitSuccess = commitSuccess;
            this.commitFailure = commitFailure;
            this.executor = executor;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/segmentstore/server/logs/DataFrameBuilder$CommitArgs.class */
    public static class CommitArgs implements SortedIndex.IndexEntry {
        private final long lastFullySerializedSequenceNumber;
        private final long lastStartedSequenceNumber;
        private final AtomicReference<LogAddress> logAddress;
        private final int dataFrameLength;
        private long indexKey;
        static final /* synthetic */ boolean $assertionsDisabled;

        private CommitArgs(long j, long j2, int i) {
            if (!$assertionsDisabled && j > j2) {
                throw new AssertionError("lastFullySerializedSequenceNumber (" + j + ") is greater than lastStartedSequenceNumber (" + j2 + ")");
            }
            this.lastFullySerializedSequenceNumber = j;
            this.lastStartedSequenceNumber = j2;
            this.dataFrameLength = i;
            this.logAddress = new AtomicReference<>();
        }

        public long key() {
            return this.indexKey;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public LogAddress getLogAddress() {
            return this.logAddress.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setLogAddress(LogAddress logAddress) {
            this.logAddress.set(logAddress);
        }

        public String toString() {
            return String.format("LastFullySerializedSN = %d, LastStartedSN = %d, Address = %s, Length = %d", Long.valueOf(getLastFullySerializedSequenceNumber()), Long.valueOf(getLastStartedSequenceNumber()), this.logAddress, Integer.valueOf(getDataFrameLength()));
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getLastFullySerializedSequenceNumber() {
            return this.lastFullySerializedSequenceNumber;
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getLastStartedSequenceNumber() {
            return this.lastStartedSequenceNumber;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int getDataFrameLength() {
            return this.dataFrameLength;
        }

        @SuppressFBWarnings(justification = "generated code")
        public void setIndexKey(long j) {
            this.indexKey = j;
        }

        static {
            $assertionsDisabled = !DataFrameBuilder.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new builder that writes its DataFrames to the given log.
     *
     * @param durableDataLog the log that completed DataFrames are appended to; must not be null
     * @param args           callbacks and settings; {@code args}, {@code args.commitSuccess} and
     *                       {@code args.commitFailure} must not be null
     */
    public DataFrameBuilder(DurableDataLog durableDataLog, Args args) {
        // Validate in the same order as the fields are assigned: log first, then args and its callbacks.
        Preconditions.checkNotNull(durableDataLog, "targetLog");
        this.targetLog = durableDataLog;
        Preconditions.checkNotNull(args, "args");
        this.args = args;
        Preconditions.checkNotNull(this.args.commitSuccess, "args.commitSuccess");
        Preconditions.checkNotNull(this.args.commitFailure, "args.commitFailure");

        // The output stream takes the log's maximum append length and calls back into this builder
        // for every completed frame.
        this.outputStream = new DataFrameOutputStream(this.targetLog.getMaxAppendLength(), this::handleDataFrameComplete);

        // Nothing has been started or serialized yet.
        this.lastSerializedSequenceNumber = -1;
        this.lastStartedSequenceNumber = -1;
        this.failureCause = new AtomicReference<>();
        this.closed = new AtomicBoolean(false);
    }

    /**
     * Marks this builder as closed and closes the underlying output stream.
     * Only the first invocation closes the stream; later invocations do nothing.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // compareAndSet(false, true) succeeds only for the call that flips the flag from open to closed;
        // that call is the one that must release the output stream.
        if (this.closed.compareAndSet(false, true)) {
            this.outputStream.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Flushes the underlying output stream and then releases its buffer.
     * The closed flag is checked first via {@code Exceptions.checkNotClosed}.
     */
    public void flush() {
        Exceptions.checkNotClosed(this.closed.get(), this);

        final DataFrameOutputStream stream = this.outputStream;
        stream.flush();
        stream.releaseBuffer();
    }

    /**
     * Gets the failure recorded by this builder, if any.
     *
     * @return the first non-shutdown exception recorded while processing a DataFrame, or {@code null} if none
     */
    Throwable failureCause() {
        final AtomicReference<Throwable> recorded = this.failureCause;
        return recorded.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializes the given item into the output stream as a single record.
     *
     * @param t the item to append; its sequence number must be greater than that of the last serialized item
     * @throws IOException if serialization fails; any exception raised while writing the record is rethrown
     *                     after this builder's state has been adjusted (see the catch block)
     */
    public void append(T t) throws IOException {
        Exceptions.checkNotClosed(this.closed.get(), this);

        final long itemSeqNo = t.getSequenceNumber();
        Exceptions.checkArgument(this.lastSerializedSequenceNumber < itemSeqNo, "logItem",
                "Invalid sequence number. Expected: greater than %d, given: %d.", this.lastSerializedSequenceNumber, itemSeqNo);

        // Kept so that a failed write can restore the "last started" marker.
        final long startedBeforeThisCall = this.lastStartedSequenceNumber;
        try {
            this.outputStream.startNewRecord();
            this.lastStartedSequenceNumber = itemSeqNo;
            t.serialize(this.outputStream);
            this.outputStream.endRecord();
            this.lastSerializedSequenceNumber = itemSeqNo;
        } catch (Exception ex) {
            if (this.closed.get()) {
                // The builder was closed while the record was being written; report that instead.
                throw new ObjectClosedException(this, ex);
            } else if (ex instanceof ObjectClosedException) {
                // Something underneath reported being closed; close this builder as well.
                close();
            } else {
                // Drop the partially written record and undo the "last started" update.
                this.outputStream.discardRecord();
                this.lastStartedSequenceNumber = startedBeforeThisCall;
            }
            throw ex;
        }
    }

    /**
     * Publishes a completed DataFrame to the target log. This is the callback given to the output stream.
     * Invokes {@code beforeCommit}, appends the frame data to the log, and wires the success and failure
     * callbacks to the resulting future. Synchronous failures are passed to
     * {@link #handleProcessingException} and then rethrown.
     *
     * @param dataFrame the frame to publish; must be sealed
     */
    private void handleDataFrameComplete(DataFrame dataFrame) {
        Exceptions.checkArgument(dataFrame.isSealed(), "dataFrame", "Cannot publish a non-sealed DataFrame.");

        final CommitArgs frameArgs =
                new CommitArgs(this.lastSerializedSequenceNumber, this.lastStartedSequenceNumber, dataFrame.getLength());
        try {
            this.args.beforeCommit.accept(frameArgs);
            this.targetLog
                    .append(dataFrame.getData(), this.args.writeTimeout)
                    .thenAcceptAsync(address -> {
                        // Record where the frame landed before notifying the success callback.
                        frameArgs.setLogAddress(address);
                        this.args.commitSuccess.accept(frameArgs);
                    }, this.args.executor)
                    .exceptionally(cause -> handleProcessingException(cause, frameArgs));
        } catch (Throwable ex) {
            // Notify the failure callback, then still fail the current call.
            handleProcessingException(ex, frameArgs);
            throw ex;
        }
    }

    /**
     * Handles a failure that occurred while committing a DataFrame: records the unwrapped cause (unless it is a
     * shutdown-type exception and only if no cause was recorded before), invokes the {@code commitFailure}
     * callback and calls {@link #close()}.
     *
     * @param th         the failure as caught or as delivered by the future
     * @param commitArgs the arguments describing the frame that failed
     * @return always {@code null}, so the method can be used as an {@code exceptionally} handler
     */
    private Void handleProcessingException(Throwable th, CommitArgs commitArgs) {
        final Throwable rootCause = Exceptions.unwrap(th);

        final boolean shutdownRelated = isShutdownException(rootCause);
        if (!shutdownRelated) {
            // Only the first recorded cause is kept.
            this.failureCause.compareAndSet(null, rootCause);
        }

        this.args.commitFailure.accept(rootCause, commitArgs);
        close();
        return null;
    }

    /**
     * Tells whether the given exception is one of the types this builder treats as a shutdown signal.
     *
     * @param th the exception to inspect
     * @return {@code true} if it is an {@code ObjectClosedException} or a {@code CancellationException}
     */
    private boolean isShutdownException(Throwable th) {
        if (th instanceof ObjectClosedException) {
            return true;
        }
        return th instanceof CancellationException;
    }
}
