package org.apache.bookkeeper.replication;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.replication.AuditorTask;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.6.2.jar:org/apache/bookkeeper/replication/AuditorBookieCheckTask.class */
public class AuditorBookieCheckTask extends AuditorTask {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AuditorBookieCheckTask.class);
    private final BookieLedgerIndexer bookieLedgerIndexer;
    private final BiConsumer<Void, Throwable> submitCheckTask;

    public AuditorBookieCheckTask(ServerConfiguration serverConfiguration, AuditorStats auditorStats, BookKeeperAdmin bookKeeperAdmin, LedgerManager ledgerManager, LedgerUnderreplicationManager ledgerUnderreplicationManager, AuditorTask.ShutdownTaskHandler shutdownTaskHandler, BookieLedgerIndexer bookieLedgerIndexer, BiConsumer<AtomicBoolean, Throwable> biConsumer, BiConsumer<Void, Throwable> biConsumer2) {
        super(serverConfiguration, auditorStats, bookKeeperAdmin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler, biConsumer);
        this.bookieLedgerIndexer = bookieLedgerIndexer;
        this.submitCheckTask = biConsumer2;
    }

    @Override // org.apache.bookkeeper.replication.AuditorTask
    protected void runTask() {
        if (hasBookieCheckTask()) {
            LOG.info("Audit already scheduled; skipping periodic bookie check");
        } else {
            startAudit(true);
        }
    }

    @Override // org.apache.bookkeeper.replication.AuditorTask
    public void shutdown() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startAudit(boolean z) {
        try {
            auditBookies();
            z = false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while watching available bookies ", (Throwable) e);
        } catch (BKException e2) {
            LOG.error("Exception getting bookie list", (Throwable) e2);
        } catch (ReplicationException.BKAuditException e3) {
            LOG.error("Exception while watching available bookies", (Throwable) e3);
        }
        if (z) {
            submitShutdownTask();
        }
    }

    void auditBookies() throws ReplicationException.BKAuditException, InterruptedException, BKException {
        try {
            waitIfLedgerReplicationDisabled();
            LOG.info("Starting auditBookies");
            Stopwatch createStarted = Stopwatch.createStarted();
            Map<String, Set<Long>> generateBookie2LedgersIndex = generateBookie2LedgersIndex();
            try {
                if (!isLedgerReplicationEnabled()) {
                    this.submitCheckTask.accept(null, null);
                    return;
                }
                Collection<String> subtract = CollectionUtils.subtract(generateBookie2LedgersIndex.keySet(), getAvailableBookies());
                this.auditorStats.getBookieToLedgersMapCreationTime().registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                if (subtract.size() > 0) {
                    try {
                        FutureUtils.result(handleLostBookiesAsync(subtract, generateBookie2LedgersIndex), ReplicationException.EXCEPTION_HANDLER);
                        this.auditorStats.getURLPublishTimeForLostBookies().registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                    } catch (ReplicationException e) {
                        throw new ReplicationException.BKAuditException(e.getMessage(), e.getCause());
                    }
                }
                LOG.info("Completed auditBookies");
                this.auditorStats.getAuditBookiesTime().registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            } catch (ReplicationException.UnavailableException e2) {
                LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            }
        } catch (ReplicationException.NonRecoverableReplicationException e3) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable) e3);
            submitShutdownTask();
        } catch (ReplicationException.UnavailableException e4) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
        }
    }

    private Map<String, Set<Long>> generateBookie2LedgersIndex() throws ReplicationException.BKAuditException {
        return this.bookieLedgerIndexer.getBookieToLedgerIndex();
    }

    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> collection, Map<String, Set<Long>> map) {
        LOG.info("Following are the failed bookies: {}, and searching its ledgers for re-replication", collection);
        return FutureUtils.processList(Lists.newArrayList(collection), str -> {
            return publishSuspectedLedgersAsync(Lists.newArrayList(str), (Set) map.get(str));
        }, null);
    }

    protected void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        if (isLedgerReplicationEnabled()) {
            return;
        }
        LOG.info("LedgerReplication is disabled externally through Zookeeper, since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
        ReplicationEnableCb replicationEnableCb = new ReplicationEnableCb();
        this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(replicationEnableCb);
        replicationEnableCb.await();
    }

    @Override // org.apache.bookkeeper.replication.AuditorTask, java.lang.Runnable
    public /* bridge */ /* synthetic */ void run() {
        super.run();
    }
}
