package io.datarouter.storage.scratch.blockfile;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.Codec;
import io.datarouter.bytes.blockfile.BlockfileGroup;
import io.datarouter.bytes.blockfile.BlockfileGroupBuilder;
import io.datarouter.bytes.blockfile.encoding.compression.BlockfileCompressor;
import io.datarouter.bytes.blockfile.encoding.compression.BlockfileStandardCompressors;
import io.datarouter.bytes.blockfile.io.write.BlockfileWriter;
import io.datarouter.bytes.blockfile.row.BlockfileRow;
import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.file.Directory;
import io.datarouter.storage.node.tableconfig.NodewatchConfigurationBuilder;
import io.datarouter.storage.scratch.ScratchDatabeanCodec;
import io.datarouter.storage.serialize.fieldcache.DatabeanFieldInfo;
import io.datarouter.storage.serialize.fieldcache.IndexEntryFieldInfo;
import io.datarouter.storage.util.BlockfileDirectoryStorage;
import io.datarouter.util.number.NumberFormatter;
import io.datarouter.util.string.StringTool;
import io.datarouter.util.tuple.Range;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/* loaded from: input_file:io/datarouter/storage/scratch/blockfile/ScratchDatabeanBlockfileManager.class */
public class ScratchDatabeanBlockfileManager<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> {
    private final String name;
    private final BlockfileGroup<D> blockfileGroup;
    private final BlockfileCompressor compressor;
    private final Codec<D, BlockfileRow> codec;
    private final Function<Range<PK>, Scanner<D>> inputScannerSupplier;
    private final Threads writeThreads;
    private final Threads readThreads;
    private final Threads decodeThreads;
    private final Optional<TaskTracker> optTracker;

    /* loaded from: input_file:io/datarouter/storage/scratch/blockfile/ScratchDatabeanBlockfileManager$ScratchDatabeanBlockfileManagerBuilder.class */
    public static class ScratchDatabeanBlockfileManagerBuilder<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> {
        private final String name;
        private final Directory directory;
        private final Codec<D, ScratchDatabeanCodec.ScratchDatabeanBytes> databeanBytesCodec;
        private final Function<Range<PK>, Scanner<D>> inputScannerSupplier;
        private BlockfileCompressor compressor = BlockfileStandardCompressors.NONE;
        private Threads writeThreads = Threads.none();
        private Threads readThreads = Threads.none();
        private Threads decodeThreads = Threads.none();
        private TaskTracker tracker;

        public ScratchDatabeanBlockfileManagerBuilder(String str, Directory directory, Codec<D, ScratchDatabeanCodec.ScratchDatabeanBytes> codec, Function<Range<PK>, Scanner<D>> function) {
            this.name = str;
            this.directory = directory;
            this.databeanBytesCodec = codec;
            this.inputScannerSupplier = function;
        }

        public static <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> ScratchDatabeanBlockfileManagerBuilder<PK, D, F> fromDatabeanFieldInfo(String str, Directory directory, DatabeanFieldInfo<PK, D, F> databeanFieldInfo, Function<Range<PK>, Scanner<D>> function) {
            return new ScratchDatabeanBlockfileManagerBuilder<>(str, directory, new ScratchDatabeanCodec(databeanFieldInfo), function);
        }

        public static <PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> ScratchDatabeanBlockfileManagerBuilder<PK, D, F> fromIndexEntryFieldInfo(String str, Directory directory, IndexEntryFieldInfo<PK, D, F> indexEntryFieldInfo, Function<Range<PK>, Scanner<D>> function) {
            return new ScratchDatabeanBlockfileManagerBuilder<>(str, directory, new ScratchDatabeanCodec(indexEntryFieldInfo), function);
        }

        public ScratchDatabeanBlockfileManagerBuilder<PK, D, F> setCompressor(BlockfileCompressor blockfileCompressor) {
            this.compressor = blockfileCompressor;
            return this;
        }

        public ScratchDatabeanBlockfileManagerBuilder<PK, D, F> setWriteThreads(Threads threads) {
            this.writeThreads = threads;
            return this;
        }

        public ScratchDatabeanBlockfileManagerBuilder<PK, D, F> setReadThreads(Threads threads) {
            this.readThreads = threads;
            return this;
        }

