package io.datarouter.bytes.blockfile.write;

import io.datarouter.bytes.BinaryDictionary;
import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.Codec;
import io.datarouter.bytes.blockfile.checksum.BlockfileChecksummer;
import io.datarouter.bytes.blockfile.compress.BlockfileCompressor;
import io.datarouter.bytes.blockfile.dto.BlockfileTokens;
import io.datarouter.bytes.blockfile.dto.tokens.BlockfileBlockTokens;
import io.datarouter.bytes.blockfile.dto.tokens.BlockfileFooterTokens;
import io.datarouter.bytes.blockfile.dto.tokens.BlockfileHeaderTokens;
import io.datarouter.bytes.blockfile.section.BlockfileFooter;
import io.datarouter.bytes.blockfile.section.BlockfileHeader;
import io.datarouter.bytes.blockfile.section.BlockfileTrailer;
import io.datarouter.bytes.blockfile.storage.BlockfileStorage;
import io.datarouter.bytes.codec.intcodec.RawIntCodec;
import io.datarouter.bytes.io.MultiByteArrayInputStream;
import io.datarouter.scanner.ObjectScanner;
import io.datarouter.scanner.PagedList;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

/* loaded from: input_file:io/datarouter/bytes/blockfile/write/BlockfileWriter.class */
public class BlockfileWriter<T> {
    // ---- Layout constants (byte counts) ----

    /** Fixed at one byte; presumably the per-block section marker -- TODO confirm against the reader. */
    public static final int NUM_SECTION_BYTES = 1;
    /** Number of bytes {@link RawIntCodec} uses to encode an int length prefix. */
    public static final int NUM_VALUE_LENGTH_BYTES = RawIntCodec.INSTANCE.length();
    /** Header overhead: one encoded int plus one additional byte. */
    public static final int NUM_HEADER_METADATA_BYTES = NUM_VALUE_LENGTH_BYTES + NUM_SECTION_BYTES;
    /** Footer overhead: one encoded int plus one additional byte. */
    public static final int NUM_FOOTER_METADATA_BYTES = NUM_VALUE_LENGTH_BYTES + NUM_SECTION_BYTES;
    /** Trailer size: two encoded ints. */
    public static final int NUM_TRAILER_BYTES = 2 * RawIntCodec.INSTANCE.length();

    // ---- Per-writer state ----

    private final BlockfileWriterConfig<T> config;
    private final String name;
    /** Running count of data blocks produced by the block scanner. */
    private final AtomicLong dataBlockCounter = new AtomicLong();
    /** Total length of the footer block, recorded when the footer is encoded and read by the trailer. */
    private final AtomicInteger footerBlockLength = new AtomicInteger();
    /** Running total of the lengths of all tokens emitted so far. */
    private final AtomicLong fileLengthBytesCounter = new AtomicLong();

    /* loaded from: input_file:io/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult.class */
    public static final class BlockfileWriteResult extends Record {
        private final long numDataBlocks;
        private final ByteLength fileLength;

        public BlockfileWriteResult(long j, ByteLength byteLength) {
            this.numDataBlocks = j;
            this.fileLength = byteLength;
        }

        public long numDataBlocks() {
            return this.numDataBlocks;
        }

        public ByteLength fileLength() {
            return this.fileLength;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, BlockfileWriteResult.class), BlockfileWriteResult.class, "numDataBlocks;fileLength", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->numDataBlocks:J", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->fileLength:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, BlockfileWriteResult.class), BlockfileWriteResult.class, "numDataBlocks;fileLength", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->numDataBlocks:J", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->fileLength:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, BlockfileWriteResult.class, Object.class), BlockfileWriteResult.class, "numDataBlocks;fileLength", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->numDataBlocks:J", "FIELD:Lio/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriteResult;->fileLength:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/blockfile/write/BlockfileWriter$BlockfileWriterConfig.class */
    /**
     * Immutable settings consumed by {@link BlockfileWriter}.
     *
     * <p>Declared with record syntax: a class may not extend {@link Record} explicitly, and the
     * compiler supplies the canonical constructor, accessors, {@code equals}, {@code hashCode}
     * and {@code toString}.
     *
     * @param storage                      destination that receives the finished bytes
     * @param encoder                      turns one input item into the raw bytes of its block value
     * @param headerCodec                  encodes the {@link BlockfileHeader}
     * @param compressor                   recorded in the header; its codec supplier provides the codec
     *                                     applied to each encoded block value
     * @param checksummer                  recorded in the header; supplies the checksum length and the
     *                                     function applied to each compressed block value
     * @param userDictionary               dictionary stored in the header
     * @param footerUserDictionarySupplier invoked when the footer is produced to obtain its dictionary
     * @param listeners                    each listener is handed every emitted token and is then
     *                                     notified of completion
     * @param encodeBatchSize              number of items grouped into one encoding batch
     * @param encodeThreads                threads used to encode batches in parallel while keeping order
     * @param multipartWrite               when true the output is passed to storage as an
     *                                     {@link java.io.InputStream}, otherwise as one byte array
     * @param writeThreads                 threads passed to storage for the stream-based write
     */
    public record BlockfileWriterConfig<T>(
            BlockfileStorage storage,
            Function<T, byte[]> encoder,
            BlockfileHeader.BlockfileHeaderCodec headerCodec,
            BlockfileCompressor compressor,
            BlockfileChecksummer checksummer,
            BinaryDictionary userDictionary,
            Supplier<BinaryDictionary> footerUserDictionarySupplier,
            List<BlockfileListener> listeners,
            int encodeBatchSize,
            Threads encodeThreads,
            boolean multipartWrite,
            Threads writeThreads) {
    }

