package io.datarouter.storage.file;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.io.MultiByteArrayInputStream;
import io.datarouter.bytes.split.ChunkScannerTool;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import io.datarouter.storage.node.op.raw.read.BlobStorageReader;
import io.datarouter.util.concurrent.BlockingQueueTool;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/storage/file/BlobPrefetcher.class */
public class BlobPrefetcher {
    private static final Logger logger = LoggerFactory.getLogger(BlobPrefetcher.class);
    private final Scanner<BlobPrefetchRequest> requests;
    private final Threads scanChunksThreads;
    private final ByteLength chunkSize;
    private final int bufferSizeKiB;
    private final ExecutorService prefetchExec;
    private final Semaphore semaphore;
    private final BlockingQueue<PrefetchMessage> buffer;
    private Future<Void> prefetchFuture;

    /**
     * Describes one chunk to read from a blob.
     *
     * <p>Declared as a real {@code record}: an ordinary class may not extend {@link Record} directly, and
     * the decompiler-emitted {@code ObjectMethods.bootstrap(...)} stubs are not valid source. The compiler
     * generates the canonical constructor, the accessors and the component-wise
     * {@code equals}/{@code hashCode}/{@code toString}.
     *
     * @param reader storage reader whose {@code readPartial} is called for this chunk
     * @param key key of the blob the chunk belongs to
     * @param offset offset handed to {@code readPartial} (the start of the chunk range)
     * @param length number of bytes handed to {@code readPartial} (the length of the chunk range)
     */
    public record BlobChunkRequest(BlobStorageReader reader, PathbeanKey key, long offset, int length) {
    }

    /**
     * Carries the bytes that were read for one chunk, together with where they came from.
     *
     * <p>Declared as a real {@code record}: an ordinary class may not extend {@link Record} directly, and
     * the decompiler-emitted {@code ObjectMethods.bootstrap(...)} stubs are not valid source.
     *
     * <p>Note that the generated {@code equals}/{@code hashCode} treat the {@code bytes} component like
     * any other reference: two responses holding distinct arrays with equal contents are not equal.
     * The accessor returns the array itself, not a copy.
     *
     * @param reader storage reader the chunk was read through
     * @param key key of the blob the chunk belongs to
     * @param offset offset of the chunk that was requested
     * @param bytes the bytes returned by the read
     */
    public record BlobChunkResponse(BlobStorageReader reader, PathbeanKey key, long offset, byte[] bytes) {
    }

    /**
     * Identifies one blob that should be prefetched.
     *
     * <p>Declared as a real {@code record}: an ordinary class may not extend {@link Record} directly, and
     * the decompiler-emitted {@code ObjectMethods.bootstrap(...)} stubs are not valid source. The compiler
     * generates the canonical constructor, the accessors and the component-wise
     * {@code equals}/{@code hashCode}/{@code toString}.
     *
     * @param reader storage reader used to read the blob
     * @param key key of the blob
     * @param fileLength upper bound used when the blob is split into chunk ranges starting at offset 0
     */
    public record BlobPrefetchRequest(BlobStorageReader reader, PathbeanKey key, long fileLength) {
    }

    /* loaded from: input_file:io/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream.class */
    public static final class PathbeanKeyAndInputStream extends Record {
        private final PathbeanKey key;
        private final InputStream inputStream;

        public PathbeanKeyAndInputStream(PathbeanKey pathbeanKey, InputStream inputStream) {
            this.key = pathbeanKey;
            this.inputStream = inputStream;
        }

        public PathbeanKey key() {
            return this.key;
        }

