package io.datarouter.storage.util;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.CountingInputStream;
import io.datarouter.bytes.MultiByteArrayInputStream;
import io.datarouter.bytes.kvfile.KvFileCollator;
import io.datarouter.bytes.kvfile.KvFileEntry;
import io.datarouter.bytes.kvfile.KvFileReader;
import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.scanner.ParallelScannerContext;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.file.PathbeanKey;
import io.datarouter.storage.util.KvCompactorFileCache;
import io.datarouter.storage.util.KvFileCompactor;
import io.datarouter.util.Count;
import io.datarouter.util.Require;
import io.datarouter.util.concurrent.TransferThread;
import io.datarouter.util.number.NumberFormatter;
import io.datarouter.util.tuple.Range;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/storage/util/KvFileMerger.class */
public class KvFileMerger {
    private static final Logger logger = LoggerFactory.getLogger(KvFileMerger.class);
    private final KvFileCompactor.KvFileCompactorParams params;
    private final KvCompactorFileCache.KvFileMergePlan plan;

    /* loaded from: input_file:io/datarouter/storage/util/KvFileMerger$MergeResult.class */
    public static final class MergeResult extends Record {
        private final FilenameAndSize newFile;
        private final Count recordsRead;
        private final Count bytesRead;
        private final long bytesWritten;
        private final long timeMs;
        private final Count readStallNs;
        private final Count writeStallNs;

        public MergeResult(FilenameAndSize filenameAndSize, Count count, Count count2, long j, long j2, Count count3, Count count4) {
            this.newFile = filenameAndSize;
            this.recordsRead = count;
            this.bytesRead = count2;
            this.bytesWritten = j;
            this.timeMs = j2;
            this.readStallNs = count3;
            this.writeStallNs = count4;
        }

        @Override // java.lang.Record
        public String toString() {
            return "MergeResult [newFilename=" + this.newFile.name() + ", recordsRead=" + NumberFormatter.addCommas(Long.valueOf(this.recordsRead.value())) + ", bytesRead=" + ByteLength.ofBytes(this.bytesRead.value()).toDisplay() + ", bytesWritten=" + ByteLength.ofBytes(this.bytesWritten).toDisplay() + ", timeMs=" + NumberFormatter.addCommas(Long.valueOf(this.timeMs)) + ", readStallMs=" + NumberFormatter.addCommas(Long.valueOf(this.readStallNs.value() / 1000000)) + ", writeStallMs=" + NumberFormatter.addCommas(Long.valueOf(this.writeStallNs.value() / 1000000)) + "]";
        }

        public FilenameAndSize newFile() {
            return this.newFile;
        }

        public Count recordsRead() {
            return this.recordsRead;
        }

        public Count bytesRead() {
            return this.bytesRead;
        }

        public long bytesWritten() {
            return this.bytesWritten;
        }

        public long timeMs() {
            return this.timeMs;
        }

        public Count readStallNs() {
            return this.readStallNs;
        }

        public Count writeStallNs() {
            return this.writeStallNs;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, MergeResult.class), MergeResult.class, "newFile;recordsRead;bytesRead;bytesWritten;timeMs;readStallNs;writeStallNs", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->newFile:Lio/datarouter/storage/util/FilenameAndSize;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->recordsRead:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->bytesRead:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->bytesWritten:J", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->timeMs:J", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->readStallNs:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->writeStallNs:Lio/datarouter/util/Count;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, MergeResult.class, Object.class), MergeResult.class, "newFile;recordsRead;bytesRead;bytesWritten;timeMs;readStallNs;writeStallNs", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->newFile:Lio/datarouter/storage/util/FilenameAndSize;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->recordsRead:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->bytesRead:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->bytesWritten:J", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->timeMs:J", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->readStallNs:Lio/datarouter/util/Count;", "FIELD:Lio/datarouter/storage/util/KvFileMerger$MergeResult;->writeStallNs:Lio/datarouter/util/Count;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    public KvFileMerger(KvFileCompactor.KvFileCompactorParams kvFileCompactorParams, KvCompactorFileCache.KvFileMergePlan kvFileMergePlan) {
        this.params = kvFileCompactorParams;
        this.plan = kvFileMergePlan;
    }

