package io.datakernel.multilog;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.common.MemSize;
import io.datakernel.common.Preconditions;
import io.datakernel.common.Stopwatch;
import io.datakernel.common.parse.TruncatedDataException;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.process.ChannelLZ4Compressor;
import io.datakernel.csp.process.ChannelLZ4Decompressor;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.StreamSupplierWithResult;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.datastream.stats.StreamRegistry;
import io.datakernel.datastream.stats.StreamStats;
import io.datakernel.datastream.stats.StreamStatsDetailed;
import io.datakernel.datastream.stats.StreamStatsSizeCounter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.promise.Promise;
import io.datakernel.promise.SettablePromise;
import io.datakernel.remotefs.FsClient;
import io.datakernel.serializer.BinarySerializer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/multilog/MultilogImpl.class */
public final class MultilogImpl<T> implements Multilog<T>, EventloopJmxMBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(MultilogImpl.class);
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256);
    private final Eventloop eventloop;
    private final FsClient client;
    private final LogNamingScheme namingScheme;
    private final BinarySerializer<T> serializer;
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private Duration autoFlushInterval = null;
    private final StreamRegistry<String> streamReads = StreamRegistry.create();
    private final StreamRegistry<String> streamWrites = StreamRegistry.create();
    private final StreamStatsDetailed<ByteBuf> streamReadStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> streamWriteStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.datakernel.multilog.MultilogImpl$1, reason: invalid class name */
    /* loaded from: input_file:io/datakernel/multilog/MultilogImpl$1.class */
    public class AnonymousClass1 implements Iterator<StreamSupplier<T>> {
        final Stopwatch sw = Stopwatch.createUnstarted();
        final Iterator<LogPosition> it;
        LogPosition currentPosition;
        long inputStreamPosition;
        final /* synthetic */ List val$logFiles;
        final /* synthetic */ SettablePromise val$positionPromise;
        final /* synthetic */ LogPosition val$startPosition;
        final /* synthetic */ String val$logPartition;

        AnonymousClass1(List list, SettablePromise settablePromise, LogPosition logPosition, String str) {
            this.val$logFiles = list;
            this.val$positionPromise = settablePromise;
            this.val$startPosition = logPosition;
            this.val$logPartition = str;
            this.it = this.val$logFiles.iterator();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.it.hasNext()) {
                return true;
            }
            this.val$positionPromise.set(getLogPosition());
            return false;
        }

        LogPosition getLogPosition() {
            return this.currentPosition == null ? this.val$startPosition : LogPosition.create(this.currentPosition.getLogFile(), this.currentPosition.getPosition() + this.inputStreamPosition);
        }

        @Override // java.util.Iterator
        public StreamSupplier<T> next() {
            this.currentPosition = this.it.next();
            long position = this.currentPosition.getPosition();
            LogFile logFile = this.currentPosition.getLogFile();
            if (MultilogImpl.logger.isTraceEnabled()) {
                MultilogImpl.logger.trace("Read log file `{}` from: {}", logFile, Long.valueOf(position));
            }
            Promise download = MultilogImpl.this.client.download(MultilogImpl.this.namingScheme.path(this.val$logPartition, logFile), position);
            String str = this.val$logPartition;
            return StreamSupplier.ofPromise(download.map(channelSupplier -> {
                this.inputStreamPosition = 0L;
                this.sw.reset().start();
                return ((StreamSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) channelSupplier.transformWith(MultilogImpl.this.streamReads.register(str + ":" + logFile + "@" + position))).transformWith(MultilogImpl.this.streamReadStats)).transformWith(ChannelLZ4Decompressor.create().withInspector(new ChannelLZ4Decompressor.Inspector() { // from class: io.datakernel.multilog.MultilogImpl.1.1
                    /* renamed from: lookup, reason: merged with bridge method [inline-methods] */
                    public <Q extends ChannelLZ4Decompressor.Inspector> Q m2lookup(Class<Q> cls) {
                        throw new UnsupportedOperationException();
                    }

                    public void onBlock(ChannelLZ4Decompressor channelLZ4Decompressor, ChannelLZ4Decompressor.Header header, ByteBuf byteBuf, ByteBuf byteBuf2) {
                        AnonymousClass1.this.inputStreamPosition += ChannelLZ4Decompressor.HEADER_LENGTH + header.compressedLen;
                    }
                }))).transformWith(channelSupplier -> {
                    return channelSupplier.withEndOfStream(promise -> {
                        return promise.thenEx((r2, th) -> {
                            return (th == null || (th instanceof TruncatedDataException)) ? Promise.complete() : Promise.ofException(th);
                        });
                    });
                })).transformWith(ChannelDeserializer.create(MultilogImpl.this.serializer))).withEndOfStream(promise -> {
                    return promise.whenComplete((r4, th) -> {
                        log(th);
                    });
                }).withLateBinding();
            }));
        }

        private void log(Throwable th) {
            if (th == null && MultilogImpl.logger.isTraceEnabled()) {
                MultilogImpl.logger.trace("Finish log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)", new Object[]{MultilogImpl.this.client, MultilogImpl.this.namingScheme.path(this.val$logPartition, this.currentPosition.getLogFile()), this.sw, Long.valueOf(this.inputStreamPosition), Long.valueOf(this.inputStreamPosition / Math.max(this.sw.elapsed(TimeUnit.SECONDS), 1L))});
            } else {
                if (th == null || !MultilogImpl.logger.isErrorEnabled()) {
                    return;
                }
                MultilogImpl.logger.error("Error on log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)", new Object[]{MultilogImpl.this.client, MultilogImpl.this.namingScheme.path(this.val$logPartition, this.currentPosition.getLogFile()), this.sw, Long.valueOf(this.inputStreamPosition), Long.valueOf(this.inputStreamPosition / Math.max(this.sw.elapsed(TimeUnit.SECONDS), 1L)), th});
            }
        }
    }

    private MultilogImpl(Eventloop eventloop, FsClient fsClient, BinarySerializer<T> binarySerializer, LogNamingScheme logNamingScheme) {
        this.eventloop = eventloop;
        this.client = fsClient;
        this.serializer = binarySerializer;
        this.namingScheme = logNamingScheme;
    }

    public static <T> MultilogImpl<T> create(Eventloop eventloop, FsClient fsClient, BinarySerializer<T> binarySerializer, LogNamingScheme logNamingScheme) {
        return new MultilogImpl<>(eventloop, fsClient, binarySerializer, logNamingScheme);
    }

    public MultilogImpl<T> withBufferSize(int i) {
        this.bufferSize = MemSize.of(i);
        return this;
    }

    public MultilogImpl<T> withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    public MultilogImpl<T> withAutoFlushInterval(Duration duration) {
        this.autoFlushInterval = duration;
        return this;
    }

    @Override // io.datakernel.multilog.Multilog
    public Promise<StreamConsumer<T>> write(@NotNull String str) {
        validateLogPartition(str);
        return Promise.of(StreamConsumer.ofSupplier(streamSupplier -> {
            return ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) streamSupplier.transformWith(ChannelSerializer.create(this.serializer).withAutoFlushInterval(this.autoFlushInterval).withInitialBufferSize(this.bufferSize).withSkipSerializationErrors())).transformWith(ChannelLZ4Compressor.createFastCompressor())).transformWith(this.streamWrites.register(str))).transformWith(this.streamWriteStats)).bindTo(new LogStreamChunker(this.eventloop, this.client, this.namingScheme, str));
        }).withLateBinding());
    }

    @Override // io.datakernel.multilog.Multilog
    public Promise<StreamSupplierWithResult<T, LogPosition>> read(@NotNull String str, @NotNull LogFile logFile, long j, @Nullable LogFile logFile2) {
        validateLogPartition(str);
        LogPosition create = LogPosition.create(logFile, j);
        return this.client.list(this.namingScheme.getListGlob(str)).map(list -> {
            Stream map = list.stream().map((v0) -> {
                return v0.getName();
            });
            LogNamingScheme logNamingScheme = this.namingScheme;
            logNamingScheme.getClass();
            return (List) map.map(logNamingScheme::parse).filter((v0) -> {
                return Objects.nonNull(v0);
            }).filter(partitionAndFile -> {
                return partitionAndFile.getLogPartition().equals(str);
            }).map((v0) -> {
                return v0.getLogFile();
            }).collect(Collectors.toList());
        }).map(list2 -> {
            return (List) list2.stream().filter(logFile3 -> {
                return isFileInRange(logFile3, create, logFile2);
            }).map(logFile4 -> {
                return logFile4.equals(create.getLogFile()) ? create : LogPosition.create(logFile4, 0L);
            }).sorted().collect(Collectors.toList());
        }).map(list3 -> {
            return readLogFiles(str, create, list3);
        });
    }

    private StreamSupplierWithResult<T, LogPosition> readLogFiles(@NotNull String str, @NotNull LogPosition logPosition, @NotNull List<LogPosition> list) {
        SettablePromise settablePromise = new SettablePromise();
        return StreamSupplierWithResult.of(StreamSupplier.concat(new AnonymousClass1(list, settablePromise, logPosition, str)).withLateBinding(), settablePromise);
    }

    private static void validateLogPartition(@NotNull String str) {
        Preconditions.checkArgument(!str.contains("-"), "Using dash (-) in log partition name is not allowed");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isFileInRange(@NotNull LogFile logFile, @NotNull LogPosition logPosition, @Nullable LogFile logFile2) {
        if (logFile.compareTo(logPosition.getLogFile()) < 0) {
            return false;
        }
        return logFile2 == null || logFile.compareTo(logFile2) <= 0;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }
}
