package io.datarouter.plugin.dataexport.service.importing;

import io.datarouter.bytes.KvString;
import io.datarouter.bytes.blockfile.io.read.BlockfileReader;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.nodewatch.util.PhysicalSortedNodeWrapper;
import io.datarouter.plugin.dataexport.config.DatarouterDataExportExecutors;
import io.datarouter.plugin.dataexport.service.blockfile.DatabeanExportBlockfileService;
import io.datarouter.plugin.dataexport.service.blockfile.DatabeanExportBlockfileStorageService;
import io.datarouter.plugin.dataexport.util.DatabeanExportFilenameTool;
import io.datarouter.plugin.dataexport.util.RateTracker;
import io.datarouter.plugin.dataexport.web.DatabeanImportHandler;
import io.datarouter.scanner.ParallelScanner;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.node.DatarouterNodes;
import io.datarouter.storage.node.op.combo.SortedMapStorage;
import io.datarouter.storage.util.Subpath;
import io.datarouter.types.Ulid;
import io.datarouter.util.Count;
import io.datarouter.util.collection.ListTool;
import io.datarouter.util.number.NumberFormatter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/datarouter/plugin/dataexport/service/importing/DatabeanImportService.class */
public class DatabeanImportService {
    private static final Logger logger = LoggerFactory.getLogger(DatabeanImportService.class);
    private static final Duration LOG_PERIOD = Duration.ofSeconds(5);
    private static final int PUT_MULTI_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
    private static final int PUT_BATCH_SIZE = 100;

    @Inject
    private DatarouterNodes datarouterNodes;

    @Inject
    private DatabeanExportBlockfileStorageService blockfileStorageService;

    @Inject
    private DatabeanExportBlockfileService blockfileService;

    @Inject
    private DatarouterDataExportExecutors.DatabeanImportPutMultiExecutor putMultiExec;

    /* loaded from: input_file:io/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse.class */
    public static final class DatabeanImportResponse extends Record {
        private final long totalDatabeans;
        private final List<String> nodeNames;

        public DatabeanImportResponse(long j, List<String> list) {
            this.totalDatabeans = j;
            this.nodeNames = list;
        }

        public long totalDatabeans() {
            return this.totalDatabeans;
        }

        public List<String> nodeNames() {
            return this.nodeNames;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, DatabeanImportResponse.class), DatabeanImportResponse.class, "totalDatabeans;nodeNames", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->totalDatabeans:J", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->nodeNames:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, DatabeanImportResponse.class), DatabeanImportResponse.class, "totalDatabeans;nodeNames", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->totalDatabeans:J", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->nodeNames:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, DatabeanImportResponse.class, Object.class), DatabeanImportResponse.class, "totalDatabeans;nodeNames", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->totalDatabeans:J", "FIELD:Lio/datarouter/plugin/dataexport/service/importing/DatabeanImportService$DatabeanImportResponse;->nodeNames:Ljava/util/List;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    public DatabeanImportResponse importAllTables(Ulid ulid) {
        AtomicLong atomicLong = new AtomicLong(0L);
        return new DatabeanImportResponse(atomicLong.get(), this.blockfileStorageService.makeExportMetaDirectory(ulid).scanKeys(Subpath.empty()).map((v0) -> {
            return v0.getFile();
        }).map(DatabeanExportFilenameTool::parseClientAndTableName).map(clientAndTableName -> {
            return this.datarouterNodes.getPhysicalNodeForClientAndTable(clientAndTableName.clientName(), clientAndTableName.tableName());
        }).map((v0) -> {
            return v0.getName();
        }).each(str -> {
            atomicLong.addAndGet(importTable(ulid, str));
        }).list());
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long importTable(Ulid ulid, String str) {
        return ((Long) this.blockfileStorageService.makeTableDataDirectory(ulid, new PhysicalSortedNodeWrapper(this.datarouterNodes, str).node).scanKeys(Subpath.empty()).map(DatabeanExportFilenameTool::partId).map(num -> {
            return Long.valueOf(importPart(ulid, str, num.intValue()));
        }).reduce(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })).longValue();
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long importPart(Ulid ulid, String str, int i) {
        PhysicalSortedNodeWrapper physicalSortedNodeWrapper = new PhysicalSortedNodeWrapper(this.datarouterNodes, str);
        return importFromBlockfile(this.blockfileService.makeBlockfileReader(this.blockfileStorageService.makeTableDataStorage(ulid, physicalSortedNodeWrapper.node), physicalSortedNodeWrapper.node, i), ulid, physicalSortedNodeWrapper.node, 100);
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long importFromBlockfile(BlockfileReader<?> blockfileReader, Ulid ulid, SortedMapStorage.PhysicalSortedMapStorageNode<PK, D, F> physicalSortedMapStorageNode, int i) {
        logger.warn("importing {}", physicalSortedMapStorageNode.getName());
        Count add = new Count.Counts().add("numDatabeans");
        AtomicReference atomicReference = new AtomicReference();
        RateTracker rateTracker = new RateTracker();
        ParallelScanner parallelUnordered = blockfileReader.sequential().scan().batch(i).parallelUnordered(new Threads(this.putMultiExec, PUT_MULTI_THREADS));
        physicalSortedMapStorageNode.getClass();
        Scanner each = parallelUnordered.each((v1) -> {
            r1.putMulti(v1);
        }).each(list -> {
            atomicReference.set(((Databean) ListTool.getLastOrNull(list)).getKey());
        });
        add.getClass();
        Scanner each2 = each.each((v1) -> {
            r1.incrementBySize(v1);
        });
        rateTracker.getClass();
        each2.each((v1) -> {
            r1.incrementBySize(v1);
        }).periodic(LOG_PERIOD, list2 -> {
            logProgress(ulid, physicalSortedMapStorageNode.getName(), add, rateTracker, ((PrimaryKey) atomicReference.get()).toString());
        }).count();
        logProgress(ulid, physicalSortedMapStorageNode.getName(), add, rateTracker, "[end]");
        return add.value();
    }

    private void logProgress(Ulid ulid, String str, Count count, RateTracker rateTracker, String str2) {
        logger.warn("imported " + String.valueOf(new KvString().add("databeans", Long.valueOf(count.value()), (v0) -> {
            return NumberFormatter.addCommas(v0);
        }).add("perSec", rateTracker.perSecDisplay()).add("perSecAvg", rateTracker.perSecAvgDisplay()).add(DatabeanImportHandler.P_exportId, ulid, (v0) -> {
            return v0.toString();
        }).add("node", str).add("through", str2)));
        rateTracker.markLogged();
    }
}
