package alluxio.master;

import alluxio.ProcessUtils;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalEntryAssociation;
import alluxio.master.journal.JournalEntryStreamReader;
import alluxio.master.journal.JournalUtils;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.resource.CloseableIterator;
import alluxio.util.ThreadFactoryUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/BackupManager.class */
public class BackupManager {
    private static final long TERMINATION_SEQ = -1;
    public static final String BACKUP_FILE_FORMAT = "alluxio-backup-%s-%s.gz";
    private final MasterRegistry mRegistry;
    private long mBackupEntriesCount = TERMINATION_SEQ;
    private long mRestoreEntriesCount = TERMINATION_SEQ;
    private long mBackupTimeMs = TERMINATION_SEQ;
    private long mRestoreTimeMs = TERMINATION_SEQ;
    private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
    public static final Pattern BACKUP_FILE_PATTERN = Pattern.compile("alluxio-backup-[0-9]+-[0-9]+-[0-9]+-([0-9]+).gz");

    public BackupManager(MasterRegistry masterRegistry) {
        this.mRegistry = masterRegistry;
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_ENTRIES_COUNT.getName(), () -> {
            return Long.valueOf(this.mBackupEntriesCount);
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_RESTORE_COUNT.getName(), () -> {
            return Long.valueOf(this.mRestoreEntriesCount);
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_RESTORE_TIME_MS.getName(), () -> {
            return Long.valueOf(this.mRestoreTimeMs);
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_TIME_MS.getName(), () -> {
            return Long.valueOf(this.mBackupTimeMs);
        });
    }

    public void backup(OutputStream outputStream, AtomicLong atomicLong) throws IOException {
        GzipCompressorOutputStream gzipCompressorOutputStream = new GzipCompressorOutputStream(outputStream);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("master-backup-%d", true)));
        HashSet hashSet = new HashSet();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(ServerConfiguration.getInt(PropertyKey.MASTER_BACKUP_ENTRY_BUFFER_COUNT));
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        long currentTimeMillis = System.currentTimeMillis();
        hashSet.add(executorCompletionService.submit(() -> {
            try {
                try {
                    Iterator<Master> it = this.mRegistry.getServers().iterator();
                    while (it.hasNext()) {
                        CloseableIterator<Journal.JournalEntry> journalEntryIterator = it.next().getJournalEntryIterator();
                        do {
                            try {
                                if (((Iterator) journalEntryIterator.get()).hasNext()) {
                                    linkedBlockingQueue.put((Journal.JournalEntry) ((Iterator) journalEntryIterator.get()).next());
                                } else if (journalEntryIterator != null) {
                                    journalEntryIterator.close();
                                }
                            } catch (Throwable th) {
                                if (journalEntryIterator != null) {
                                    try {
                                        journalEntryIterator.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                            }
                        } while (!Thread.interrupted());
                        throw new InterruptedException();
                    }
                    linkedBlockingQueue.put(Journal.JournalEntry.newBuilder().setSequenceNumber(TERMINATION_SEQ).build());
                    atomicBoolean.set(false);
                    return true;
                } catch (InterruptedException e) {
                    LOG.info("Backup reader task interrupted");
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Thread interrupted while reading master state.", e);
                }
            } catch (Throwable th3) {
                atomicBoolean.set(false);
                throw th3;
            }
        }));
        hashSet.add(executorCompletionService.submit(() -> {
            try {
                LinkedList<Journal.JournalEntry> linkedList = new LinkedList();
                while (true) {
                    if (!atomicBoolean.get() && linkedBlockingQueue.size() <= 0) {
                        return true;
                    }
                    if (0 == linkedBlockingQueue.drainTo(linkedList)) {
                        linkedList.add((Journal.JournalEntry) linkedBlockingQueue.take());
                    }
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    for (Journal.JournalEntry journalEntry : linkedList) {
                        if (journalEntry.getSequenceNumber() == TERMINATION_SEQ) {
                            return true;
                        }
                        journalEntry.writeDelimitedTo(gzipCompressorOutputStream);
                        atomicLong.incrementAndGet();
                    }
                    linkedList.clear();
                }
            } catch (InterruptedException e) {
                LOG.info("Backup writer task interrupted");
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted while writing to backup stream.", e);
            }
        }));
        safeWaitTasks(hashSet, executorCompletionService);
        this.mBackupTimeMs = System.currentTimeMillis() - currentTimeMillis;
        this.mBackupEntriesCount = atomicLong.get();
        gzipCompressorOutputStream.finish();
        LOG.info("Created backup with {} entries", Long.valueOf(atomicLong.get()));
    }

    public void initFromBackup(InputStream inputStream) throws IOException {
        GzipCompressorInputStream gzipCompressorInputStream = new GzipCompressorInputStream(inputStream);
        try {
            JournalEntryStreamReader journalEntryStreamReader = new JournalEntryStreamReader(gzipCompressorInputStream);
            try {
                List<Master> servers = this.mRegistry.getServers();
                ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("master-backup-%d", true)));
                HashSet hashSet = new HashSet();
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(ServerConfiguration.getInt(PropertyKey.MASTER_BACKUP_ENTRY_BUFFER_COUNT));
                AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                ImmutableMap uniqueIndex = Maps.uniqueIndex(servers, (v0) -> {
                    return v0.getName();
                });
                AtomicLong atomicLong = new AtomicLong(0L);
                ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1, ThreadFactoryUtils.build("master-backup-tracer-%d", true));
                newScheduledThreadPool.scheduleAtFixedRate(() -> {
                    LOG.info("{} entries from backup applied so far...", Long.valueOf(atomicLong.get()));
                }, 30L, 30L, TimeUnit.SECONDS);
                long currentTimeMillis = System.currentTimeMillis();
                hashSet.add(executorCompletionService.submit(() -> {
                    while (true) {
                        try {
                            try {
                                Journal.JournalEntry readEntry = journalEntryStreamReader.readEntry();
                                if (readEntry == null) {
                                    linkedBlockingQueue.put(Journal.JournalEntry.newBuilder().setSequenceNumber(TERMINATION_SEQ).build());
                                    atomicBoolean.set(false);
                                    return true;
                                }
                                linkedBlockingQueue.put(readEntry);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException("Thread interrupted while reading from backup stream.", e);
                            }
                        } catch (Throwable th) {
                            atomicBoolean.set(false);
                            throw th;
                        }
                    }
                }));
                hashSet.add(executorCompletionService.submit(() -> {
                    HashMap hashMap;
                    while (true) {
                        try {
                            if (!atomicBoolean.get() && linkedBlockingQueue.size() <= 0) {
                                return true;
                            }
                            LinkedList<Journal.JournalEntry> linkedList = new LinkedList();
                            try {
                                if (0 == linkedBlockingQueue.drainTo(linkedList)) {
                                    Journal.JournalEntry journalEntry = (Journal.JournalEntry) linkedBlockingQueue.poll(10L, TimeUnit.MILLISECONDS);
                                    if (journalEntry != null) {
                                        linkedList.add(journalEntry);
                                    }
                                }
                                Iterator it = servers.iterator();
                                while (it.hasNext()) {
                                    Master master = (Master) it.next();
                                    hashMap.put(master, master.createJournalContext());
                                }
                                for (Journal.JournalEntry journalEntry2 : linkedList) {
                                    if (journalEntry2.getSequenceNumber() == TERMINATION_SEQ) {
                                        Iterator it2 = hashMap.values().iterator();
                                        while (it2.hasNext()) {
                                            ((JournalContext) it2.next()).close();
                                        }
                                        return true;
                                    }
                                    try {
                                        String masterForEntry = JournalEntryAssociation.getMasterForEntry(journalEntry2);
                                        try {
                                            Master master2 = (Master) uniqueIndex.get(masterForEntry);
                                            master2.applyAndJournal((Supplier) hashMap.get(master2), journalEntry2);
                                            atomicLong.incrementAndGet();
                                        } catch (Exception e) {
                                            JournalUtils.handleJournalReplayFailure(LOG, e, "Failed to apply journal entry to master %s. Entry: %s", masterForEntry, journalEntry2);
                                        }
                                    } catch (IllegalStateException e2) {
                                        ProcessUtils.fatalError(LOG, e2, "Unrecognized journal entry: %s", journalEntry2);
                                        throw e2;
                                    }
                                }
                            } finally {
                                Iterator it3 = hashMap.values().iterator();
                                while (it3.hasNext()) {
                                    ((JournalContext) it3.next()).close();
                                }
                            }
                            hashMap = new HashMap();
                        } catch (InterruptedException e3) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Thread interrupted while applying backup content.", e3);
                        }
                    }
                }));
                try {
                    safeWaitTasks(hashSet, executorCompletionService);
                    this.mRestoreTimeMs = System.currentTimeMillis() - currentTimeMillis;
                    this.mRestoreEntriesCount = atomicLong.get();
                    newScheduledThreadPool.shutdownNow();
                    LOG.info("Restored {} entries from backup", Long.valueOf(atomicLong.get()));
                    journalEntryStreamReader.close();
                    gzipCompressorInputStream.close();
                } catch (Throwable th) {
                    this.mRestoreTimeMs = System.currentTimeMillis() - currentTimeMillis;
                    this.mRestoreEntriesCount = atomicLong.get();
                    newScheduledThreadPool.shutdownNow();
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th2) {
            try {
                gzipCompressorInputStream.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    private void safeWaitTasks(Set<Future<?>> set, CompletionService<?> completionService) throws IOException {
        while (set.size() > 0) {
            try {
                Future<?> take = completionService.take();
                set.remove(take);
                take.get();
            } catch (InterruptedException e) {
                set.forEach(future -> {
                    future.cancel(true);
                });
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted while waiting for backup threads.", e);
            } catch (ExecutionException e2) {
                set.forEach(future2 -> {
                    future2.cancel(true);
                });
                Throwable cause = e2.getCause();
                if (!(cause instanceof IOException)) {
                    throw new IOException(cause);
                }
                throw ((IOException) cause);
            }
        }
    }
}