    /**
     * Creates a writer for a single named output.
     *
     * @param blockfileWriterConfig settings controlling encoding, compression, checksums and storage
     * @param str                   name passed to the storage when the bytes are written
     */
    public BlockfileWriter(BlockfileWriterConfig<T> blockfileWriterConfig, String str) {
        name = str;
        config = blockfileWriterConfig;
    }

    /**
     * Encodes every item from {@code scanner} and writes header, data blocks, footer and trailer
     * (in that order) to the configured storage under this writer's name.
     *
     * <p>Each emitted token adds its length to the file-length counter and is handed to every
     * configured listener; after the storage write returns, each listener's {@code complete()} is
     * called.
     *
     * <p>NOTE(review): the block and length counters are instance fields that are not reset here,
     * so a writer instance looks intended for a single call -- confirm before reusing one.
     *
     * @param scanner the items to encode, one block per item
     * @return the number of data blocks and the total number of bytes emitted
     */
    public BlockfileWriteResult write(Scanner<T> scanner) {
        BlockfileTokens headerTokens = makeHeaderTokens();
        Scanner<BlockfileTokens> tokens = ObjectScanner.of(headerTokens)
                .append(makeBlockScanner(scanner))
                .append(makeFooterScanner())
                .append(makeTrailerScanner(headerTokens))
                .each(token -> fileLengthBytesCounter.addAndGet(token.totalLength()));
        for (BlockfileListener listener : config.listeners()) {
            tokens = tokens.each(listener::accept);
        }
        // Flatten each token into its constituent byte arrays.
        Scanner<byte[]> byteArrays = tokens.concatIter(BlockfileTokens::toList);
        if (config.multipartWrite()) {
            // Hand the storage a stream backed by the scanner instead of one concatenated array.
            InputStream inputStream = byteArrays.apply(MultiByteArrayInputStream::new);
            config.storage().write(name, inputStream, config.writeThreads());
        } else {
            // Collect all arrays first, then write them as one concatenated array.
            List<byte[]> pages = byteArrays.collect(PagedList::new);
            config.storage().write(name, ByteTool.concat(pages));
        }
        config.listeners().forEach(BlockfileListener::complete);
        return new BlockfileWriteResult(
                dataBlockCounter.get(),
                ByteLength.ofBytes(fileLengthBytesCounter.get()));
    }

    /**
     * Builds the header tokens: the configured header codec encodes a {@link BlockfileHeader}
     * made from the user dictionary, compressor and checksummer, and the result is prefixed with
     * its encoded length (header metadata bytes plus value bytes).
     */
    private BlockfileTokens makeHeaderTokens() {
        BlockfileChecksummer checksummer = config.checksummer();
        BlockfileHeader header = new BlockfileHeader(
                config.userDictionary(),
                config.compressor(),
                checksummer.numBytes(),
                checksummer);
        byte[] headerValue = config.headerCodec().encode(header);
        int headerLength = NUM_HEADER_METADATA_BYTES + headerValue.length;
        return new BlockfileHeaderTokens(RawIntCodec.INSTANCE.encode(headerLength), headerValue);
    }

    /**
     * Turns the input items into a scanner of encoded data-block tokens.
     *
     * <p>Items are grouped into batches of the configured size, the batches are encoded in
     * parallel on the configured encode threads while keeping their order, the data-block counter
     * is advanced by each batch's size, and the batches are flattened back into single tokens.
     *
     * @param scanner the items to encode
     * @return the block tokens, viewed as {@link BlockfileTokens}
     */
    private Scanner<BlockfileTokens> makeBlockScanner(Scanner<T> scanner) {
        return scanner
                .batch(config.encodeBatchSize())
                .parallelOrdered(config.encodeThreads())
                .map(this::encodeBlocks)
                .each(blocks -> dataBlockCounter.addAndGet(blocks.size()))
                .concat(Scanner::of)
                // Widen the element type from the block-token type to the common token type.
                .map(BlockfileTokens.class::cast);
    }

