package alluxio.master.journal;

import alluxio.collections.ConcurrentHashSet;
import alluxio.concurrent.ForkJoinPoolHelper;
import alluxio.concurrent.jsr.ForkJoinPool;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.JournalClosedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.master.journal.sink.JournalSink;
import alluxio.proto.journal.Journal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter.class */
public final class AsyncJournalWriter {
    private final JournalWriter mJournalWriter;
    private final Supplier<Set<JournalSink>> mJournalSinks;
    private final Set<FlushTicket> mTicketSet = new ConcurrentHashSet();
    private Thread mFlushThread = new Thread(this::doFlush, "AsyncJournalWriterThread");
    private final Semaphore mFlushSemaphore = new Semaphore(0, true);
    private volatile boolean mStopFlushing = false;
    private final ConcurrentLinkedQueue<Journal.JournalEntry> mQueue = new ConcurrentLinkedQueue<>();
    private final AtomicLong mCounter = new AtomicLong(0);
    private final AtomicLong mFlushCounter = new AtomicLong(0);
    private Long mWriteCounter = 0L;
    private final long mFlushBatchTimeNs = TimeUnit.NANOSECONDS.convert(ServerConfiguration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_BATCH_TIME_MS), TimeUnit.MILLISECONDS);

    /* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter$FlushTicket.class */
    private class FlushTicket implements ForkJoinPool.ManagedBlocker {
        private final long mTargetCounter;
        private SettableFuture<Void> mIsCompleted = SettableFuture.create();
        private Throwable mError = null;

        public FlushTicket(long j) {
            this.mTargetCounter = j;
        }

        public long getTargetCounter() {
            return this.mTargetCounter;
        }

        public void setCompleted() {
            this.mIsCompleted.set((Object) null);
        }

        public void setError(Throwable th) {
            this.mIsCompleted.setException(th);
            this.mError = th;
        }

        public void waitCompleted() throws Throwable {
            ForkJoinPoolHelper.safeManagedBlock(this);
            if (this.mError != null) {
                throw this.mError;
            }
        }

        public boolean block() throws InterruptedException {
            try {
                this.mIsCompleted.get();
                return true;
            } catch (ExecutionException e) {
                this.mError = e.getCause();
                return true;
            }
        }

        public boolean isReleasable() {
            return this.mIsCompleted.isDone() || this.mIsCompleted.isCancelled();
        }
    }

    public AsyncJournalWriter(JournalWriter journalWriter, Supplier<Set<JournalSink>> supplier) {
        this.mJournalWriter = (JournalWriter) Preconditions.checkNotNull(journalWriter, "journalWriter");
        this.mJournalSinks = supplier;
        this.mFlushThread.start();
    }

    public long appendEntry(Journal.JournalEntry journalEntry) {
        this.mCounter.incrementAndGet();
        this.mQueue.offer(journalEntry);
        return this.mCounter.get();
    }

    public void close() {
        stop();
    }

    @VisibleForTesting
    protected void stop() {
        this.mStopFlushing = true;
        this.mFlushSemaphore.release();
        try {
            this.mFlushThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.mFlushThread = null;
            this.mFlushSemaphore.tryAcquire();
        }
    }

    @VisibleForTesting
    protected void start() {
        if (this.mFlushThread != null) {
            close();
        }
        this.mFlushThread = new Thread(this::doFlush);
        this.mStopFlushing = false;
        this.mFlushThread.start();
    }

    private void doFlush() {
        Journal.JournalEntry peek;
        while (!this.mStopFlushing) {
            while (this.mQueue.isEmpty() && !this.mStopFlushing) {
                try {
                    if (this.mFlushSemaphore.tryAcquire(this.mFlushBatchTimeNs, TimeUnit.NANOSECONDS)) {
                        break;
                    }
                } catch (InterruptedException e) {
                }
            }
            try {
                long nanoTime = System.nanoTime();
                while (!this.mQueue.isEmpty() && (peek = this.mQueue.peek()) != null) {
                    this.mJournalWriter.write(peek);
                    JournalUtils.sinkAppend(this.mJournalSinks, peek);
                    this.mQueue.poll();
                    Long l = this.mWriteCounter;
                    this.mWriteCounter = Long.valueOf(this.mWriteCounter.longValue() + 1);
                    if (System.nanoTime() - nanoTime >= this.mFlushBatchTimeNs && !this.mStopFlushing) {
                        break;
                    }
                }
                if (this.mFlushCounter.get() < this.mWriteCounter.longValue()) {
                    this.mJournalWriter.flush();
                    JournalUtils.sinkFlush(this.mJournalSinks);
                    this.mFlushCounter.set(this.mWriteCounter.longValue());
                }
                Iterator<FlushTicket> it = this.mTicketSet.iterator();
                while (it.hasNext()) {
                    FlushTicket next = it.next();
                    if (next.getTargetCounter() <= this.mFlushCounter.get()) {
                        next.setCompleted();
                        it.remove();
                    }
                }
            } catch (IOException | JournalClosedException e2) {
                Iterator<FlushTicket> it2 = this.mTicketSet.iterator();
                while (it2.hasNext()) {
                    FlushTicket next2 = it2.next();
                    it2.remove();
                    if (next2.getTargetCounter() <= this.mFlushCounter.get()) {
                        next2.setCompleted();
                    } else {
                        next2.setError(e2);
                    }
                }
            }
        }
    }

    public void flush(long j) throws IOException, JournalClosedException {
        if (j <= this.mFlushCounter.get()) {
            return;
        }
        FlushTicket flushTicket = new FlushTicket(j);
        this.mTicketSet.add(flushTicket);
        try {
            try {
                this.mFlushSemaphore.release();
                flushTicket.waitCompleted();
                this.mFlushSemaphore.tryAcquire();
            } catch (InterruptedException e) {
                throw new AlluxioStatusException(Status.CANCELLED.withCause(e));
            } catch (Throwable th) {
                if (th instanceof IOException) {
                    throw ((IOException) th);
                }
                if (!(th instanceof JournalClosedException)) {
                    throw new AlluxioStatusException(Status.INTERNAL.withCause(th));
                }
                throw th;
            }
        } catch (Throwable th2) {
            this.mFlushSemaphore.tryAcquire();
            throw th2;
        }
    }
}