    public MergeResult merge(TaskTracker taskTracker) {
        Count.Counts counts = new Count.Counts();
        Count add = counts.add("readStallNs");
        Count add2 = counts.add("writeStallNs");
        Count add3 = counts.add("bytesRead");
        Count add4 = counts.add("recordsRead");
        Count add5 = counts.add("messagesSubmitted");
        Function function = this.plan.streamingWrite() ? this::writeLarge : this::writeSmall;
        TransferThread.TransferThreadBuilder transferThreadBuilder = new TransferThread.TransferThreadBuilder(this.params.transferThreadParams().messageBufferSize(), scanner -> {
            return (FilenameAndSize) scanner.apply(function);
        });
        add.getClass();
        TransferThread.TransferThreadBuilder withInputStallNanosCallback = transferThreadBuilder.withInputStallNanosCallback((v1) -> {
            r1.incrementBy(v1);
        });
        add2.getClass();
        TransferThread build = withInputStallNanosCallback.withOutputStallNanosCallback((v1) -> {
            r1.incrementBy(v1);
        }).build();
        long currentTimeMillis = System.currentTimeMillis();
        Scanner batch = ((KvFileCollator) Scanner.of(this.plan.files()).parallel(new ParallelScannerContext(this.params.readerParams().prefetchEntriesExec(), this.plan.files().size(), true)).map(filenameAndSize -> {
            return makeReader(filenameAndSize, add3);
        }).listTo(KvFileCollator::new)).mergeKeepingLatestVersion().batch(this.params.transferThreadParams().messageSize());
        build.getClass();
        Scanner each = batch.each((v1) -> {
            r1.submit(v1);
        }).each(list -> {
            Require.isFalse(taskTracker.shouldStop());
        });
        add4.getClass();
        Scanner each2 = each.each((v1) -> {
            r1.incrementBySize(v1);
        });
        add5.getClass();
        each2.each((v1) -> {
            r1.increment(v1);
        }).forEach(list2 -> {
            if (add5.value() % this.params.transferThreadParams().logEveryNMessages() == 0) {
                logger.warn("intermediate merge progress: {}", counts);
            }
        });
        FilenameAndSize filenameAndSize2 = (FilenameAndSize) build.complete();
        return new MergeResult(filenameAndSize2, add4, add3, filenameAndSize2.size(), System.currentTimeMillis() - currentTimeMillis, add, add2);
    }

    private KvFileReader makeReader(FilenameAndSize filenameAndSize, Count count) {
        PathbeanKey of = PathbeanKey.of(filenameAndSize.name());
        if (filenameAndSize.size() <= this.params.mergeParams().chunkSize().toBytesInt()) {
            byte[] read = this.params.blobStorage().read(of);
            count.incrementByLength(read);
            return new KvFileReader(read);
        }
        if (this.params.mergeParams().useChunkScanner()) {
            return (KvFileReader) this.params.blobStorage().scanChunks(of, new Range<>(0L, true, Long.valueOf(filenameAndSize.size()), false), this.params.mergeParams().exec(), this.params.mergeParams().fanIn(), this.params.mergeParams().chunkSize().toBytesInt()).each(bArr -> {
                count.incrementByLength(bArr);
            }).apply(scanner -> {
                return new KvFileReader(scanner, filenameAndSize.name(), this.params.readerParams().prefetchBytesExec(), this.params.readerParams().prefetchEntriesExec(), this.params.readerParams().prefetchSize());
            });
        }
        InputStream readInputStream = this.params.blobStorage().readInputStream(of);
        int bytesInt = ByteLength.ofMiB(1L).toBytesInt();
        count.getClass();
        return new KvFileReader(new CountingInputStream(readInputStream, bytesInt, (v1) -> {
            r4.incrementBy(v1);
        }), filenameAndSize.name(), this.params.readerParams().prefetchBytesExec(), this.params.readerParams().prefetchEntriesExec(), this.params.readerParams().prefetchSize());
    }

    private FilenameAndSize writeSmall(Scanner<List<KvFileEntry>> scanner) {
        String str = this.params.filenameSupplier().get();
        this.params.blobStorage().write(PathbeanKey.of(str), (byte[]) scanner.concat((v0) -> {
            return Scanner.of(v0);
        }).map((v0) -> {
            return v0.bytes();
        }).listTo(ByteTool::concat));
        return new FilenameAndSize(str, r0.length);
    }

    private FilenameAndSize writeLarge(Scanner<List<KvFileEntry>> scanner) {
        InputStream inputStream = (InputStream) scanner.concat((v0) -> {
            return Scanner.of(v0);
        }).map((v0) -> {
            return v0.bytes();
        }).apply(MultiByteArrayInputStream::new);
        String str = this.params.filenameSupplier().get();
        PathbeanKey of = PathbeanKey.of(str);
        this.params.blobStorage().writeParallel(of, inputStream, this.params.writeParams().exec(), this.params.writeParams().numThreads());
        return new FilenameAndSize(str, this.params.blobStorage().length(of).orElseThrow().longValue());
    }
}
