package io.datarouter.nodewatch.shadowtable;

import io.datarouter.bytes.KvString;
import io.datarouter.nodewatch.shadowtable.config.DatarouterShadowTableExecutors;
import io.datarouter.nodewatch.shadowtable.service.ShadowTableNodeSelectionService;
import io.datarouter.nodewatch.shadowtable.service.ShadowTableRangeCombineService;
import io.datarouter.nodewatch.shadowtable.service.ShadowTableRangeExportService;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.client.ClientAndTableNames;
import io.datarouter.types.Ulid;
import io.datarouter.util.number.NumberFormatter;
import jakarta.inject.Inject;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/nodewatch/shadowtable/ShadowTableExporter.class */
/**
 * Runs a shadow table export: selects the nodes to export, exports each node's ranges in parallel on the
 * export read executor, and then issues one combine request per node.
 *
 * <p>Per-table progress (rows exported so far and ranges still outstanding) is tracked in two concurrent
 * maps keyed by {@link ClientAndTableNames}. Entries are registered when a node's range requests are
 * created and removed when the node's last range finishes.
 */
public class ShadowTableExporter {
    private static final Logger logger = LoggerFactory.getLogger(ShadowTableExporter.class);

    @Inject
    private DatarouterShadowTableExecutors.ShadowTableExportReadExecutor exportReadExecutor;

    @Inject
    private ShadowTableNodeSelectionService nodeSelectionService;

    @Inject
    private ShadowTableRangeExportService rangeExportService;

    @Inject
    private ShadowTableRangeCombineService rangeCombineService;

    // Running total of rows exported per table; updated from the export worker threads.
    private final Map<ClientAndTableNames, AtomicLong> numRowsByTable = new ConcurrentHashMap<>();
    // Number of ranges not yet finished per table; reaching zero means the table is fully exported.
    private final Map<ClientAndTableNames, AtomicLong> numRemainingRangesByTable = new ConcurrentHashMap<>();

    /**
     * Exports all nodes selected for the given export and then combines the exported ranges of each node.
     *
     * <p>A new export id is generated for each call. Nodes are processed in
     * {@link ClientAndTableNames#COMPARE_CLIENT_TABLE} order, but individual ranges run unordered across
     * {@code shadowTableExport.resource().databaseExportThreads()} threads. A summary line is logged when
     * the last range of a table completes.
     *
     * @param i the job id, used only for logging
     * @param shadowTableExport the export definition to run
     */
    public void export(int i, ShadowTableExport shadowTableExport) {
        String exportId = Ulid.newValue();
        var nodes = Scanner.of(this.nodeSelectionService.listNodesForExport(shadowTableExport))
                .sort(Comparator.comparing(
                        node -> node.clientAndTableNames(),
                        ClientAndTableNames.COMPARE_CLIENT_TABLE))
                .list();
        logger.warn("starting {}", new KvString()
                .add("jobId", i, NumberFormatter::addCommas)
                .add("exportId", exportId)
                .add("numNodes", nodes.size(), NumberFormatter::addCommas));

        // Hands out a 1-based sequence number to each range of a node, in the order the ranges start.
        Map<String, AtomicLong> rangeIdByNodeName = new ConcurrentHashMap<>();
        Scanner.of(nodes)
                .concatIter(node -> {
                    var requests = this.rangeExportService.makeRangeExportRequests(
                            shadowTableExport,
                            exportId,
                            node);
                    // Only track tables that have work: with zero ranges nothing would ever decrement the
                    // remaining-ranges counter, so the entries would never be removed.
                    if (!requests.isEmpty()) {
                        this.numRowsByTable.put(node.clientAndTableNames(), new AtomicLong());
                        this.numRemainingRangesByTable.put(
                                node.clientAndTableNames(),
                                new AtomicLong(requests.size()));
                    }
                    return requests;
                })
                .parallelUnordered(new Threads(
                        this.exportReadExecutor,
                        shadowTableExport.resource().databaseExportThreads()))
                .forEach(request -> {
                    ClientAndTableNames names = request.node().clientAndTableNames();
                    AtomicLong tableRows = this.numRowsByTable.get(names);
                    long rangeId = rangeIdByNodeName
                            .computeIfAbsent(request.node().getName(), ignored -> new AtomicLong())
                            .incrementAndGet();
                    long numRangeRows = this.rangeExportService.exportUntypedRangeWithRetries(request, rangeId);
                    tableRows.addAndGet(numRangeRows);
                    if (this.numRemainingRangesByTable.get(names).decrementAndGet() == 0) {
                        // Read the total only after the last decrement. Every range adds its rows before
                        // decrementing, so at this point all ranges of the table are included. Using this
                        // thread's own addAndGet result could miss rows added by a range that finished
                        // between this thread's add and its decrement.
                        long totalRows = tableRows.get();
                        logger.warn("exportedTable {}", new KvString()
                                .add("client", names.client())
                                .add("table", names.table())
                                .add("ranges", request.numRanges(), NumberFormatter::addCommas)
                                .add("rows", totalRows, NumberFormatter::addCommas));
                        this.numRowsByTable.remove(names);
                        this.numRemainingRangesByTable.remove(names);
                    }
                });

        // Combine runs only after every range of every node has been exported.
        Scanner.of(nodes)
                .map(node -> new ShadowTableRangeCombineService.RangeCombineRequest(
                        shadowTableExport,
                        exportId,
                        node))
                .forEach(this.rangeCombineService::combine);
    }
}
