package io.datarouter.storage.vacuum;

import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.util.VacuumMetrics;
import io.datarouter.util.number.NumberFormatter;
import io.datarouter.util.retry.RetryableTool;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/storage/vacuum/BaseNodeVacuum.class */
public abstract class BaseNodeVacuum<PK extends PrimaryKey<PK>, T> {
    private static final Logger logger = LoggerFactory.getLogger(BaseNodeVacuum.class);
    private final Scanner<T> scanner;
    private final int deleteBatchSize;
    private final Consumer<Collection<PK>> deleteConsumer;
    private final String nameForMetrics;
    private final boolean shouldUpdateTaskTracker;
    private final Predicate<T> shouldDelete;
    private final Threads threads;

    /* loaded from: input_file:io/datarouter/storage/vacuum/BaseNodeVacuum$BaseNodeVacuumBuilder.class */
    public static abstract class BaseNodeVacuumBuilder<PK extends PrimaryKey<PK>, T, C extends BaseNodeVacuumBuilder<PK, T, C>> {
        protected final String nameForMetrics;
        protected final Scanner<T> scanner;
        protected final Predicate<T> shouldDelete;
        protected final Consumer<Collection<PK>> deleteConsumer;
        protected boolean shouldUpdateTaskTracker = true;
        protected int deleteBatchSize = 100;
        protected Threads threads = Threads.none();

        public BaseNodeVacuumBuilder(String str, Scanner<T> scanner, Predicate<T> predicate, Consumer<Collection<PK>> consumer) {
            this.nameForMetrics = str;
            this.scanner = scanner;
            this.shouldDelete = predicate;
            this.deleteConsumer = consumer;
        }

        protected abstract C self();

        public C withDeleteBatchSize(int i) {
            this.deleteBatchSize = i;
            return self();
        }

        public C withThreads(Threads threads) {
            this.threads = threads;
            return self();
        }

        public C disableTaskTrackerUpdates() {
            this.shouldUpdateTaskTracker = false;
            return self();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseNodeVacuum(Scanner<T> scanner, Consumer<Collection<PK>> consumer, int i, String str, boolean z, Predicate<T> predicate, Threads threads) {
        this.scanner = scanner;
        this.deleteBatchSize = i;
        this.deleteConsumer = consumer;
        this.nameForMetrics = str;
        this.shouldUpdateTaskTracker = z;
        this.shouldDelete = predicate;
        this.threads = threads;
    }

    protected abstract PK getKey(T t);

    public void run(TaskTracker taskTracker) {
        AtomicLong atomicLong = new AtomicLong();
        Scanner map = this.scanner.batch(100).advanceUntil(list -> {
            return taskTracker.shouldStop();
        }).each(list2 -> {
            VacuumMetrics.considered(this.nameForMetrics, list2.size());
            if (this.shouldUpdateTaskTracker) {
                taskTracker.increment(list2.size());
                taskTracker.setLastItemProcessed(list2.getLast().toString());
            }
        }).concat((v0) -> {
            return Scanner.of(v0);
        }).include(this.shouldDelete).map(this::getKey).batch(this.deleteBatchSize).parallelUnordered(this.threads).each(this::deleteWithRetries).each(list3 -> {
            VacuumMetrics.deleted(this.nameForMetrics, list3.size());
        }).map((v0) -> {
            return v0.size();
        });
        atomicLong.getClass();
        map.forEach((v1) -> {
            r1.addAndGet(v1);
        });
        logProgress(atomicLong.get(), taskTracker.getCount(), taskTracker.getLastItem());
    }

    private void deleteWithRetries(List<PK> list) {
        RetryableTool.tryNTimesWithBackoffUnchecked(() -> {
            this.deleteConsumer.accept(list);
        }, 3, Duration.ofSeconds(1L), true);
    }

    private void logProgress(long j, long j2, String str) {
        logger.info("deleted {}/{} through {}", new Object[]{NumberFormatter.addCommas(Long.valueOf(j)), NumberFormatter.addCommas(Long.valueOf(j2)), str});
    }
}
