package alluxio.master.journal.ufs;

import alluxio.RuntimeConstants;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JournalClosedException;
import alluxio.master.journal.JournalEntryStreamReader;
import alluxio.master.journal.JournalWriter;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.OpenOptions;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/master/journal/ufs/UfsJournalLogWriter.class */
public final class UfsJournalLogWriter implements JournalWriter {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalLogWriter.class);
    /** The journal this writer appends to; also consulted to decide whether writing is still allowed. */
    private final UfsJournal mJournal;
    /** Under file system obtained from {@link #mJournal}; used to create, open, rename and delete log files. */
    private final UnderFileSystem mUfs;
    /** Sequence number that will be stamped on the next entry passed to {@code write}. */
    private long mNextSequenceNumber;
    /** Stream for the current log file. May be {@code null}, e.g. after a failed flush or during rotation. */
    private JournalOutputStream mJournalOutputStream;
    /** Garbage collector created for this journal in the constructor and closed in {@code close()}. */
    private final UfsJournalGarbageCollector mGarbageCollector;
    /** Set to {@code true} at the end of {@code close()}; checked by {@link JournalOutputStream} before every operation. */
    private boolean mClosed;
    /** Entries written since the last successful flush; replayed into a new log file during recovery. */
    private final Queue<Journal.JournalEntry> mEntriesToFlush;
    /** Set when a write or flush fails with an {@link IOException}; cleared once recovery succeeds. */
    private boolean mNeedsRecovery = false;
    /** Size threshold (bytes) read from configuration; a flush that finds the log at or above it schedules a rotation. */
    private final long mMaxLogSize = Configuration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
    /** When {@code true}, the next write completes the current log (if any) and starts a new one. Starts out {@code true}. */
    private boolean mRotateLogForNextWrite = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:alluxio/master/journal/ufs/UfsJournalLogWriter$JournalOutputStream.class */
    /**
     * Output stream bound to a single journal log file.
     *
     * <p>Every operation first verifies that the enclosing writer has not been closed and
     * otherwise delegates to the wrapped {@link DataOutputStream}. Closing the stream also
     * completes the log file it is bound to.
     */
    public class JournalOutputStream extends OutputStream {
        /** Stream that receives the bytes; also tracks how many bytes were written. */
        private final DataOutputStream mOutputStream;
        /** The log file this stream writes to. */
        private final UfsJournalFile mCurrentLog;

        /**
         * @param currentLog the log file this stream belongs to
         * @param stream the destination stream; wrapped in a {@link DataOutputStream} if needed
         */
        JournalOutputStream(UfsJournalFile currentLog, OutputStream stream) {
            mOutputStream = wrapDataOutputStream(stream);
            mCurrentLog = currentLog;
        }

        /**
         * @return the byte count reported by the wrapped stream, or 0 if there is no stream
         */
        long bytesWritten() {
            return mOutputStream == null ? 0L : mOutputStream.size();
        }

        /**
         * @return the log file this stream writes to
         */
        UfsJournalFile currentLog() {
            return mCurrentLog;
        }

        @Override
        public void write(int b) throws IOException {
            checkJournalWriterOpen();
            mOutputStream.write(b);
        }

        @Override
        public void write(byte[] bytes) throws IOException {
            checkJournalWriterOpen();
            mOutputStream.write(bytes);
        }

        @Override
        public void write(byte[] bytes, int offset, int length) throws IOException {
            checkJournalWriterOpen();
            mOutputStream.write(bytes, offset, length);
        }

        @Override
        public void flush() throws IOException {
            checkJournalWriterOpen();
            mOutputStream.flush();
        }

        /**
         * Closes the wrapped stream and then completes the log file using the writer's
         * current next sequence number as the exclusive end of the range.
         */
        @Override
        public void close() throws IOException {
            checkJournalWriterOpen();
            mOutputStream.close();
            LOG.info("Marking {} as complete with log entries within [{}, {}).",
                mCurrentLog.getLocation(), mCurrentLog.getStart(), mNextSequenceNumber);
            completeLog(mCurrentLog, mNextSequenceNumber);
        }

        /**
         * Fails with the IO flavour of {@link JournalClosedException} once the enclosing
         * writer has been closed.
         */
        private void checkJournalWriterOpen() throws JournalClosedException.IOJournalClosedException {
            if (!mClosed) {
                return;
            }
            throw new JournalClosedException("Journal writer is closed. currentLog: " + mCurrentLog).toIOException();
        }
    }

    /**
     * Creates a writer for the given journal.
     *
     * <p>If the journal already has a current log, a placeholder stream backed by a
     * null output stream is installed for it, so that the first write (which always
     * rotates) closes the placeholder and thereby completes that existing log.
     *
     * @param ufsJournal the journal to write to; must not be {@code null}
     * @param j the sequence number to assign to the first entry written
     * @throws IOException if looking up the current log fails
     */
    public UfsJournalLogWriter(UfsJournal ufsJournal, long j) throws IOException {
        mJournal = Preconditions.checkNotNull(ufsJournal, "journal");
        mUfs = mJournal.getUfs();
        mNextSequenceNumber = j;
        UfsJournalFile currentLog = UfsJournalSnapshot.getCurrentLog(mJournal);
        if (currentLog != null) {
            mJournalOutputStream = new JournalOutputStream(currentLog, ByteStreams.nullOutputStream());
        }
        mGarbageCollector = new UfsJournalGarbageCollector(mJournal);
        // Diamond instead of the raw type so the queue is properly typed.
        mEntriesToFlush = new ArrayDeque<>();
    }

    /**
     * Stamps the entry with the next sequence number and appends it to the current log.
     *
     * <p>Before writing, any pending UFS failure is recovered and the log is rotated if a
     * rotation was scheduled. The stamped entry is remembered in {@code mEntriesToFlush}
     * until the next successful flush so it can be replayed after a failure.
     *
     * @param journalEntry the entry to write; its sequence number is overwritten
     * @throws IOException if the underlying write fails; the writer is then flagged for recovery
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     */
    @Override
    public synchronized void write(Journal.JournalEntry journalEntry) throws IOException, JournalClosedException {
        checkIsWritable();
        try {
            maybeRecoverFromUfsFailures();
            maybeRotateLog();
        } catch (JournalClosedException.IOJournalClosedException e) {
            throw e.toJournalClosedException();
        }
        try {
            Journal.JournalEntry stamped = journalEntry.toBuilder().setSequenceNumber(mNextSequenceNumber).build();
            stamped.writeDelimitedTo(mJournalOutputStream);
            LOG.debug("Adding journal entry (seq={}) to retryList with {} entries. currentLog: {}",
                stamped.getSequenceNumber(), mEntriesToFlush.size(), currentLogName());
            mEntriesToFlush.add(stamped);
            mNextSequenceNumber++;
        } catch (JournalClosedException.IOJournalClosedException e) {
            // Must come before the IOException handler: a closed journal is an IOException
            // subclass here, and it must not trigger UFS-failure recovery.
            throw e.toJournalClosedException();
        } catch (IOException e) {
            // Flag the failure so the next write/flush runs recovery first.
            mNeedsRecovery = true;
            throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, mJournalOutputStream.currentLog(), e.getMessage()), e);
        }
    }

    /**
     * Recovers from an earlier UFS write/flush failure, if one was flagged.
     *
     * <p>Recovery finds the last persisted sequence number, starts a new log file right
     * after it, and replays every buffered entry with a higher sequence number. The whole
     * recovery is timed with the journal failure-recovery timer.
     *
     * @throws IOException if scanning the journal, creating the new log or replaying fails
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     * @throws RuntimeException if no persisted entry can be found, if there is a gap between
     *         the persisted entries and the buffered ones, or if replay does not reach the
     *         entry just before {@code mNextSequenceNumber}
     */
    private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedException {
        checkIsWritable();
        if (!mNeedsRecovery) {
            return;
        }
        try (Timer.Context ctx =
            MetricsSystem.timer(MetricKey.MASTER_UFS_JOURNAL_FAILURE_RECOVER_TIMER.getName()).time()) {
            long lastPersistSeq = recoverLastPersistedJournalEntry();
            if (lastPersistSeq == -1) {
                throw new RuntimeException("Cannot find any journal entry to recover. location: " + mJournal.getLocation());
            }
            createNewLogFile(lastPersistSeq + 1);
            if (!mEntriesToFlush.isEmpty()) {
                Journal.JournalEntry firstBuffered = mEntriesToFlush.peek();
                if (firstBuffered.getSequenceNumber() > lastPersistSeq + 1) {
                    // Entries between the persisted tail and the buffer head are lost.
                    throw new RuntimeException(ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl(
                        RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, lastPersistSeq + 1, firstBuffered.getSequenceNumber()));
                }
                long retryEndSeq = lastPersistSeq;
                LOG.info("Retry writing unwritten journal entries from seq {} to currentLog {}",
                    lastPersistSeq + 1, currentLogName());
                for (Journal.JournalEntry buffered : mEntriesToFlush) {
                    if (buffered.getSequenceNumber() <= lastPersistSeq) {
                        // Already persisted before the failure.
                        continue;
                    }
                    try {
                        buffered.toBuilder().build().writeDelimitedTo(mJournalOutputStream);
                        retryEndSeq = buffered.getSequenceNumber();
                    } catch (JournalClosedException.IOJournalClosedException e) {
                        // Handled before IOException: it is an IOException subclass here.
                        throw e.toJournalClosedException();
                    } catch (IOException e) {
                        throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                            RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, mJournalOutputStream.currentLog(), e.getMessage()), e);
                    }
                }
                LOG.info("Finished writing unwritten journal entries from {} to {}. currentLog: {}",
                    lastPersistSeq + 1, retryEndSeq, currentLogName());
                if (retryEndSeq != mNextSequenceNumber - 1) {
                    throw new RuntimeException("Failed to recover all entries to flush, expecting " + (mNextSequenceNumber - 1) + " but only found entry " + retryEndSeq + " currentLog: " + currentLogName());
                }
            }
            mNeedsRecovery = false;
        }
    }

    /**
     * Finds the sequence number of the last journal entry that reached the UFS.
     *
     * <p>The current (incomplete) log is scanned first; if it holds at least one entry it
     * is completed at the highest sequence number found. If nothing was found there, the
     * logs of a fresh snapshot are walked from newest to oldest and the end of the first
     * complete log is used.
     *
     * @return the last persisted sequence number, or -1 if none could be found
     * @throws IOException if reading the journal from the UFS fails
     */
    private long recoverLastPersistedJournalEntry() throws IOException {
        // NOTE(review): the result of this call is discarded; it is kept so this method
        // makes the same UFS calls as before. Confirm whether it can be removed.
        UfsJournalSnapshot.getSnapshot(mJournal);
        long lastPersistSeq = -1;
        UfsJournalFile currentLog = UfsJournalSnapshot.getCurrentLog(mJournal);
        if (currentLog != null) {
            LOG.info("Recovering from previous UFS journal write failure. Scanning for the last persisted journal entry. currentLog: " + currentLog);
            // try-with-resources closes the reader on every path and keeps the read
            // failure as the primary exception if close() fails as well.
            try (JournalEntryStreamReader reader = new JournalEntryStreamReader(
                mUfs.open(currentLog.getLocation().toString(), OpenOptions.defaults().setRecoverFailedOpen(true)))) {
                Journal.JournalEntry entry;
                while ((entry = reader.readEntry()) != null) {
                    if (entry.getSequenceNumber() > lastPersistSeq) {
                        lastPersistSeq = entry.getSequenceNumber();
                    }
                }
            }
            if (lastPersistSeq != -1) {
                // Only complete the log when it actually contains an entry.
                completeLog(currentLog, lastPersistSeq + 1);
            }
        }
        if (lastPersistSeq < 0) {
            // Nothing usable in the current log: fall back to the newest complete log.
            List<UfsJournalFile> logs = UfsJournalSnapshot.getSnapshot(mJournal).getLogs();
            for (int i = logs.size() - 1; i >= 0; i--) {
                UfsJournalFile candidate = logs.get(i);
                if (!candidate.isIncompleteLog()) {
                    lastPersistSeq = candidate.getEnd() - 1;
                    LOG.info("Found last persisted journal entry with seq {} in {}.",
                        lastPersistSeq, candidate.getLocation().toString());
                    break;
                }
            }
        }
        return lastPersistSeq;
    }

    /**
     * Starts a new log file if a rotation has been scheduled.
     *
     * <p>The existing stream, if any, is closed first; a new log file starting at the
     * next sequence number is then created and the rotation flag is cleared.
     *
     * @throws IOException if closing the old stream or creating the new log fails
     * @throws JournalClosedException if the journal is not writable
     */
    private void maybeRotateLog() throws IOException, JournalClosedException {
        checkIsWritable();
        if (!mRotateLogForNextWrite) {
            return;
        }
        if (mJournalOutputStream != null) {
            mJournalOutputStream.close();
            mJournalOutputStream = null;
        }
        createNewLogFile(mNextSequenceNumber);
        mRotateLogForNextWrite = false;
    }

    /**
     * Creates a new incomplete log file in the UFS and points the writer's stream at it.
     *
     * @param j the sequence number of the first entry the new log will hold
     * @throws IOException if the file cannot be created in the UFS
     * @throws JournalClosedException if the journal is not writable
     */
    private void createNewLogFile(long j) throws IOException, JournalClosedException {
        checkIsWritable();
        // The end of the range is unknown until the log is completed.
        UfsJournalFile newLog = UfsJournalFile.createLogFile(
            UfsJournalFile.encodeLogFileLocation(mJournal, j, UfsJournal.UNKNOWN_SEQUENCE_NUMBER),
            j,
            UfsJournal.UNKNOWN_SEQUENCE_NUMBER);
        String path = newLog.getLocation().toString();
        CreateOptions options = CreateOptions.defaults(Configuration.global())
            .setEnsureAtomic(false)
            .setCreateParent(true);
        OutputStream destination = mUfs.create(path, options);
        mJournalOutputStream = new JournalOutputStream(newLog, destination);
        LOG.info("Created current log file: {}", newLog);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes a log file by renaming it so that its name carries the end sequence number.
     *
     * <p>A log with no entries ({@code j <= start}) is deleted instead. If the journal is
     * not writable the method logs a warning and does nothing; writability is checked on
     * entry and again right before the rename. When the rename reports failure but the
     * completed file already exists, a leftover current file is deleted on a best-effort
     * basis.
     *
     * @param ufsJournalFile the log file to complete
     * @param j the next sequence number, i.e. the exclusive end of the log's range
     * @throws IOException if the rename fails and the completed file does not exist, or if
     *         a UFS call fails
     */
    public void completeLog(UfsJournalFile ufsJournalFile, long j) throws IOException {
        try {
            checkIsWritable();
            String current = ufsJournalFile.getLocation().toString();
            if (j <= ufsJournalFile.getStart()) {
                LOG.info("No journal entry found in current journal file {}. Deleting it", current);
                if (!mUfs.deleteFile(current)) {
                    LOG.warn("Failed to delete empty journal file {}", current);
                }
                return;
            }
            String completed = UfsJournalFile.encodeLogFileLocation(mJournal, ufsJournalFile.getStart(), j).toString();
            // Check again right before touching the UFS.
            checkIsWritable();
            LOG.info(String.format("Completing log %s with next sequence number %d", current, j));
            if (mUfs.renameFile(current, completed)) {
                return;
            }
            if (!mUfs.exists(completed)) {
                throw new IOException(String.format("Failed to rename journal log from %s to %s", current, completed));
            }
            if (mUfs.exists(current)) {
                LOG.info("Deleting current log {}", current);
                if (!mUfs.deleteFile(current)) {
                    LOG.warn("Failed to delete current log file {}", current);
                }
            }
        } catch (JournalClosedException e) {
            LOG.warn("Skipping completeLog() since journal is not writable. error: {}", e.toString());
        }
    }

    /**
     * Flushes buffered entries to the UFS.
     *
     * <p>Runs recovery first if a previous failure was flagged. Nothing is done when there
     * is no stream or nothing has been written to it. After a successful flush the retry
     * buffer is cleared, and a rotation is scheduled if the log reached the size limit or
     * the UFS does not support flush.
     *
     * @throws IOException if the flush fails; the stream is dropped and recovery is flagged
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     */
    @Override
    public synchronized void flush() throws IOException, JournalClosedException {
        checkIsWritable();
        maybeRecoverFromUfsFailures();
        JournalOutputStream stream = mJournalOutputStream;
        if (stream == null || stream.bytesWritten() == 0) {
            return;
        }
        try {
            stream.flush();
            mEntriesToFlush.clear();
            boolean overSizeLimit = stream.bytesWritten() >= mMaxLogSize;
            if (overSizeLimit) {
                LOG.info("Rotating log file {}. size: {} maxSize: {}",
                    currentLogName(), stream.bytesWritten(), mMaxLogSize);
            }
            if (overSizeLimit || !mUfs.supportsFlush()) {
                mRotateLogForNextWrite = true;
            }
        } catch (JournalClosedException.IOJournalClosedException e) {
            throw e.toJournalClosedException();
        } catch (IOException e) {
            // Flag the failure and drop the stream; recovery creates a new log file.
            mNeedsRecovery = true;
            UfsJournalFile failedLog = stream.currentLog();
            mJournalOutputStream = null;
            throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE.getMessageWithUrl(
                RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, failedLog, e.getMessage()), e);
        }
    }

    /**
     * Closes the writer. Calling it again after it has completed is a no-op.
     *
     * <p>The garbage collector is closed first, then the current stream if there is one.
     * A failure to close the stream is logged and otherwise ignored.
     */
    @Override
    public synchronized void close() {
        if (mClosed) {
            return;
        }
        mGarbageCollector.close();
        if (mJournalOutputStream != null) {
            try {
                mJournalOutputStream.close();
            } catch (Throwable t) {
                String message = String.format("Failed to close underlying UFS journal stream for: %s", mJournal);
                LOG.warn(message, t);
            }
        }
        // Set last: the stream's own close() refuses to run once the writer is closed.
        mClosed = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the stream as a {@link DataOutputStream}.
     *
     * @param outputStream the stream to adapt
     * @return the same instance if it already is a {@link DataOutputStream}, otherwise a
     *         new {@link DataOutputStream} wrapping it
     */
    public static DataOutputStream wrapDataOutputStream(OutputStream outputStream) {
        if (outputStream instanceof DataOutputStream) {
            return (DataOutputStream) outputStream;
        }
        return new DataOutputStream(outputStream);
    }

    /**
     * @return the sequence number that will be assigned to the next entry written
     */
    public synchronized long getNextSequenceNumber() {
        return mNextSequenceNumber;
    }

    /**
     * @return the stream for the current log, or {@code null} if there is none
     */
    @VisibleForTesting
    synchronized JournalOutputStream getJournalOutputStream() {
        return mJournalOutputStream;
    }

    /**
     * Verifies that the journal still accepts writes.
     *
     * @throws JournalClosedException if the journal reports that it is not writable
     */
    private void checkIsWritable() throws JournalClosedException {
        if (mJournal.isWritable()) {
            return;
        }
        throw new JournalClosedException(String.format("writer not allowed to write (no longer primary). location: %s currentLog: %s", mJournal.getLocation(), currentLogName()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the string form of the current log file, or a placeholder when there is no
     *         output stream
     */
    public String currentLogName() {
        if (mJournalOutputStream == null) {
            return "(null output stream)";
        }
        return mJournalOutputStream.currentLog().toString();
    }
}
