package io.datarouter.bytes.kvfile.merge;

import io.datarouter.bytes.blockfile.Blockfile;
import io.datarouter.bytes.blockfile.dto.BlockfileNameAndSize;
import io.datarouter.bytes.blockfile.read.BlockfileReader;
import io.datarouter.bytes.blockfile.read.BlockfileReaderBuilder;
import io.datarouter.bytes.blockfile.write.BlockfileWriter;
import io.datarouter.bytes.kvfile.block.KvFileBlockCodec;
import io.datarouter.bytes.kvfile.kv.KvFileEntry;
import io.datarouter.bytes.kvfile.merge.KvFileMergerThreadsCalculator;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/* loaded from: input_file:io/datarouter/bytes/kvfile/merge/KvFileMerger.class */
/**
 * Merges the input files of a {@link KvFileMergePlan} into a single output blockfile.
 *
 * <p>One reader is opened per input file, the decoded entries of all readers are combined by the
 * plan's collator strategy, re-batched into blocks of at least the configured minimum size, and
 * written to a file whose name comes from the storage params' filename supplier. Read, write and
 * wait-time counters are recorded on a {@link KvFileMergerTracker} along the way.
 */
public class KvFileMerger {
    private final KvFileMergerParams params;
    private final KvFileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final String filename;
    private final KvFileMergerTracker tracker;
    private final int numVcpus = Runtime.getRuntime().availableProcessors();
    private final List<KvFileMergerThreadsCalculator.ThreadsForFile> threadsForFileListDesc;

    /**
     * Captures the merge configuration, obtains the output filename and pre-computes the number of
     * read threads for each input file.
     *
     * @param kvFileMergerParams storage, read, write and block-format settings for the merge
     * @param kvFileMergePlan the input files and collator strategy to use
     * @param supplier polled on each heartbeat; returning {@code true} aborts the merge
     */
    public KvFileMerger(KvFileMergerParams kvFileMergerParams, KvFileMergePlan kvFileMergePlan, Supplier<Boolean> supplier) {
        this.params = kvFileMergerParams;
        this.plan = kvFileMergePlan;
        this.shouldStop = supplier;
        this.filename = kvFileMergerParams.storageParams().filenameSupplier().get();
        this.tracker = new KvFileMergerTracker(kvFileMergePlan, this.filename);
        this.threadsForFileListDesc = new KvFileMergerThreadsCalculator(kvFileMergePlan, kvFileMergerParams.readParams()).calc();
    }

    /**
     * Runs the merge and writes the output file.
     *
     * @return the output filename together with the written file length in bytes
     * @throws RuntimeException with message "stop requested" if {@code shouldStop} returns
     *         {@code true} when polled during a heartbeat
     */
    public BlockfileNameAndSize merge() {
        this.tracker.startTime = Instant.now();

        // Open one reader per input file, using as many prefetch threads as there are files.
        List<Scanner<BlockfileReader.BlockfileDecodedBlockBatch<List<KvFileEntry>>>> readers = Scanner.of(this.plan.files())
                .parallelUnordered(new Threads(this.params.readParams().prefetchExec(), this.plan.files().size()))
                .map(this::makeReader)
                .list();

        // Flatten each reader: batches -> blocks -> entry lists -> entries.
        // Time spent waiting on each reader is accumulated into waitForBlocksNs.
        List<Scanner<KvFileEntry>> entryScanners = Scanner.of(readers)
                .map(reader -> reader
                        .timeNanos(this.tracker.waitForBlocksNs::addAndGet)
                        .concatIter(batch -> batch.blocks())
                        .map(block -> block.value())
                        .concat(entries -> Scanner.of(entries)))
                .list();

        this.tracker.logInitializationStats();
        this.tracker.resetCountersSinceLastLog();
        this.tracker.mergeStartTime = Instant.now();
        this.tracker.waitForReadersNs.addAndGet(
                Duration.between(this.tracker.startTime, this.tracker.mergeStartTime).toNanos());

        Scanner<KvFileEntry> collated = this.plan.collatorStrategy().method.apply(entryScanners);
        // The writer is created while evaluating the argument to apply(), i.e. only after the
        // rest of the pipeline has been assembled.
        BlockfileWriter.BlockfileWriteResult writeResult = collated
                .batchByMinSize(this.params.writeParams().minBlockSize().toBytes(), KvFileEntry::length)
                .timeNanos(this.tracker.waitForCollatorNs::addAndGet)
                .each(block -> {
                    this.tracker.blocksWritten.incrementAndGet();
                    this.tracker.blocksWrittenSinceLastLog.incrementAndGet();
                    this.tracker.recordsWritten.addAndGet(block.size());
                    this.tracker.recordsWrittenSinceLastLog.addAndGet(block.size());
                })
                .periodic(this.params.heartbeatPeriod(), ignored -> {
                    this.tracker.logIntermediateProgress();
                    this.tracker.resetCountersSinceLastLog();
                    throwIfShouldStop();
                })
                .apply(makeWriter(this.filename)::write);

        BlockfileNameAndSize result = new BlockfileNameAndSize(this.filename, writeResult.fileLength().toBytes());
        this.tracker.logProgress(true, result);
        return result;
    }