        public InputStream inputStream() {
            return this.inputStream;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, PathbeanKeyAndInputStream.class), PathbeanKeyAndInputStream.class, "key;inputStream", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->key:Lio/datarouter/storage/file/PathbeanKey;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->inputStream:Ljava/io/InputStream;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, PathbeanKeyAndInputStream.class), PathbeanKeyAndInputStream.class, "key;inputStream", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->key:Lio/datarouter/storage/file/PathbeanKey;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->inputStream:Ljava/io/InputStream;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, PathbeanKeyAndInputStream.class, Object.class), PathbeanKeyAndInputStream.class, "key;inputStream", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->key:Lio/datarouter/storage/file/PathbeanKey;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PathbeanKeyAndInputStream;->inputStream:Ljava/io/InputStream;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/storage/file/BlobPrefetcher$PrefetchMessage.class */
    private static final class PrefetchMessage extends Record {
        private final boolean isPresent;
        private final BlobChunkResponse value;
        private final boolean isError;
        private final RuntimeException error;

        private PrefetchMessage(boolean z, BlobChunkResponse blobChunkResponse, boolean z2, RuntimeException runtimeException) {
            this.isPresent = z;
            this.value = blobChunkResponse;
            this.isError = z2;
            this.error = runtimeException;
        }

        private static PrefetchMessage present(BlobChunkResponse blobChunkResponse) {
            return new PrefetchMessage(true, blobChunkResponse, false, null);
        }

        private static PrefetchMessage absent() {
            return new PrefetchMessage(false, null, false, null);
        }

        private static PrefetchMessage error(RuntimeException runtimeException) {
            return new PrefetchMessage(false, null, true, runtimeException);
        }

        public boolean isPresent() {
            return this.isPresent;
        }

        public BlobChunkResponse value() {
            return this.value;
        }

        public boolean isError() {
            return this.isError;
        }

        public RuntimeException error() {
            return this.error;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, PrefetchMessage.class), PrefetchMessage.class, "isPresent;value;isError;error", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isPresent:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->value:Lio/datarouter/storage/file/BlobPrefetcher$BlobChunkResponse;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isError:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->error:Ljava/lang/RuntimeException;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, PrefetchMessage.class), PrefetchMessage.class, "isPresent;value;isError;error", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isPresent:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->value:Lio/datarouter/storage/file/BlobPrefetcher$BlobChunkResponse;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isError:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->error:Ljava/lang/RuntimeException;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, PrefetchMessage.class, Object.class), PrefetchMessage.class, "isPresent;value;isError;error", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isPresent:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->value:Lio/datarouter/storage/file/BlobPrefetcher$BlobChunkResponse;", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->isError:Z", "FIELD:Lio/datarouter/storage/file/BlobPrefetcher$PrefetchMessage;->error:Ljava/lang/RuntimeException;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /**
     * Creates a prefetcher; nothing is read until the chunks are scanned.
     *
     * <p>The amount of buffered data is bounded by a semaphore holding one permit per KiB of buffer.
     * The queue itself is created with capacity {@code Integer.MAX_VALUE}, so it is the semaphore, not
     * the queue capacity, that limits how much is held.
     *
     * @param scanner the blobs to prefetch
     * @param threads threads used for the parallel, ordered chunk reads
     * @param byteLength size of each chunk
     * @param byteLength2 size of the buffer; must be at least the chunk size
     * @param executorService executor that runs the prefetch task
     * @throws IllegalArgumentException if the buffer size is smaller than the chunk size
     */
    public BlobPrefetcher(Scanner<BlobPrefetchRequest> scanner, Threads threads, ByteLength byteLength, ByteLength byteLength2, ExecutorService executorService) {
        if (byteLength2.toBytes() < byteLength.toBytes()) {
            throw new IllegalArgumentException(String.format("bufferSize=%s must be >= chunkSize=%s", byteLength2, byteLength));
        }
        this.requests = scanner;
        this.scanChunksThreads = threads;
        this.chunkSize = byteLength;
        this.prefetchExec = executorService;
        // NOTE(review): the cast narrows toKiB() to int; a very large buffer size could overflow, and a
        // buffer that is not a whole number of KiB may be rounded differently from calcKiB - confirm.
        this.bufferSizeKiB = (int) byteLength2.toKiB();
        this.semaphore = new Semaphore(this.bufferSizeKiB);
        // Diamond instead of the raw type, so the assignment to BlockingQueue<PrefetchMessage> is checked.
        this.buffer = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
    }

    /**
     * Task body run on the prefetch executor: reads every chunk of every requested blob, in order, and
     * hands each one to {@link #addToBuffer}.
     *
     * <p>A {@link RuntimeException} raised anywhere in the pipeline is forwarded to the consumer as an
     * error message. The absent marker is enqueued afterwards in either case.
     *
     * @return always null
     */
    private Void prefetch() {
        try {
            this.requests
                    .concat(this::scanChunkRequests)
                    .parallelOrdered(this.scanChunksThreads)
                    .map(BlobPrefetcher::readChunk)
                    .forEach(this::addToBuffer);
        } catch (RuntimeException failure) {
            BlockingQueueTool.put(this.buffer, PrefetchMessage.error(failure));
        }
        BlockingQueueTool.put(this.buffer, PrefetchMessage.absent());
        return null;
    }

    /** Splits one blob into chunk requests covering offsets 0 up to the request's file length. */
    private Scanner<BlobChunkRequest> scanChunkRequests(BlobPrefetchRequest request) {
        return ChunkScannerTool.scanChunks(0L, request.fileLength(), this.chunkSize.toBytesInt())
                .map(range -> new BlobChunkRequest(request.reader(), request.key(), range.start, range.length));
    }

    /** Performs the partial read for one chunk; throws if the read yields no value. */
    private static BlobChunkResponse readChunk(BlobChunkRequest request) {
        return new BlobChunkResponse(
                request.reader(),
                request.key(),
                request.offset(),
                request.reader().readPartial(request.key(), request.offset(), request.length()).orElseThrow());
    }

    /**
     * Submits the prefetch task and returns a scanner that drains the buffer.
     *
     * <p>The scanner keeps taking from the buffer while the taken value is present and unwraps each
     * value; it ends at the first empty one.
     */
    private Scanner<BlobChunkResponse> scanChunksFromBuffer() {
        // Submit first and remember the future so takeNextFromBuffer can cancel it on error.
        this.prefetchFuture = this.prefetchExec.submit(this::prefetch);
        return Scanner.generate(this::takeNextFromBuffer)
                .advanceWhile(Optional::isPresent)
                .concatOpt(Function.identity());
    }

    /**
     * Starts prefetching and scans one entry per run of chunks that share a key, each entry pairing the
     * key with an {@link InputStream} built over that run's chunk byte arrays.
     *
     * @return scanner of key and stream pairs
     */
    public Scanner<PathbeanKeyAndInputStream> scanInputStreams() {
        return scanChunksFromBuffer()
                .splitByWithSplitKey(BlobChunkResponse::key)
                .map(keyAndChunks -> {
                    PathbeanKey blobKey = (PathbeanKey) keyAndChunks.splitKey();
                    InputStream blobStream = (InputStream) keyAndChunks.scanner()
                            .map(BlobChunkResponse::bytes)
                            .apply(MultiByteArrayInputStream::new);
                    return new PathbeanKeyAndInputStream(blobKey, blobStream);
                });
    }

    /**
     * Returns the size of the chunk's byte array in KiB, rounded up to a whole KiB.
     * An empty array yields 0.
     */
    private static int calcKiB(BlobChunkResponse chunk) {
        int byteCount = chunk.bytes().length;
        int wholeKiB = byteCount / 1024;
        if (byteCount % 1024 == 0) {
            return wholeKiB;
        }
        // A partial trailing KiB still occupies a full permit.
        return wholeKiB + 1;
    }

    /**
     * Reserves buffer space for the chunk, blocking until enough permits are free, then enqueues it.
     *
     * @throws RuntimeException if the chunk alone is larger than the whole buffer, or if the thread is
     *         interrupted while waiting for permits (the InterruptedException is the cause)
     */
    private void addToBuffer(BlobChunkResponse chunk) {
        int chunkKiB = calcKiB(chunk);
        if (chunkKiB > this.bufferSizeKiB) {
            // Such a chunk could never acquire enough permits, so fail instead of blocking forever.
            String message = String.format("Item KiB=%s is greater than maxKiB=%s", chunkKiB, this.bufferSizeKiB);
            throw new RuntimeException(message);
        }
        try {
            this.semaphore.acquire(chunkKiB);
        } catch (InterruptedException interrupted) {
            // NOTE(review): the interrupt flag is intentionally left as-is here; prefetch() still has to
            // enqueue its error and end-of-data messages after this exception propagates.
            throw new RuntimeException(interrupted);
        }
        this.buffer.add(PrefetchMessage.present(chunk));
    }

    /**
     * Takes the next message from the buffer, blocking until one is available.
     *
     * @return the chunk when the message carries one (its permits are released first), or empty when
     *         the message carries neither a chunk nor an error
     * @throws RuntimeException the error carried by the message, after attempting to cancel the
     *         prefetch task
     */
    private Optional<BlobChunkResponse> takeNextFromBuffer() {
        PrefetchMessage message = (PrefetchMessage) BlockingQueueTool.take(this.buffer);
        if (message.isPresent()) {
            BlobChunkResponse chunk = message.value();
            // Give back the space this chunk occupied so the producer can continue.
            this.semaphore.release(calcKiB(chunk));
            return Optional.of(chunk);
        }
        if (message.isError()) {
            cancelPrefetchQuietly();
            throw message.error();
        }
        return Optional.empty();
    }

    /** Cancels the prefetch task with interruption, if one was submitted; cancel failures are only logged. */
    private void cancelPrefetchQuietly() {
        if (this.prefetchFuture == null) {
            return;
        }
        try {
            this.prefetchFuture.cancel(true);
        } catch (RuntimeException cancelFailure) {
            logger.warn("Exception canceling prefetch Future", cancelFailure);
        }
    }
}
