package io.datarouter.nodewatch.shadowtable.service;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.KvString;
import io.datarouter.bytes.blockfile.row.BlockfileRow;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.nodewatch.shadowtable.ShadowTableExport;
import io.datarouter.nodewatch.shadowtable.ShadowTableMetrics;
import io.datarouter.nodewatch.shadowtable.config.DatarouterShadowTableExecutors;
import io.datarouter.nodewatch.shadowtable.config.DatarouterShadowTableSettingRoot;
import io.datarouter.nodewatch.shadowtable.storage.ShadowTableBlockfileDao;
import io.datarouter.nodewatch.shadowtable.storage.ShadowTableRangeBlockfileDao;
import io.datarouter.scanner.BatchByMinSizeScanner;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.node.op.raw.read.SortedStorageReader;
import io.datarouter.util.Count;
import io.datarouter.util.number.NumberFormatter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService.class */
public class ShadowTableRangeCombineService {
    private static final int MAX_CONSIDERED_VCPUS = 16;
    private static final int PREFETCH_GROUPS_PER_VCPU = 4;

    @Inject
    private DatarouterShadowTableSettingRoot settings;

    @Inject
    private ShadowTableNodeSelectionService nodeSelectionService;

    @Inject
    private ShadowTableRangeBlockfileDao rangeBlockfileDao;

    @Inject
    private ShadowTableBlockfileDao blockfileDao;

    @Inject
    private DatarouterShadowTableExecutors.ShadowTableCombinePrefetchExecutor combinePrefetchExec;
    private static final Logger logger = LoggerFactory.getLogger(ShadowTableRangeCombineService.class);
    private static final ByteLength BLOB_PREFETCHER_BUFFER_SIZE_PER_VCPU = ByteLength.ofMiB(256);
    private static final ByteLength PREFETCH_GROUP_SIZE = ByteLength.ofMiB(16);
    private static final ByteLength OUTPUT_BLOCK_SIZE = ByteLength.ofKiB(32);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats.class */
    public static final class BlockOfRowsWithStats extends Record {
        private final List<BlockfileRow> rows;
        private final long totalBytes;

        private BlockOfRowsWithStats(List<BlockfileRow> list, long j) {
            this.rows = list;
            this.totalBytes = j;
        }

        private BlockOfRowsWithStats(BatchByMinSizeScanner.ScannerMinSizeBatch<BlockfileRow> scannerMinSizeBatch) {
            this(scannerMinSizeBatch.items(), totalBytes(scannerMinSizeBatch));
        }

        private static final long totalBytes(BatchByMinSizeScanner.ScannerMinSizeBatch<BlockfileRow> scannerMinSizeBatch) {
            return scannerMinSizeBatch.totalSize() + (84 * scannerMinSizeBatch.items().size());
        }

        public List<BlockfileRow> rows() {
            return this.rows;
        }

