package io.datarouter.nodewatch.shadowtable.service;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.KvString;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.nodewatch.service.TableSamplerService;
import io.datarouter.nodewatch.shadowtable.ShadowTableExport;
import io.datarouter.nodewatch.shadowtable.ShadowTableMetrics;
import io.datarouter.nodewatch.shadowtable.codec.ShadowTableStatefulDictionaryCodec;
import io.datarouter.nodewatch.shadowtable.config.DatarouterShadowTableSettingRoot;
import io.datarouter.nodewatch.shadowtable.storage.ShadowTableRangeBlockfileDao;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.node.op.raw.read.SortedStorageReader;
import io.datarouter.util.Require;
import io.datarouter.util.duration.DatarouterDuration;
import io.datarouter.util.number.NumberFormatter;
import io.datarouter.util.tuple.Range;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService.class */
public class ShadowTableRangeExportService {
    private static final Logger logger = LoggerFactory.getLogger(ShadowTableRangeExportService.class);
    private static final Duration GROUP_SAMPLES_BY_COUNTING_TIME = Duration.ofSeconds(10);
    private static final Duration SLOW_RANGE_THRESHOLD = Duration.ofMinutes(2);

    @Inject
    private DatarouterShadowTableSettingRoot settingRoot;

    @Inject
    private ShadowTableNodeSelectionService nodeSelectionService;

    @Inject
    private TableSamplerService tableSamplerService;

    @Inject
    private ShadowTableRangeBlockfileDao rangeBlockfileDao;

    /* loaded from: input_file:io/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest.class */
    public static final class RangeExportRequest<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> extends Record {
        private final ShadowTableExport export;
        private final String exportId;
        private final SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> node;
        private final Range<PK> range;
        private final int rangeIdOneBased;
        private final int numRanges;

        public RangeExportRequest(ShadowTableExport shadowTableExport, String str, SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> physicalSortedStorageReaderNode, Range<PK> range, int i, int i2) {
            this.export = shadowTableExport;
            this.exportId = str;
            this.node = physicalSortedStorageReaderNode;
            this.range = range;
            this.rangeIdOneBased = i;
            this.numRanges = i2;
        }

        public ShadowTableExport export() {
            return this.export;
        }

        public String exportId() {
            return this.exportId;
        }

        public SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> node() {
            return this.node;
        }

        public Range<PK> range() {
            return this.range;
        }

        public int rangeIdOneBased() {
            return this.rangeIdOneBased;
        }