    /**
     * Produces a scanner that yields the footer tokens.
     *
     * <p>The footer dictionary supplier is only invoked, and the data-block counter only read,
     * when the scanner is consumed. As a side effect the footer's total length is stored in
     * {@code footerBlockLength}.
     */
    private Scanner<BlockfileTokens> makeFooterScanner() {
        return Scanner.of(config.footerUserDictionarySupplier())
                .map(Supplier::get)
                .map(footerDictionary -> {
                    BlockfileFooter footer = new BlockfileFooter(footerDictionary, dataBlockCounter.get());
                    BlockfileTokens footerTokens = encodeFooter(BlockfileFooter.VALUE_CODEC.encode(footer));
                    footerBlockLength.set(footerTokens.totalLength());
                    return footerTokens;
                });
    }

    /**
     * Produces a scanner that yields the trailer tokens.
     *
     * <p>The trailer is created through a supplier so that it is built only when the scanner is
     * consumed: it reads {@code footerBlockLength}, which {@link #makeFooterScanner()} sets while
     * the footer is being produced.
     *
     * @param blockfileTokens the header tokens, whose total length is recorded in the trailer
     * @return a scanner over the encoded trailer
     */
    private Scanner<BlockfileTokens> makeTrailerScanner(BlockfileTokens blockfileTokens) {
        // A typed local gives the lambda a functional-interface target; passing it straight to
        // the generic Scanner.of would leave it without one.
        Supplier<BlockfileTrailer> trailerSupplier = () -> new BlockfileTrailer(
                blockfileTokens.totalLength(),
                footerBlockLength.get());
        return Scanner.of(trailerSupplier)
                .map(Supplier::get)
                .map(BlockfileTrailer::encode);
    }

    /**
     * Returns the number of non-value bytes in each data block: the encoded length, the checksum
     * bytes reported by the configured checksummer, and one additional byte.
     */
    public int numBlockMetadataBytes() {
        int checksumBytes = config.checksummer().numBytes();
        // The trailing 1 presumably accounts for the section byte -- TODO confirm.
        return NUM_VALUE_LENGTH_BYTES + checksumBytes + 1;
    }

    /**
     * Encodes a batch of items into block tokens, preserving their order.
     *
     * <p>One codec is obtained from the compressor's supplier and reused for the whole batch.
     *
     * @param list the items to encode
     * @return a new list with one block token per item, in the same order as {@code list}
     */
    public List<BlockfileBlockTokens<T>> encodeBlocks(List<T> list) {
        Codec<byte[], byte[]> codec = config.compressor().codecSupplier().get();
        // Typed, pre-sized result list in place of the former raw List/ArrayList.
        List<BlockfileBlockTokens<T>> blocks = new ArrayList<>(list.size());
        for (T item : list) {
            blocks.add(encodeBlock(codec, item));
        }
        return blocks;
    }

    /**
     * Encodes a single item into the tokens of one data block.
     *
     * <p>The item is turned into bytes by the configured encoder and passed through
     * {@code codec}. The resulting tokens hold the item, the encoded block length (metadata bytes
     * plus value bytes), the checksum of the codec output, and the codec output itself.
     *
     * @param codec the codec applied to the encoded item bytes
     * @param t     the item to encode
     * @return the tokens for the item's block
     */
    public BlockfileBlockTokens<T> encodeBlock(Codec<byte[], byte[]> codec, T t) {
        byte[] itemBytes = config.encoder().apply(t);
        byte[] blockValue = codec.encode(itemBytes);
        int blockLength = numBlockMetadataBytes() + blockValue.length;
        return new BlockfileBlockTokens<>(
                t,
                RawIntCodec.INSTANCE.encode(blockLength),
                config.checksummer().encoder().apply(blockValue),
                blockValue);
    }

    /**
     * Wraps already-encoded footer bytes into footer tokens, prefixed with the encoded footer
     * length (footer metadata bytes plus value bytes).
     *
     * @param bArr the encoded footer value
     * @return the footer tokens
     */
    public static BlockfileTokens encodeFooter(byte[] bArr) {
        int footerLength = NUM_FOOTER_METADATA_BYTES + bArr.length;
        byte[] lengthBytes = RawIntCodec.INSTANCE.encode(footerLength);
        return new BlockfileFooterTokens(lengthBytes, bArr);
    }

    /** Returns the configuration this writer was constructed with. */
    public BlockfileWriterConfig<T> config() {
        return config;
    }
}
