package alluxio.master.journal.ufs;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.exception.InvalidJournalEntryException;
import alluxio.master.journal.JournalEntryStateMachine;
import alluxio.master.journal.JournalReader;
import alluxio.proto.journal.Journal;
import alluxio.util.CommonUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that tails a {@link UfsJournal}, applies every journal entry it reads to a
 * {@link JournalEntryStateMachine}, and periodically writes a checkpoint of that state machine.
 *
 * <p>The loop runs until {@link #awaitTermination(boolean)} is called from another thread. The
 * shutdown flags are {@code volatile} so that the request is visible to the running loop; all
 * other state is only touched by this thread while it is running.
 */
@NotThreadSafe
public final class UfsJournalCheckpointThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalCheckpointThread.class);

    /** The state machine that journal entries are applied to and checkpoints are taken from. */
    private final JournalEntryStateMachine mMaster;
    /** The journal being tailed and checkpointed. */
    private final UfsJournal mJournal;
    /** How long to wait for late journal entries once shutdown has been requested. */
    private final long mShutdownQuietWaitTimeMs;
    /** Reader used to tail the journal; replaced with a fresh reader after a read failure. */
    private JournalReader mJournalReader;
    /** Cached sequence number from which the next checkpoint would start. */
    private long mNextSequenceNumberToCheckpoint;
    /** Set by {@link #awaitTermination(boolean)} to ask the loop to stop. */
    private volatile boolean mShutdownInitiated = false;
    /** Set by the loop once it has exited normally. */
    private volatile boolean mStopped = false;
    /** Whether the loop must sit through one quiet period before stopping. */
    private volatile boolean mWaitQuietPeriod = true;
    /**
     * Sleep time between polls when the journal has no new entry. Kept as a {@code long} so that
     * large configured values are not truncated by a narrowing cast.
     */
    private final long mJournalCheckpointSleepTimeMs =
            Configuration.getMs(PropertyKey.MASTER_JOURNAL_TAILER_SLEEP_TIME_MS);
    /** Minimum number of entries since the last checkpoint before a new one is written. */
    private final long mCheckpointPeriodEntries =
            Configuration.getLong(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES);

    /**
     * Creates a new checkpoint thread. The thread is not started by the constructor.
     *
     * @param journalEntryStateMachine the state machine to apply journal entries to; must not be
     *        null
     * @param ufsJournal the journal to tail and checkpoint; must not be null
     */
    public UfsJournalCheckpointThread(JournalEntryStateMachine journalEntryStateMachine, UfsJournal ufsJournal) {
        mMaster = Preconditions.checkNotNull(journalEntryStateMachine);
        mJournal = Preconditions.checkNotNull(ufsJournal);
        mShutdownQuietWaitTimeMs = ufsJournal.getQuietPeriodMs();
        // The initial reader is created with sequence number 0.
        mJournalReader = new UfsJournalReader(mJournal, 0L, false);
    }

    /**
     * Requests shutdown of the checkpoint loop and blocks until this thread has terminated.
     *
     * @param z whether the loop should wait through one quiet period (no new journal entries)
     *        before stopping; when false it stops the next time it finds no entry to read
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the caller's interrupt status is restored first
     */
    public void awaitTermination(boolean z) {
        LOG.info("{}: Journal checkpointer shutdown has been initiated.", mMaster.getName());
        // Publish the quiet-period choice before the shutdown flag so the loop never observes the
        // shutdown request together with a stale quiet-period setting.
        mWaitQuietPeriod = z;
        mShutdownInitiated = true;
        try {
            join();
            LOG.info("{}: Journal shutdown complete", mMaster.getName());
        } catch (InterruptedException e) {
            LOG.error("{}: journal checkpointer shutdown is interrupted.", mMaster.getName(), e);
            // Restore the interrupt status (after logging) so code further up the stack can
            // still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the next sequence number of the journal reader.
     *
     * @return the reader's next sequence number
     * @throws IllegalStateException if the checkpoint loop has not stopped yet
     */
    public long getNextSequenceNumber() {
        Preconditions.checkState(mStopped);
        return mJournalReader.getNextSequenceNumber();
    }

    /**
     * Runs the checkpoint loop. Any {@link RuntimeException} escaping the loop is logged and
     * rethrown, which terminates this thread.
     */
    @Override
    public void run() {
        try {
            runInternal();
        } catch (RuntimeException e) {
            LOG.error("{}: Failed to run journal checkpoint thread, crashing.", mMaster.getName(), e);
            throw e;
        }
    }

    /**
     * Main loop: reads and applies journal entries until shutdown is requested and, if required,
     * one quiet period has passed without a new entry. On a read or processing failure the reader
     * is closed and replaced by a new one that resumes at the old reader's next sequence number.
     */
    private void runInternal() {
        LOG.info("{}: Journal checkpoint thread started.", mMaster.getName());
        // True once a full quiet period has been slept through after shutdown was requested.
        boolean quietPeriodWaited = false;
        while (true) {
            try {
                Journal.JournalEntry entry = mJournalReader.read();
                if (entry != null) {
                    mMaster.processJournalEntry(entry);
                    if (quietPeriodWaited) {
                        // An entry arrived during the quiet period, so the wait starts over.
                        LOG.info("Quiet period interrupted by new journal entry");
                        quietPeriodWaited = false;
                    }
                    // Keep draining the journal without sleeping while entries are available.
                    continue;
                }

                // No entry available: consider checkpointing, then either sleep or stop.
                maybeCheckpoint();
                if (!mShutdownInitiated) {
                    CommonUtils.sleepMs(LOG, mJournalCheckpointSleepTimeMs);
                } else {
                    if (quietPeriodWaited || !mWaitQuietPeriod) {
                        break;
                    }
                    CommonUtils.sleepMs(LOG, mShutdownQuietWaitTimeMs);
                    quietPeriodWaited = true;
                }
            } catch (IOException | InvalidJournalEntryException e) {
                LOG.warn("{}: Failed to read or process the journal entry with error {}.", mMaster.getName(), e.getMessage());
                try {
                    mJournalReader.close();
                } catch (IOException closeError) {
                    LOG.warn("{}: Failed to close the journal reader with error {}.", mMaster.getName(), closeError.getMessage());
                }
                // Resume from where the failed reader left off.
                long resumeSequenceNumber = mJournalReader.getNextSequenceNumber();
                mJournalReader = new UfsJournalReader(mJournal, resumeSequenceNumber, false);
                quietPeriodWaited = false;
            }
        }
        LOG.info("{}: Journal checkpoint thread has been shutdown. No new logs have been found during the quiet period.", mMaster.getName());
        mStopped = true;
        if (mJournalReader != null) {
            try {
                mJournalReader.close();
            } catch (IOException closeError) {
                LOG.warn("{}: Failed to close the journal reader with error {}.", mMaster.getName(), closeError.getMessage());
            }
        }
    }

    /**
     * Writes a checkpoint if shutdown has not been requested and at least
     * {@code mCheckpointPeriodEntries} entries have been read since the last checkpoint.
     */
    private void maybeCheckpoint() {
        if (mShutdownInitiated) {
            return;
        }
        long nextSequenceNumber = mJournalReader.getNextSequenceNumber();
        // Cheap check against the cached value before asking the journal.
        if (nextSequenceNumber - mNextSequenceNumberToCheckpoint < mCheckpointPeriodEntries) {
            return;
        }
        try {
            // Refresh the cached value from the journal and check again before writing.
            mNextSequenceNumberToCheckpoint = mJournal.getNextSequenceNumberToCheckpoint();
            if (nextSequenceNumber - mNextSequenceNumberToCheckpoint < mCheckpointPeriodEntries) {
                return;
            }
            writeCheckpoint(nextSequenceNumber);
        } catch (IOException e) {
            LOG.warn("{}: Failed to get the next sequence number to checkpoint with error {}.", mMaster.getName(), e.getMessage());
        }
    }

    /**
     * Writes all entries produced by the state machine's journal entry iterator to a checkpoint
     * writer. The checkpoint is cancelled if shutdown is requested, if a write fails, or if the
     * iterator was not fully consumed; otherwise the writer is closed and the cached checkpoint
     * sequence number is advanced.
     *
     * @param j the sequence number the checkpoint is written for
     */
    private void writeCheckpoint(long j) {
        LOG.info("{}: Writing checkpoint [sequence number {}].", mMaster.getName(), j);
        Iterator<Journal.JournalEntry> entries = mMaster.getJournalEntryIterator();
        UfsJournalCheckpointWriter checkpointWriter = null;
        IOException writeFailure = null;
        try {
            checkpointWriter = mJournal.getCheckpointWriter(j);
            while (entries.hasNext() && !mShutdownInitiated) {
                checkpointWriter.write(entries.next());
            }
        } catch (IOException e) {
            LOG.warn("{}: Failed to checkpoint with error {}.", mMaster.getName(), e.getMessage());
            writeFailure = e;
        }
        if (checkpointWriter == null) {
            // The writer could not be created, so there is nothing to cancel or finish.
            return;
        }
        try {
            boolean incomplete = entries.hasNext() || mShutdownInitiated || writeFailure != null;
            if (incomplete) {
                checkpointWriter.cancel();
                LOG.info("{}: Cancelled checkpoint [sequence number {}].", mMaster.getName(), j);
            } else {
                checkpointWriter.close();
                LOG.info("{}: Finished checkpoint [sequence number {}].", mMaster.getName(), j);
                mNextSequenceNumberToCheckpoint = j;
            }
        } catch (IOException e) {
            LOG.warn("{}: Failed to cancel or finish the checkpoint [sequence number {}] with error {}.", mMaster.getName(), j, e.getMessage());
        }
    }
}