        public int numRanges() {
            return this.numRanges;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RangeExportRequest.class), RangeExportRequest.class, "export;exportId;node;range;rangeIdOneBased;numRanges", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->range:Lio/datarouter/util/tuple/Range;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->rangeIdOneBased:I", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->numRanges:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RangeExportRequest.class), RangeExportRequest.class, "export;exportId;node;range;rangeIdOneBased;numRanges", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->range:Lio/datarouter/util/tuple/Range;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->rangeIdOneBased:I", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->numRanges:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RangeExportRequest.class, Object.class), RangeExportRequest.class, "export;exportId;node;range;rangeIdOneBased;numRanges", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->export:Lio/datarouter/nodewatch/shadowtable/ShadowTableExport;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->exportId:Ljava/lang/String;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->node:Lio/datarouter/storage/node/op/raw/read/SortedStorageReader$PhysicalSortedStorageReaderNode;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->range:Lio/datarouter/util/tuple/Range;", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->rangeIdOneBased:I", "FIELD:Lio/datarouter/nodewatch/shadowtable/service/ShadowTableRangeExportService$RangeExportRequest;->numRanges:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> List<RangeExportRequest<PK, D, F>> makeRangeExportRequests(ShadowTableExport shadowTableExport, String str, SortedStorageReader.PhysicalSortedStorageReaderNode<PK, D, F> physicalSortedStorageReaderNode) {
        List list = this.tableSamplerService.scanSampledPkRangesByCountingTime(physicalSortedStorageReaderNode, GROUP_SAMPLES_BY_COUNTING_TIME).list();
        AtomicInteger atomicInteger = new AtomicInteger();
        return Scanner.of(list).map(range -> {
            return new RangeExportRequest(shadowTableExport, str, physicalSortedStorageReaderNode, range, atomicInteger.incrementAndGet(), list.size());
        }).shuffle().list();
    }

    public <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long exportUntypedRangeWithRetries(RangeExportRequest<?, ?, ?> rangeExportRequest, long j) {
        return exportRangeWithRetries(rangeExportRequest, j);
    }

    private <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long exportRangeWithRetries(RangeExportRequest<PK, D, F> rangeExportRequest, long j) {
        Supplier supplier = () -> {
            return new KvString().add("client", rangeExportRequest.node.clientAndTableNames().client()).add("table", rangeExportRequest.node.clientAndTableNames().table()).add("sequenceId", NumberFormatter.addCommas(Long.valueOf(j)) + "/" + NumberFormatter.addCommas(Integer.valueOf(rangeExportRequest.numRanges))).add("id", NumberFormatter.addCommas(Integer.valueOf(rangeExportRequest.rangeIdOneBased)));
        };
        RuntimeException runtimeException = null;
        for (int i = 1; i <= 3; i++) {
            try {
                return exportRange(rangeExportRequest, j, i);
            } catch (RuntimeException e) {
                runtimeException = e;
                logger.warn("RangeFailure " + String.valueOf(((KvString) supplier.get()).add("attempt", "%s/%s".formatted(Integer.valueOf(i), 3))), e);
            }
        }
        throw new RuntimeException("Failure " + String.valueOf(((KvString) supplier.get()).add("attempts", 3, (v0) -> {
            return NumberFormatter.addCommas(v0);
        })), runtimeException);
    }

    private <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> long exportRange(RangeExportRequest<PK, D, F> rangeExportRequest, long j, int i) {
        Instant now = Instant.now();
        Supplier supplier = () -> {
            return Duration.between(now, Instant.now());
        };
        ShadowTableStatefulDictionaryCodec shadowTableStatefulDictionaryCodec = new ShadowTableStatefulDictionaryCodec(((RangeExportRequest) rangeExportRequest).node.getFieldInfo(), ShadowTableStatefulDictionaryCodec.ColumnNameCodec.createNewMappings(((RangeExportRequest) rangeExportRequest).node.getFieldInfo()));
        boolean enableCompression = this.nodeSelectionService.enableCompression(((RangeExportRequest) rangeExportRequest).node);
        Config responseBatchSize = new Config().setResponseBatchSize(Integer.valueOf(this.nodeSelectionService.scanBatchSizeForNode(((RangeExportRequest) rangeExportRequest).node)));
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong();
        Supplier supplier2 = () -> {
            return new KvString().add("client", rangeExportRequest.node.clientAndTableNames().client()).add("table", rangeExportRequest.node.clientAndTableNames().table()).add("sequenceId", NumberFormatter.addCommas(Long.valueOf(j)) + "/" + NumberFormatter.addCommas(Integer.valueOf(rangeExportRequest.numRanges))).add("id", NumberFormatter.addCommas(Integer.valueOf(rangeExportRequest.rangeIdOneBased))).add("rows", Long.valueOf(atomicLong.get()), (v0) -> {
                return NumberFormatter.addCommas(v0);
            }).add("blockfileRowBytes", ByteLength.ofBytes(atomicLong2.get()).toDisplay()).add("duration", new DatarouterDuration((Duration) supplier.get()).toString()).add("attempt", Integer.valueOf(i), (v0) -> {
                return NumberFormatter.addCommas(v0);
            }).add("range", rangeExportRequest.range, (v0) -> {
                return v0.toString();
            });
        };
        Scanner scan = ((RangeExportRequest) rangeExportRequest).node.scan(((RangeExportRequest) rangeExportRequest).range, responseBatchSize);
        shadowTableStatefulDictionaryCodec.getClass();
        scan.map(shadowTableStatefulDictionaryCodec::encode).batchByMinSizeWithStats(ShadowTableRangeBlockfileDao.BLOCK_SIZE.toBytes(), (v0) -> {
            return v0.length();
        }).each(scannerMinSizeBatch -> {
            Require.isTrue(((Boolean) this.settingRoot.runExports.get()).booleanValue(), "Export setting was disabled");
            atomicLong.addAndGet(scannerMinSizeBatch.items().size());
            ShadowTableMetrics.countExportRows(rangeExportRequest.export.name(), rangeExportRequest.node.clientAndTableNames(), scannerMinSizeBatch.items().size());
            atomicLong2.addAndGet(scannerMinSizeBatch.totalSize());
            ShadowTableMetrics.countBlockfileInputBytes(rangeExportRequest.export.name(), rangeExportRequest.node.clientAndTableNames(), scannerMinSizeBatch.totalSize());
        }).periodic(SLOW_RANGE_THRESHOLD, scannerMinSizeBatch2 -> {
            logger.warn("slowRange {}", supplier2.get());
        }).map((v0) -> {
            return v0.items();
        }).then(scanner -> {
            this.rangeBlockfileDao.writeBlockfile(rangeExportRequest.export, rangeExportRequest.exportId, rangeExportRequest.node, enableCompression, rangeExportRequest.rangeIdOneBased, rangeExportRequest.numRanges, scanner);
        });
        logger.info("exportedRange {}", supplier2.get());
        return atomicLong.get();
    }
}