    /**
     * Builds a scanner of decoded block batches for one input file and wires the tracker's read
     * counters into it.
     *
     * @param blockfileNameAndSize the input file; must have an entry in the pre-computed
     *        threads-per-file list, otherwise {@code orElseThrow()} fails
     * @return a scanner over the file's decoded block batches
     */
    private Scanner<BlockfileReader.BlockfileDecodedBlockBatch<List<KvFileEntry>>> makeReader(BlockfileNameAndSize blockfileNameAndSize) {
        int readThreads = Scanner.of(this.threadsForFileListDesc)
                .include(threadsForFile -> threadsForFile.file().equals(blockfileNameAndSize))
                .map(KvFileMergerThreadsCalculator.ThreadsForFile::threads)
                .findFirst()
                .orElseThrow();

        Blockfile<List<KvFileEntry>> blockfile = this.params.storageParams().blockfile();
        BlockfileReaderBuilder<List<KvFileEntry>> readerBuilder = blockfile
                .newReaderBuilder(
                        blockfile.newMetadataReaderBuilder(blockfileNameAndSize.name())
                                .setKnownFileLength(blockfileNameAndSize.size())
                                .build(),
                        // A new codec is created each time this function is invoked.
                        blockfileReader -> this.params.blockFormat().newBlockCodec()::decodeAll)
                .setReadChunkSize(this.params.readParams().readChunkSize())
                .setDecodeBatchSize(this.params.readParams().decodeBatchSize())
                .setDecodeThreads(new Threads(this.params.readParams().readExec(), this.numVcpus));
        // Only override the builder's read threads when the calculator assigned some to this file.
        if (readThreads > 0) {
            readerBuilder.setReadThreads(Threads.useExecForSingleThread(this.params.readParams().readExec(), readThreads));
        }

        return readerBuilder.build()
                .scanDecodedBlockBatches()
                .each(batch -> {
                    this.tracker.blocksRead.addAndGet(batch.blocks().size());
                    this.tracker.blocksReadSinceLastLog.addAndGet(batch.blocks().size());
                    this.tracker.compressedBytesRead.addAndGet(batch.totalCompressedSize());
                    this.tracker.compressedBytesReadSinceLastLog.addAndGet(batch.totalCompressedSize());
                    this.tracker.decompressedBytesRead.addAndGet(batch.totalDecompressedSize());
                    this.tracker.decompressedBytesReadSinceLastLog.addAndGet(batch.totalDecompressedSize());
                    batch.blocks().forEach(block -> {
                        this.tracker.recordsRead.addAndGet(block.value().size());
                        this.tracker.recordsReadSinceLastLog.addAndGet(block.value().size());
                    });
                })
                // NOTE(review): the no-op consumer is intentional here; presumably peekFirst is
                // used only to make the first batch load while readers are opened in parallel -
                // confirm against Scanner.peekFirst.
                .peekFirst(firstBatch -> {
                });
    }

    /**
     * Builds the writer for the output file.
     *
     * <p>Multipart writing is enabled when the plan's total input size exceeds the configured
     * multipart upload threshold.
     *
     * @param str name of the output file
     * @return a writer that encodes entry lists with a new block codec
     */
    private BlockfileWriter<List<KvFileEntry>> makeWriter(String str) {
        boolean multipart = this.plan.totalInputSize().toBytes() > this.params.writeParams().multipartUploadThreshold().toBytes();
        Threads writeThreads = new Threads(this.params.writeParams().writeExec(), this.params.writeParams().writeThreads());
        Blockfile<List<KvFileEntry>> blockfile = this.params.storageParams().blockfile();
        KvFileBlockCodec<KvFileEntry> blockCodec = this.params.blockFormat().newBlockCodec();
        return blockfile.newWriterBuilder(str, blockCodec::encodeAll)
                .setCompressor(this.params.writeParams().compressor())
                .setEncodeBatchSize(this.params.writeParams().encodeBatchSize())
                .setEncodeThreads(new Threads(this.params.writeParams().encodeExec(), this.numVcpus))
                .setMultipartWrite(multipart)
                .setWriteThreads(writeThreads)
                .build();
    }

    /**
     * Aborts the merge when the stop supplier returns {@code true}.
     *
     * @throws RuntimeException with message "stop requested" when a stop has been requested
     */
    private void throwIfShouldStop() {
        if (this.shouldStop.get()) {
            throw new RuntimeException("stop requested");
        }
    }
}
