package alluxio.master.journal.ufs;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.RuntimeConstants;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.InvalidJournalEntryException;
import alluxio.master.journal.JournalWriter;
import alluxio.proto.journal.Journal;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Journal writer that appends journal entries to log files stored in an under file system (UFS).
 *
 * <p>Entries are stamped with a monotonically increasing sequence number and written to the
 * "current" log file, i.e. a log whose end sequence number is
 * {@link UfsJournal#UNKNOWN_SEQUENCE_NUMBER}. When the current log is closed it is renamed to a
 * completed log covering {@code [start, nextSequenceNumber)}.
 *
 * <p>Every entry written since the last successful {@link #flush()} is kept in an in-memory queue.
 * After a failed write or flush, the next {@link #write} or {@link #flush()} call first scans the
 * UFS for the last persisted entry, finalizes the incomplete log and replays the queued entries
 * that did not make it.
 *
 * <p>All public methods are {@code synchronized} on this writer.
 */
@ThreadSafe
public final class UfsJournalLogWriter implements JournalWriter {
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalLogWriter.class);

    private final UfsJournal mJournal;
    private final UnderFileSystem mUfs;
    /** Size threshold (bytes written to the current stream) at which a flush requests rotation. */
    private final long mMaxLogSize = Configuration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);

    /** Sequence number that will be assigned to the next entry passed to {@link #write}. */
    private long mNextSequenceNumber;
    /** Stream for the current log; {@code null} after a failed flush until a new log is created. */
    private JournalOutputStream mJournalOutputStream;
    private UfsJournalGarbageCollector mGarbageCollector;
    /** Set once {@link #close()} has completed successfully. */
    private boolean mClosed;
    /** Entries written since the last successful flush, oldest first; replayed during recovery. */
    private Queue<Journal.JournalEntry> mEntriesToFlush;
    /** When set, the next write/flush must run {@link #maybeRecoverFromUfsFailures()} first. */
    private boolean mNeedsRecovery = false;
    /** When set, the next write closes the current log and starts a new one. */
    private boolean mRotateLogForNextWrite = true;

    /**
     * Pairs the output stream of a log file with the {@link UfsJournalFile} describing it.
     * Closing it finalizes the log: an empty log is deleted, a non-empty one is renamed to its
     * completed name.
     */
    public class JournalOutputStream implements Closeable {
        /** Stream to the log file; {@code null} when wrapping a pre-existing log not opened for writing. */
        final DataOutputStream mOutputStream;
        final UfsJournalFile mCurrentLog;

        /**
         * @param ufsJournalFile the log file this stream belongs to
         * @param outputStream the raw stream, or {@code null} if the log is not open for writing;
         *        wrapped in a {@link DataOutputStream} unless it already is one
         */
        JournalOutputStream(UfsJournalFile ufsJournalFile, OutputStream outputStream) {
            if (outputStream == null) {
                this.mOutputStream = null;
            } else if (outputStream instanceof DataOutputStream) {
                this.mOutputStream = (DataOutputStream) outputStream;
            } else {
                this.mOutputStream = new DataOutputStream(outputStream);
            }
            this.mCurrentLog = ufsJournalFile;
        }

        /**
         * @return the number of bytes written through this stream so far, or 0 if there is no stream
         */
        long bytesWritten() {
            return this.mOutputStream == null ? 0L : this.mOutputStream.size();
        }

        /**
         * Closes the underlying stream (if any) and finalizes the log file in the UFS.
         *
         * @throws IOException if closing the stream or any UFS operation fails
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.mOutputStream != null) {
                this.mOutputStream.close();
            }
            long logStart = this.mCurrentLog.getStart();
            long logEnd = UfsJournalLogWriter.this.mNextSequenceNumber;
            LOG.info("Marking {} as complete with log entries within [{}, {}).",
                    this.mCurrentLog.getLocation(), logStart, logEnd);

            String inProgressPath = this.mCurrentLog.getLocation().toString();
            if (logEnd == logStart) {
                // No entry was assigned to this log: remove the file if it was ever created.
                if (UfsJournalLogWriter.this.mUfs.exists(inProgressPath)) {
                    UfsJournalLogWriter.this.mUfs.deleteFile(inProgressPath);
                }
                return;
            }

            String completedPath = UfsJournalFile
                    .encodeLogFileLocation(UfsJournalLogWriter.this.mJournal, logStart, logEnd).toString();
            if (UfsJournalLogWriter.this.mUfs.exists(completedPath)) {
                LOG.warn("Deleting duplicate completed log {}.", completedPath);
                UfsJournalLogWriter.this.mUfs.deleteFile(completedPath);
            }
            UfsJournalLogWriter.this.mUfs.renameFile(inProgressPath, completedPath);
        }
    }

    /**
     * Creates a writer for the given journal.
     *
     * <p>If the journal already has a current (incomplete) log, it is wrapped without an output
     * stream so that the first write finalizes it before a new log is started.
     *
     * @param ufsJournal the journal to write to; must not be {@code null}
     * @param nextSequenceNumber the sequence number to assign to the first entry written
     * @throws IOException if looking up the current log fails
     */
    public UfsJournalLogWriter(UfsJournal ufsJournal, long nextSequenceNumber) throws IOException {
        this.mJournal = (UfsJournal) Preconditions.checkNotNull(ufsJournal, "journal");
        this.mUfs = this.mJournal.getUfs();
        this.mNextSequenceNumber = nextSequenceNumber;
        UfsJournalFile existingCurrentLog = UfsJournalSnapshot.getCurrentLog(this.mJournal);
        if (existingCurrentLog != null) {
            this.mJournalOutputStream = new JournalOutputStream(existingCurrentLog, null);
        }
        this.mGarbageCollector = new UfsJournalGarbageCollector(this.mJournal);
        this.mEntriesToFlush = new ArrayDeque<>();
    }

    /**
     * Stamps the entry with the next sequence number and appends it to the current log.
     *
     * <p>On success the stamped entry is queued for replay until the next successful flush. On
     * failure the sequence number is not consumed and recovery is scheduled for the next call.
     *
     * @param journalEntry the entry to write; its sequence number is overwritten
     * @throws IOException if the writer is closed, or if recovery, rotation or the write fails
     */
    @Override // alluxio.master.journal.JournalWriter
    public synchronized void write(Journal.JournalEntry journalEntry) throws IOException {
        if (this.mClosed) {
            throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage());
        }
        maybeRecoverFromUfsFailures();
        maybeRotateLog();
        try {
            Journal.JournalEntry stamped =
                    journalEntry.toBuilder().setSequenceNumber(this.mNextSequenceNumber).build();
            stamped.writeDelimitedTo(this.mJournalOutputStream.mOutputStream);
            LOG.debug("Adding journal entry (seq={}) to retryList with {} entries.",
                    stamped.getSequenceNumber(), this.mEntriesToFlush.size());
            this.mEntriesToFlush.add(stamped);
            this.mNextSequenceNumber++;
        } catch (IOException e) {
            this.mNeedsRecovery = true;
            throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                    RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                    this.mJournalOutputStream.mCurrentLog, e.getMessage()), e);
        }
    }

    /**
     * If a previous write or flush failed, finalizes the incomplete log in the UFS, opens a new
     * log and replays every queued entry that is newer than the last persisted one.
     *
     * @throws IOException if scanning the UFS, creating the new log or replaying an entry fails;
     *         recovery stays pending in that case
     * @throws RuntimeException if the oldest queued entry is newer than the entry that should
     *         follow the last persisted one, i.e. entries are missing
     */
    private void maybeRecoverFromUfsFailures() throws IOException {
        if (!this.mNeedsRecovery) {
            return;
        }
        long lastPersistedSeq = getLastPersistedJournalEntrySequence();
        long firstUnpersistedSeq = lastPersistedSeq + 1;
        Journal.JournalEntry oldestPending = this.mEntriesToFlush.peek();
        boolean entryMissing =
                oldestPending != null && oldestPending.getSequenceNumber() > firstUnpersistedSeq;
        // Queued entries end at mNextSequenceNumber - 1, so something is replayed only when the
        // first unpersisted sequence number is below mNextSequenceNumber.
        boolean willReplay = oldestPending != null && !entryMissing
                && firstUnpersistedSeq < this.mNextSequenceNumber;

        // The new log must be named after the first entry it will contain. Naming it after
        // mNextSequenceNumber while replaying older entries into it would make a later close()
        // treat it as empty (start == next) and delete the replayed entries.
        // NOTE(review): the previous, failed stream is replaced here without being closed.
        createNewLogFile(willReplay ? firstUnpersistedSeq : this.mNextSequenceNumber);

        if (entryMissing) {
            throw new RuntimeException(ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl(
                    RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                    firstUnpersistedSeq, oldestPending.getSequenceNumber()));
        }
        if (oldestPending != null) {
            long lastReplayedSeq = lastPersistedSeq;
            LOG.info("Retry writing unwritten journal entries from seq {}", firstUnpersistedSeq);
            for (Journal.JournalEntry pending : this.mEntriesToFlush) {
                if (pending.getSequenceNumber() <= lastPersistedSeq) {
                    continue; // already durable in the UFS
                }
                try {
                    pending.writeDelimitedTo(this.mJournalOutputStream.mOutputStream);
                    lastReplayedSeq = pending.getSequenceNumber();
                } catch (IOException e) {
                    throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                            RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                            this.mJournalOutputStream.mCurrentLog, e.getMessage()), e);
                }
            }
            LOG.info("Finished writing unwritten journal entries from {} to {}.",
                    firstUnpersistedSeq, lastReplayedSeq);
        }
        this.mNeedsRecovery = false;
    }

    /**
     * Finds the highest sequence number already persisted in the UFS and finalizes the incomplete
     * current log, if there is one.
     *
     * <p>The current log is read until its end or until the first invalid entry. If it contained
     * entries it is renamed to a completed log ending after the last one read; otherwise it is
     * deleted. If no entry was found there, the end of the newest completed log is used.
     *
     * @return the last persisted sequence number, or -1 if the journal holds no entries
     * @throws IOException if reading the log or any UFS operation fails
     */
    private long getLastPersistedJournalEntrySequence() throws IOException {
        // NOTE(review): the result is unused; the call is kept only because its side effects (if
        // any) are not visible from this file. Confirm and remove.
        UfsJournalSnapshot.getSnapshot(this.mJournal);

        long lastPersistedSeq = -1;
        UfsJournalFile currentLog = UfsJournalSnapshot.getCurrentLog(this.mJournal);
        if (currentLog != null) {
            LOG.info("Recovering from previous UFS journal write failure. Scanning for the last persisted journal entry.");
            try (UfsJournalReader reader =
                    new UfsJournalReader(this.mJournal, currentLog.getStart(), true)) {
                Journal.JournalEntry entry;
                while ((entry = reader.read()) != null) {
                    if (entry.getSequenceNumber() > lastPersistedSeq) {
                        lastPersistedSeq = entry.getSequenceNumber();
                    }
                }
            } catch (InvalidJournalEntryException e) {
                // An invalid (e.g. partially written) entry marks the end of the usable data.
                LOG.info("Found last persisted journal entry with seq={}.", lastPersistedSeq);
            }

            String incompletePath = currentLog.getLocation().toString();
            if (lastPersistedSeq >= 0) {
                LOG.info("Last persisted journal entry sequence number in {} is {}",
                        incompletePath, lastPersistedSeq);
                String completedPath = UfsJournalFile.encodeLogFileLocation(
                        this.mJournal, currentLog.getStart(), lastPersistedSeq + 1).toString();
                if (this.mUfs.exists(completedPath)) {
                    LOG.warn("Deleting duplicate completed log {}.", completedPath);
                    this.mUfs.deleteFile(completedPath);
                }
                LOG.info("Renaming the previous incomplete journal file from {} to {}.",
                        incompletePath, completedPath);
                this.mUfs.renameFile(incompletePath, completedPath);
            } else {
                LOG.info("No journal entry found in current journal file {}. Deleting it", incompletePath);
                this.mUfs.deleteFile(incompletePath);
            }
        }

        if (lastPersistedSeq < 0) {
            // Nothing usable in the current log: fall back to the newest completed log.
            List<UfsJournalFile> completedLogs = UfsJournalSnapshot.getSnapshot(this.mJournal).getLogs();
            if (!completedLogs.isEmpty()) {
                UfsJournalFile newestLog = completedLogs.get(completedLogs.size() - 1);
                lastPersistedSeq = newestLog.getEnd() - 1;
                LOG.info("Found last persisted journal entry with seq {} in {}.",
                        lastPersistedSeq, newestLog.getLocation().toString());
            }
        }
        return lastPersistedSeq;
    }

    /**
     * If rotation was requested, closes (and thereby finalizes) the current log and starts a new
     * one beginning at the next sequence number.
     *
     * @throws IOException if closing the old log or creating the new one fails
     */
    private void maybeRotateLog() throws IOException {
        if (!this.mRotateLogForNextWrite) {
            return;
        }
        if (this.mJournalOutputStream != null) {
            this.mJournalOutputStream.close();
            this.mJournalOutputStream = null;
        }
        createNewLogFile(this.mNextSequenceNumber);
        this.mRotateLogForNextWrite = false;
    }

    /**
     * Creates a new in-progress log file in the UFS and makes it the current output stream.
     *
     * @param startSequenceNumber sequence number of the first entry the new log will contain
     * @throws IOException if the file cannot be created
     */
    private void createNewLogFile(long startSequenceNumber) throws IOException {
        UfsJournalFile newLog = UfsJournalFile.createLogFile(
                UfsJournalFile.encodeLogFileLocation(
                        this.mJournal, startSequenceNumber, UfsJournal.UNKNOWN_SEQUENCE_NUMBER),
                startSequenceNumber, UfsJournal.UNKNOWN_SEQUENCE_NUMBER);
        OutputStream rawStream = this.mUfs.create(newLog.getLocation().toString(),
                CreateOptions.defaults().setEnsureAtomic(false).setCreateParent(true));
        this.mJournalOutputStream = new JournalOutputStream(newLog, rawStream);
        LOG.info("Created current log file: {}", newLog);
    }

    /**
     * Flushes buffered entries to the UFS and, on success, forgets the queued entries.
     *
     * <p>Rotation is requested for the next write when the log has reached the maximum size or
     * when the UFS does not support flush. Does nothing if the writer is closed or nothing has
     * been written to the current stream.
     *
     * @throws IOException if recovery or the flush fails; after a flush failure the current stream
     *         is dropped and recovery is scheduled so the queued entries are replayed
     */
    @Override // alluxio.master.journal.JournalWriter
    public synchronized void flush() throws IOException {
        if (this.mClosed) {
            // Checked before recovery so that a closed writer never creates a new log file.
            return;
        }
        maybeRecoverFromUfsFailures();
        if (this.mJournalOutputStream == null || this.mJournalOutputStream.bytesWritten() == 0) {
            return;
        }
        try {
            this.mJournalOutputStream.mOutputStream.flush();
            this.mEntriesToFlush.clear();
            boolean overSize = this.mJournalOutputStream.bytesWritten() >= this.mMaxLogSize;
            if (overSize) {
                LOG.info("Rotating log file. size: {} maxSize: {}",
                        this.mJournalOutputStream.bytesWritten(), this.mMaxLogSize);
            }
            if (overSize || !this.mUfs.supportsFlush()) {
                this.mRotateLogForNextWrite = true;
            }
        } catch (IOException e) {
            // The queued entries may not have reached the UFS and the in-progress log is left
            // un-finalized: schedule recovery so both are dealt with before the next write/flush.
            this.mNeedsRecovery = true;
            this.mRotateLogForNextWrite = true;
            UfsJournalFile failedLog = this.mJournalOutputStream.mCurrentLog;
            this.mJournalOutputStream = null;
            throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE.getMessageWithUrl(
                    RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, failedLog, e.getMessage()), e);
        }
    }

    /**
     * Finalizes the current log (if any), closes the garbage collector and marks the writer
     * closed. Calling it again after a successful close has no effect; re-running the log
     * finalization would otherwise delete the already completed log.
     *
     * @throws IOException if closing any resource fails; the writer is then not marked closed
     */
    public synchronized void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        Closer closer = Closer.create();
        if (this.mJournalOutputStream != null) {
            closer.register(this.mJournalOutputStream);
        }
        closer.register(this.mGarbageCollector);
        closer.close();
        this.mClosed = true;
    }

    /**
     * @return the stream wrapper for the current log, or {@code null} if there is none
     */
    @VisibleForTesting
    synchronized JournalOutputStream getJournalOutputStream() {
        return this.mJournalOutputStream;
    }
}