        public ScratchDatabeanBlockfileManagerBuilder<PK, D, F> setDecodeThreads(Threads threads) {
            this.decodeThreads = threads;
            return this;
        }

        public ScratchDatabeanBlockfileManagerBuilder<PK, D, F> setTaskTracker(TaskTracker taskTracker) {
            this.tracker = taskTracker;
            return this;
        }

        public ScratchDatabeanBlockfileManager<PK, D, F> build() {
            return new ScratchDatabeanBlockfileManager<>(this.name, new BlockfileGroupBuilder(new BlockfileDirectoryStorage(this.directory)).build(), this.compressor, new ScratchDatabeanBlockfileCodec(this.databeanBytesCodec), this.inputScannerSupplier, this.writeThreads, this.readThreads, this.decodeThreads, Optional.ofNullable(this.tracker));
        }
    }

    private ScratchDatabeanBlockfileManager(String str, BlockfileGroup<D> blockfileGroup, BlockfileCompressor blockfileCompressor, Codec<D, BlockfileRow> codec, Function<Range<PK>, Scanner<D>> function, Threads threads, Threads threads2, Threads threads3, Optional<TaskTracker> optional) {
        this.name = str;
        this.blockfileGroup = blockfileGroup;
        this.compressor = blockfileCompressor;
        this.codec = codec;
        this.inputScannerSupplier = function;
        this.writeThreads = threads;
        this.readThreads = threads2;
        this.decodeThreads = threads3;
        this.optTracker = optional;
    }

    public void writeBlockfiles(List<Range<PK>> list) {
        AtomicLong atomicLong = new AtomicLong();
        Scanner.iterate(0, num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).limit(list.size()).parallelUnordered(this.writeThreads).forEach(num2 -> {
            writeBlockfile(num2.intValue(), list.size(), (Range) list.get(num2.intValue()), atomicLong);
        });
    }

    private void writeBlockfile(long j, long j2, Range<PK> range, AtomicLong atomicLong) {
        BlockfileWriter build = this.blockfileGroup.newWriterBuilder(makeFilename(j + 1, j2)).setCompressor(this.compressor).build();
        AtomicLong atomicLong2 = new AtomicLong();
        Scanner<D> apply = this.inputScannerSupplier.apply(range);
        Codec<D, BlockfileRow> codec = this.codec;
        codec.getClass();
        build.writeRows(ByteLength.ofKiB(16L), apply.map((v1) -> {
            return r1.encode(v1);
        }).batch(NodewatchConfigurationBuilder.DEFAULT_BATCH_SIZE).each(list -> {
            atomicLong2.addAndGet(list.size());
        }).periodic(Duration.ofSeconds(1L), list2 -> {
            this.optTracker.ifPresent(taskTracker -> {
                taskTracker.increment(atomicLong2.get()).heartbeat();
                atomicLong2.set(0L);
            });
        }).concat((v0) -> {
            return Scanner.of(v0);
        }));
        atomicLong.incrementAndGet();
        this.optTracker.ifPresent(taskTracker -> {
            taskTracker.setLastItemProcessed(String.format("%s blockfile %s of %s", this.name, NumberFormatter.addCommas(Long.valueOf(atomicLong.get())), NumberFormatter.addCommas(Long.valueOf(j2))));
        });
    }

    public Scanner<D> scanBlockfiles() {
        return Scanner.of(this.blockfileGroup.storage().list()).map((v0) -> {
            return v0.name();
        }).concat(this::scanBlockfile);
    }

    private Scanner<D> scanBlockfile(String str) {
        BlockfileGroup<D> blockfileGroup = this.blockfileGroup;
        Codec<D, BlockfileRow> codec = this.codec;
        codec.getClass();
        return blockfileGroup.newReaderBuilder(str, (v1) -> {
            return r2.decode(v1);
        }).setReadThreads(this.readThreads).setDecodeThreads(this.decodeThreads).setReadChunkSize(ByteLength.ofMiB(16L)).build().sequential().scan();
    }

    private static String makeFilename(long j, long j2) {
        return StringTool.pad(Long.toString(j), '0', Long.toString(j2).length()) + "-of-" + j2;
    }
}
