package alluxio.master.journal.raft;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.JournalClosedException;
import alluxio.master.journal.JournalWriter;
import alluxio.proto.journal.Journal;
import alluxio.util.FormatUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JournalWriter} that accumulates journal entries into one aggregate
 * {@link Journal.JournalEntry} and submits that aggregate through a {@link RaftJournalAppender}
 * when flushed.
 *
 * <p>Each written entry is stamped with the next sequence number before being added to the
 * pending batch. The batch is sent either on an explicit {@link #flush()} or from
 * {@link #write(Journal.JournalEntry)} once the accumulated serialized size has exceeded
 * {@code FLUSH_BATCH_SIZE}.
 */
@NotThreadSafe
public class RaftJournalWriter implements JournalWriter {
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalWriter.class);

    /** How long {@link #flush()} waits for the reply, in milliseconds. */
    private static final long MASTER_EMBEDDED_JOURNAL_WRITE_TIMEOUT =
            Configuration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_WRITE_TIMEOUT);
    /** Configured upper bound, in bytes, for a single journal entry. */
    private static final long MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX =
            Configuration.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX);
    /** Accumulated size, in bytes, above which the next write flushes first (a third of the max). */
    private static final long FLUSH_BATCH_SIZE = MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX / 3;

    /** Sequence number that will be assigned to the next written entry. */
    private final AtomicLong mNextSequenceNumberToWrite;
    /** Sequence number of the last entry in the most recently submitted batch, or -1 if none. */
    private final AtomicLong mLastSubmittedSequenceNumber = new AtomicLong(-1);
    /** Sequence number of the last entry in the most recently acknowledged batch, or -1 if none. */
    private final AtomicLong mLastCommittedSequenceNumber = new AtomicLong(-1);
    /** Sum of the serialized sizes of the entries added to the pending batch. */
    private final AtomicLong mCurrentJournalEntrySize = new AtomicLong(0);
    /** Client used to submit batches; closed together with this writer. */
    private final RaftJournalAppender mClient;
    /** Pending batch; {@code null} when nothing has been written since the last successful flush. */
    private Journal.JournalEntry.Builder mJournalEntryBuilder;
    /** Set once by {@link #close()}; afterwards write and flush are rejected. */
    private volatile boolean mClosed = false;

    /**
     * Creates a writer.
     *
     * @param j the sequence number to assign to the first written entry
     * @param raftJournalAppender the client used to submit entries; must not be null
     * @throws NullPointerException if {@code raftJournalAppender} is null
     */
    public RaftJournalWriter(long j, RaftJournalAppender raftJournalAppender) {
        LOG.debug("Journal writer created starting at SN#{}", j);
        mNextSequenceNumberToWrite = new AtomicLong(j);
        mClient = Objects.requireNonNull(raftJournalAppender);
    }

    /**
     * Stamps the entry with the next sequence number and adds it to the pending batch, flushing
     * the existing batch first if its accumulated size is already above {@code FLUSH_BATCH_SIZE}.
     *
     * @param journalEntry the entry to append; must have at most two fields set
     * @throws IOException if the pre-append flush fails
     * @throws JournalClosedException if this writer has been closed
     * @throws IllegalStateException if the entry has more than two fields set
     */
    @Override // alluxio.master.journal.JournalWriter
    public void write(Journal.JournalEntry journalEntry) throws IOException, JournalClosedException {
        ensureOpen("Cannot write to journal. Journal writer has been closed");
        Preconditions.checkState(journalEntry.getAllFields().size() <= 2,
                "Raft journal entries should never set multiple fields, but found %s", journalEntry);

        // Drain an oversized batch before growing it further.
        if (mCurrentJournalEntrySize.get() > FLUSH_BATCH_SIZE) {
            flush();
        }
        // A null builder means a fresh batch: start one and restart the size accounting.
        if (mJournalEntryBuilder == null) {
            mJournalEntryBuilder = Journal.JournalEntry.newBuilder();
            mCurrentJournalEntrySize.set(0L);
        }

        LOG.trace("Writing entry {}: {}", mNextSequenceNumberToWrite, journalEntry);
        Journal.JournalEntry.Builder stamped = journalEntry.toBuilder();
        stamped.setSequenceNumber(mNextSequenceNumberToWrite.getAndIncrement());
        mJournalEntryBuilder.addJournalEntries(stamped.build());

        // Size accounting uses the entry as passed in (before the sequence number was stamped).
        long entryBytes = journalEntry.getSerializedSize();
        if (entryBytes > MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX) {
            // Only reported; the entry has already been added to the batch.
            LOG.error("Journal entry size ({}) is bigger than the max allowed size ({}) defined by {}",
                    FormatUtils.getSizeFromBytes(entryBytes),
                    FormatUtils.getSizeFromBytes(MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX),
                    PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX.getName());
        }
        mCurrentJournalEntrySize.addAndGet(entryBytes);
    }

    /**
     * Submits the pending batch, if any, and waits up to the configured write timeout for the
     * reply. The pending batch is discarded only after a reply without an exception is received;
     * on any failure it is left in place.
     *
     * @throws IOException if the reply carries an exception, the wait is interrupted, the
     *         submission fails, or the timeout elapses
     * @throws JournalClosedException if this writer has been closed
     */
    @Override // alluxio.master.journal.JournalWriter
    public void flush() throws IOException, JournalClosedException {
        ensureOpen("Cannot flush. Journal writer has been closed");
        if (mJournalEntryBuilder == null) {
            // Nothing written since the last successful flush.
            return;
        }

        final long lastSequenceNumberInBatch = mNextSequenceNumberToWrite.get() - 1;
        try {
            Journal.JournalEntry batch = mJournalEntryBuilder.build();
            Message request = Message.valueOf(UnsafeByteOperations.unsafeWrap(batch.toByteArray()));
            // Recorded before sending, so it reflects attempted submissions as well.
            mLastSubmittedSequenceNumber.set(lastSequenceNumberInBatch);
            LOG.trace("Flushing entry {} ({})", batch, request);
            RaftClientReply reply = mClient.sendAsync(request)
                    .get(MASTER_EMBEDDED_JOURNAL_WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
            if (reply.getException() != null) {
                throw reply.getException();
            }
            mLastCommittedSequenceNumber.set(lastSequenceNumberInBatch);
            mJournalEntryBuilder = null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers before translating the failure.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        } catch (TimeoutException e) {
            throw new IOException(String.format(
                    "Timed out after waiting %s milliseconds for journal entries to be processed",
                    MASTER_EMBEDDED_JOURNAL_WRITE_TIMEOUT), e);
        }
    }

    /**
     * Marks this writer closed, logs the last written/submitted/committed sequence numbers and
     * closes the underlying client. Calling it again is a no-op. Entries still pending in the
     * batch are not flushed here.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!mClosed) {
            mClosed = true;
            LOG.info("Closing journal writer. Last sequence numbers written/submitted/committed: {}/{}/{}",
                    mNextSequenceNumberToWrite.get() - 1,
                    mLastSubmittedSequenceNumber.get(),
                    mLastCommittedSequenceNumber.get());
            try {
                mClient.close();
            } catch (IOException e) {
                // Best effort: a failure to close the client is only reported.
                LOG.warn("Failed to close raft client: {}", e.toString());
            }
        }
    }

    /**
     * @return the sequence number that will be assigned to the next written entry
     */
    public long getNextSequenceNumberToWrite() {
        return mNextSequenceNumberToWrite.get();
    }

    /**
     * Rejects the calling operation if this writer has been closed.
     *
     * @param closedMessage the message for the thrown exception
     * @throws JournalClosedException if {@link #close()} has already been called
     */
    private void ensureOpen(String closedMessage) throws JournalClosedException {
        if (mClosed) {
            throw new JournalClosedException(closedMessage);
        }
    }
}
