package alluxio.master.journal;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.JournalClosedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.Status;
import alluxio.proto.journal.Journal;
import alluxio.resource.LockResource;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter.class */
public final class AsyncJournalWriter {
    private final JournalWriter mJournalWriter;

    @GuardedBy("mTicketLock")
    private final List<FlushTicket> mTicketList = new LinkedList();
    private final ReentrantLock mTicketLock = new ReentrantLock(true);
    private Thread mFlushThread = new Thread(this::doFlush, "AsyncJournalWriterThread");
    private final Semaphore mFlushSemaphore = new Semaphore(0, true);
    private volatile boolean mStopFlushing = false;
    private final ConcurrentLinkedQueue<Journal.JournalEntry> mQueue = new ConcurrentLinkedQueue<>();
    private final AtomicLong mCounter = new AtomicLong(0);
    private final AtomicLong mFlushCounter = new AtomicLong(0);
    private Long mWriteCounter = 0L;
    private final long mFlushBatchTimeNs = TimeUnit.NANOSECONDS.convert(ServerConfiguration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_BATCH_TIME_MS), TimeUnit.MILLISECONDS);

    /* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter$FlushTicket.class */
    private class FlushTicket {
        private final long mTargetCounter;
        private SettableFuture<Void> mIsCompleted = SettableFuture.create();

        public FlushTicket(long j) {
            this.mTargetCounter = j;
        }

        public long getTargetCounter() {
            return this.mTargetCounter;
        }

        public void setCompleted() {
            this.mIsCompleted.set((Object) null);
        }

        public void setError(Throwable th) {
            this.mIsCompleted.setException(th);
        }

        public void waitCompleted() throws InterruptedException, ExecutionException {
            this.mIsCompleted.get();
        }
    }

    public AsyncJournalWriter(JournalWriter journalWriter) {
        this.mJournalWriter = (JournalWriter) Preconditions.checkNotNull(journalWriter, "journalWriter");
        this.mFlushThread.start();
    }

    public long appendEntry(Journal.JournalEntry journalEntry) {
        this.mCounter.incrementAndGet();
        this.mQueue.offer(journalEntry);
        return this.mCounter.get();
    }

    public void close() {
        stop();
    }

    @VisibleForTesting
    protected void stop() {
        this.mStopFlushing = true;
        this.mFlushSemaphore.release();
        try {
            this.mFlushThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.mFlushThread = null;
            this.mFlushSemaphore.tryAcquire();
        }
    }

    @VisibleForTesting
    protected void start() {
        if (this.mFlushThread != null) {
            close();
        }
        this.mFlushThread = new Thread(this::doFlush);
        this.mStopFlushing = false;
        this.mFlushThread.start();
    }

    private void doFlush() {
        LockResource lockResource;
        Throwable th;
        Journal.JournalEntry peek;
        while (!this.mStopFlushing) {
            while (this.mQueue.isEmpty() && !this.mStopFlushing) {
                try {
                    if (this.mFlushSemaphore.tryAcquire(this.mFlushBatchTimeNs, TimeUnit.NANOSECONDS)) {
                        break;
                    }
                } catch (InterruptedException e) {
                }
            }
            try {
                long nanoTime = System.nanoTime();
                while (!this.mQueue.isEmpty() && (peek = this.mQueue.peek()) != null) {
                    this.mJournalWriter.write(peek);
                    this.mQueue.poll();
                    Long l = this.mWriteCounter;
                    this.mWriteCounter = Long.valueOf(this.mWriteCounter.longValue() + 1);
                    if (System.nanoTime() - nanoTime >= this.mFlushBatchTimeNs && !this.mStopFlushing) {
                        break;
                    }
                }
                if (this.mFlushCounter.get() < this.mWriteCounter.longValue()) {
                    this.mJournalWriter.flush();
                    this.mFlushCounter.set(this.mWriteCounter.longValue());
                }
                lockResource = new LockResource(this.mTicketLock);
                th = null;
            } catch (IOException | JournalClosedException e2) {
                LockResource lockResource2 = new LockResource(this.mTicketLock);
                Throwable th2 = null;
                try {
                    try {
                        ListIterator<FlushTicket> listIterator = this.mTicketList.listIterator();
                        while (listIterator.hasNext()) {
                            FlushTicket next = listIterator.next();
                            listIterator.remove();
                            if (next.getTargetCounter() <= this.mFlushCounter.get()) {
                                next.setCompleted();
                            } else {
                                next.setError(e2);
                            }
                        }
                        if (lockResource2 != null) {
                            if (0 != 0) {
                                try {
                                    lockResource2.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                lockResource2.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th4) {
                    if (lockResource2 != null) {
                        if (th2 != null) {
                            try {
                                lockResource2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            lockResource2.close();
                        }
                    }
                    throw th4;
                }
            }
            try {
                try {
                    ListIterator<FlushTicket> listIterator2 = this.mTicketList.listIterator();
                    while (listIterator2.hasNext()) {
                        FlushTicket next2 = listIterator2.next();
                        if (next2.getTargetCounter() <= this.mFlushCounter.get()) {
                            next2.setCompleted();
                            listIterator2.remove();
                        }
                    }
                    if (lockResource != null) {
                        if (0 != 0) {
                            try {
                                lockResource.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        } else {
                            lockResource.close();
                        }
                    }
                } catch (Throwable th7) {
                    throw th7;
                    break;
                }
            } finally {
            }
        }
    }

    public void flush(long j) throws IOException, JournalClosedException {
        if (j <= this.mFlushCounter.get()) {
            return;
        }
        FlushTicket flushTicket = new FlushTicket(j);
        LockResource lockResource = new LockResource(this.mTicketLock);
        Throwable th = null;
        try {
            this.mTicketList.add(flushTicket);
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    lockResource.close();
                }
            }
            try {
                try {
                    this.mFlushSemaphore.release();
                    flushTicket.waitCompleted();
                    this.mFlushSemaphore.tryAcquire();
                } catch (Throwable th3) {
                    this.mFlushSemaphore.tryAcquire();
                    throw th3;
                }
            } catch (InterruptedException e) {
                throw new AlluxioStatusException(Status.CANCELED, e);
            } catch (ExecutionException e2) {
                JournalClosedException cause = e2.getCause();
                if (cause != null && (cause instanceof IOException)) {
                    throw ((IOException) cause);
                }
                if (cause != null && (cause instanceof JournalClosedException)) {
                    throw cause;
                }
                throw new AlluxioStatusException(Status.INTERNAL, e2);
            }
        } catch (Throwable th4) {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th4;
        }
    }
}
