package io.datarouter.bytes.blockfile.io.merge;

import io.datarouter.bytes.blockfile.block.parsed.BlockfileDecodedBlockBatch;
import io.datarouter.bytes.blockfile.io.merge.BlockfileMergerThreadsCalculator;
import io.datarouter.bytes.blockfile.io.read.BlockfileReaderBuilder;
import io.datarouter.bytes.blockfile.io.storage.BlockfileNameAndSize;
import io.datarouter.bytes.blockfile.io.write.BlockfileWriter;
import io.datarouter.bytes.blockfile.row.BlockfileRow;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

/* loaded from: input_file:io/datarouter/bytes/blockfile/io/merge/BlockfileMerger.class */
public class BlockfileMerger {
    private final BlockfileMergerParams params;
    private final BlockfileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final String filename;
    private final BlockfileMergerTracker tracker;
    private final int numVcpus = Runtime.getRuntime().availableProcessors();
    private final List<BlockfileMergerThreadsCalculator.ThreadsForFile> threadsForFileListDesc;

    public BlockfileMerger(BlockfileMergerParams blockfileMergerParams, BlockfileMergePlan blockfileMergePlan, Supplier<Boolean> supplier) {
        this.params = blockfileMergerParams;
        this.plan = blockfileMergePlan;
        this.shouldStop = supplier;
        this.filename = blockfileMergerParams.storageParams().filenameSupplier().get();
        this.tracker = new BlockfileMergerTracker(blockfileMergePlan, this.filename);
        this.threadsForFileListDesc = new BlockfileMergerThreadsCalculator(blockfileMergePlan, blockfileMergerParams.readParams()).calc();
    }

    public BlockfileNameAndSize merge() {
        this.tracker.startTime = Instant.now();
        List<Scanner<BlockfileRow>> list = Scanner.of(Scanner.of(this.plan.files()).parallelUnordered(new Threads(this.params.readParams().prefetchExec(), this.plan.files().size())).map(this::makeReader).list()).map(scanner -> {
            AtomicLong atomicLong = this.tracker.waitForBlocksNs;
            atomicLong.getClass();
            return scanner.timeNanos((v1) -> {
                r1.addAndGet(v1);
            }).concatIter((v0) -> {
                return v0.blocks();
            }).map((v0) -> {
                return v0.items();
            }).concat((v0) -> {
                return Scanner.of(v0);
            });
        }).list();
        this.tracker.logInitializationStats();
        this.tracker.resetCountersSinceLastLog();
        this.tracker.mergeStartTime = Instant.now();
        this.tracker.waitForReadersNs.addAndGet(Duration.between(this.tracker.startTime, this.tracker.mergeStartTime).toNanos());
        Scanner batchByMinSize = this.plan.collatorStrategy().method.apply(list).batchByMinSize(this.params.writeParams().minBlockSize().toBytes(), (v0) -> {
            return v0.length();
        });
        AtomicLong atomicLong = this.tracker.waitForCollatorNs;
        atomicLong.getClass();
        Scanner periodic = batchByMinSize.timeNanos((v1) -> {
            r1.addAndGet(v1);
        }).each(list2 -> {
            this.tracker.blocksWritten.incrementAndGet();
            this.tracker.blocksWrittenSinceLastLog.incrementAndGet();
            this.tracker.recordsWritten.addAndGet(list2.size());
            this.tracker.recordsWrittenSinceLastLog.addAndGet(list2.size());
        }).periodic(this.params.heartbeatPeriod(), list3 -> {
            this.tracker.logIntermediateProgress();
            this.tracker.resetCountersSinceLastLog();
            throwIfShouldStop();
        });
        BlockfileWriter<BlockfileRow> makeWriter = makeWriter(this.filename);
        makeWriter.getClass();
        BlockfileNameAndSize blockfileNameAndSize = new BlockfileNameAndSize(this.filename, ((BlockfileWriter.BlockfileWriteResult) periodic.apply(makeWriter::writeBlocks)).fileLength().toBytes());
        this.tracker.logProgress(true, blockfileNameAndSize);
        return blockfileNameAndSize;
    }

    private Scanner<BlockfileDecodedBlockBatch<BlockfileRow>> makeReader(BlockfileNameAndSize blockfileNameAndSize) {
        int intValue = ((Integer) Scanner.of(this.threadsForFileListDesc).include(threadsForFile -> {
            return threadsForFile.file().equals(blockfileNameAndSize);
        }).map((v0) -> {
            return v0.threads();
        }).findFirst().orElseThrow()).intValue();
        BlockfileReaderBuilder<BlockfileRow> decodeThreads = this.params.storageParams().blockfileGroup().newReaderBuilderKnownFileLength(blockfileNameAndSize.name(), blockfileNameAndSize.size(), Function.identity()).setReadChunkSize(this.params.readParams().readChunkSize()).setDecodeBatchSize(this.params.readParams().decodeBatchSize()).setDecodeThreads(new Threads(this.params.readParams().readExec(), this.numVcpus));
        if (intValue > 0) {
            decodeThreads.setReadThreads(Threads.useExecForSingleThread(this.params.readParams().readExec(), intValue));
        }
        return decodeThreads.build().sequential().scanDecodedBlockBatches().each(blockfileDecodedBlockBatch -> {
            this.tracker.blocksRead.addAndGet(blockfileDecodedBlockBatch.blocks().size());
            this.tracker.blocksReadSinceLastLog.addAndGet(blockfileDecodedBlockBatch.blocks().size());
            this.tracker.compressedBytesRead.addAndGet(blockfileDecodedBlockBatch.totalCompressedSize());
            this.tracker.compressedBytesReadSinceLastLog.addAndGet(blockfileDecodedBlockBatch.totalCompressedSize());
            this.tracker.decompressedBytesRead.addAndGet(blockfileDecodedBlockBatch.totalDecompressedSize());
            this.tracker.decompressedBytesReadSinceLastLog.addAndGet(blockfileDecodedBlockBatch.totalDecompressedSize());
            blockfileDecodedBlockBatch.blocks().forEach(blockfileDecodedBlock -> {
                this.tracker.recordsRead.addAndGet(blockfileDecodedBlock.items().size());
                this.tracker.recordsReadSinceLastLog.addAndGet(blockfileDecodedBlock.items().size());
            });
        }).peekFirst(blockfileDecodedBlockBatch2 -> {
        });
    }

    private BlockfileWriter<BlockfileRow> makeWriter(String str) {
        return this.params.storageParams().blockfileGroup().newWriterBuilder(str).setValueBlockFormat(this.params.writeParams().valueBlockFormat()).setIndexBlockFormat(this.params.writeParams().indexBlockFormat()).setCompressor(this.params.writeParams().compressor()).setEncodeBatchSize(this.params.writeParams().encodeBatchSize()).setEncodeThreads(new Threads(this.params.writeParams().encodeExec(), this.numVcpus)).setMultipartWrite(this.plan.totalInputSize().toBytes() > this.params.writeParams().multipartUploadThreshold().toBytes()).setWriteThreads(new Threads(this.params.writeParams().writeExec(), this.params.writeParams().writeThreads())).build();
    }

    private void throwIfShouldStop() {
        if (this.shouldStop.get().booleanValue()) {
            throw new RuntimeException("stop requested");
        }
    }
}