        public long totalBytes() {
            return this.totalBytes;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, BlockOfRowsWithStats.class), BlockOfRowsWithStats.class, "rows;totalBytes", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->rows:Ljava/util/List;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->totalBytes:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, BlockOfRowsWithStats.class), BlockOfRowsWithStats.class, "rows;totalBytes", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->rows:Ljava/util/List;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->totalBytes:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, BlockOfRowsWithStats.class, Object.class), BlockOfRowsWithStats.class, "rows;totalBytes", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->rows:Ljava/util/List;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$BlockOfRowsWithStats;->totalBytes:J").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest.class */
    public static final class RangeCombineRequest<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> extends Record {
        private final ShadowTableExport export;
        private final String exportId;
        private final SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> node;

        public RangeCombineRequest(ShadowTableExport shadowTableExport, String str, SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> physicalSortedStorageReaderNode) {
            this.export = shadowTableExport;
            this.exportId = str;
            this.node = physicalSortedStorageReaderNode;
        }

        public ShadowTableExport export() {
            return this.export;
        }

        public String exportId() {
            return this.exportId;
        }

        public SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> node() {
            return this.node;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RangeCombineRequest.class), RangeCombineRequest.class, "export;exportId;node", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RangeCombineRequest.class), RangeCombineRequest.class, "export;exportId;node", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RangeCombineRequest.class, Object.class), RangeCombineRequest.class, "export;exportId;node", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeCombineService$RangeCombineRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> void combine(RangeCombineRequest<PK, D, F> rangeCombineRequest) {
        long numFiles = this.rangeBlockfileDao.numFiles(rangeCombineRequest.export(), rangeCombineRequest.exportId(), rangeCombineRequest.node().clientAndTableNames().table());
        int min = Math.min(rangeCombineRequest.export().resource().vcpus(), MAX_CONSIDERED_VCPUS);
        ByteLength ofBytes = ByteLength.ofBytes(BLOB_PREFETCHER_BUFFER_SIZE_PER_VCPU.toBytes() * min);
        int i = PREFETCH_GROUPS_PER_VCPU * min;
        Count.Counts counts = new Count.Counts();
        Count add = counts.add("rows");
        Count add2 = counts.add("waitOnReadNs");
        Count add3 = counts.add("prefetchedBlockTally");
        Supplier supplier = () -> {
            return new KvString().add("client", rangeCombineRequest.node().clientAndTableNames().client()).add("table", rangeCombineRequest.node().clientAndTableNames().table()).add("rows", add.valueToString()).add("readStallMs", Long.valueOf(Duration.ofNanos(add2.value()).toMillis()), (v0) -> {
                return NumberFormatter.addCommas(v0);
            }).add("prefetchedBlocks", add3.valueToString());
        };
        logger.warn("startCombineRanges {}", ((KvString) supplier.get()).add("totalRanges", Long.valueOf(numFiles), (v0) -> {
            return NumberFormatter.addCommas(v0);
        }));
        Scanner each = (((Boolean) this.settings.useBlobPrefetcher.get()).booleanValue() ? this.rangeBlockfileDao.scanConcatenatedRangeRowsWithBlobPrefetcher(rangeCombineRequest.export(), rangeCombineRequest.exportId(), rangeCombineRequest.node().clientAndTableNames().table(), ofBytes) : this.rangeBlockfileDao.scanConcatenatedRangeRows(rangeCombineRequest.export(), rangeCombineRequest.exportId(), rangeCombineRequest.node().clientAndTableNames().table())).batchByMinSizeWithStats(OUTPUT_BLOCK_SIZE.toBytes(), (v0) -> {
            return v0.length();
        }).map(scannerMinSizeBatch -> {
            return new BlockOfRowsWithStats(scannerMinSizeBatch);
        }).batchByMinSize(PREFETCH_GROUP_SIZE.toBytes(), (v0) -> {
            return v0.totalBytes();
        }).each(list -> {
            long sum = list.stream().map((v0) -> {
                return v0.rows();
            }).mapToLong((v0) -> {
                return v0.size();
            }).sum();
            long sum2 = list.stream().mapToLong((v0) -> {
                return v0.totalBytes();
            }).sum();
            long size = list.size();
            add.incrementBy(sum);
            ShadowTableMetrics.countCombineRows(rangeCombineRequest.export.name(), rangeCombineRequest.node.clientAndTableNames(), sum);
            ShadowTableMetrics.countCombineBytesIn(rangeCombineRequest.export.name(), rangeCombineRequest.node.clientAndTableNames(), sum2);
            ShadowTableMetrics.countCombineBlocksOut(rangeCombineRequest.export.name(), rangeCombineRequest.node.clientAndTableNames(), size);
            ShadowTableMetrics.measureCombineBlocksPrefetched(rangeCombineRequest.export.name(), rangeCombineRequest.node.clientAndTableNames(), add3.value());
        }).each(list2 -> {
            add3.incrementBy(list2.size());
        }).prefetch(this.combinePrefetchExec, i).each(list3 -> {
            add3.decrementBy(list3.size());
        });
        add2.getClass();
        this.blockfileDao.writeBlockfile(rangeCombineRequest.export(), rangeCombineRequest.exportId(), rangeCombineRequest.node(), this.nodeSelectionService.enableCompression(((RangeCombineRequest) rangeCombineRequest).node), each.timeNanos((v1) -> {
            r1.incrementBy(v1);
        }).periodic(Duration.ofSeconds(5L), list4 -> {
            logger.warn("combineRanges {}", supplier.get());
        }).concat((v0) -> {
            return Scanner.of(v0);
        }).map((v0) -> {
            return v0.rows();
        }));
        this.rangeBlockfileDao.deleteRangeFiles(rangeCombineRequest.export(), rangeCombineRequest.exportId(), rangeCombineRequest.node().clientAndTableNames().table());
        logger.warn("finishCombineRanges {}", supplier.get());
    }
}
