package io.datakernel.logfs;

import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.file.AsyncFile;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.StageStats;
import io.datakernel.logfs.AbstractLogFileSystem;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.file.StreamFileReader;
import io.datakernel.stream.file.StreamFileWriter;
import io.datakernel.stream.stats.StreamRegistry;
import io.datakernel.stream.stats.StreamStats;
import io.datakernel.stream.stats.StreamStatsDetailed;
import io.datakernel.stream.stats.StreamStatsSizeCounter;
import io.datakernel.util.MemSize;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/logfs/LocalFsLogFileSystem.class */
/**
 * Log file system that stores log files as regular files inside a single
 * local directory.
 *
 * <p>Directory listing is submitted to the supplied {@link ExecutorService},
 * and files are opened through {@link AsyncFile#openAsync} using the same
 * executor. Listing, read-open and write-open operations are tracked with
 * {@link StageStats}; the byte streams created by {@link #read} and
 * {@link #write} are registered in {@link StreamRegistry} instances and
 * measured with {@link StreamStatsDetailed}. All of these are exposed as JMX
 * attributes.
 */
public final class LocalFsLogFileSystem extends AbstractLogFileSystem implements EventloopJmxMBeanEx {
    /** Read buffer size used unless overridden: {@code MemSize.kilobytes(256)}. */
    public static final MemSize DEFAULT_READ_BLOCK_SIZE = MemSize.kilobytes(256);
    /** Smoothing window passed to every {@link StageStats} created by this class: five minutes. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final Eventloop eventloop;
    private final ExecutorService executorService;
    /** Directory that contains the log files. */
    private final Path dir;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /** Buffer size handed to the file reader; changeable via the builder method or the JMX setter. */
    private MemSize readBlockSize = DEFAULT_READ_BLOCK_SIZE;

    // Statistics of the list / open-for-read / open-for-write stages.
    private final StageStats stageList = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageRead = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageWrite = StageStats.create(DEFAULT_SMOOTHING_WINDOW);

    // Registries of the streams handed out by read() and write().
    private final StreamRegistry<String> streamReads = StreamRegistry.create();
    private final StreamRegistry<String> streamWrites = StreamRegistry.create();

    // Statistics shared by all read streams and by all write streams respectively.
    private final StreamStatsDetailed<ByteBuf> streamReadStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> streamWriteStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());

    private LocalFsLogFileSystem(Eventloop eventloop, ExecutorService executorService, Path path) {
        this.eventloop = eventloop;
        this.executorService = executorService;
        this.dir = path;
    }

    /**
     * Creates a log file system that keeps its files directly in {@code path}.
     *
     * @param eventloop       event loop returned by {@link #getEventloop()}
     * @param executorService executor used for directory listing and for opening files
     * @param path            directory that holds the log files
     * @return a new instance using {@link #DEFAULT_READ_BLOCK_SIZE}
     */
    public static LocalFsLogFileSystem create(Eventloop eventloop, ExecutorService executorService, Path path) {
        return new LocalFsLogFileSystem(eventloop, executorService, path);
    }

    /**
     * Creates a log file system that keeps its files in the directory obtained
     * by resolving {@code str} against {@code path}.
     *
     * @param eventloop       event loop returned by {@link #getEventloop()}
     * @param executorService executor used for directory listing and for opening files
     * @param path            parent directory
     * @param str             name resolved against {@code path} to form the log directory
     * @return a new instance using {@link #DEFAULT_READ_BLOCK_SIZE}
     */
    public static LocalFsLogFileSystem create(Eventloop eventloop, ExecutorService executorService, Path path, String str) {
        return create(eventloop, executorService, path.resolve(str));
    }

    /**
     * Sets the buffer size used by subsequently created read streams.
     *
     * @param memSize new read block size
     * @return this instance, to allow call chaining
     */
    public LocalFsLogFileSystem withReadBlockSize(MemSize memSize) {
        this.readBlockSize = memSize;
        return this;
    }

    /** Resolves the file of the given partition and log file inside {@link #dir}. */
    private Path path(String str, LogFile logFile) {
        return dir.resolve(fileName(str, logFile));
    }

    /**
     * Lists the log files of one partition.
     *
     * <p>The task submitted to the executor creates the log directory if it
     * does not exist, walks the whole file tree below it and collects every
     * file whose name parses to the requested partition. Errors reported for
     * individual files or directories are logged and the walk continues.
     *
     * @param str log partition whose files are requested
     * @return stage completing with the matching log files, in visiting order
     */
    @Override // io.datakernel.logfs.LogFileSystem
    public Stage<List<LogFile>> list(String str) {
        // The result is unused. NOTE(review): the call is kept because it may
        // verify that the caller runs inside an event loop thread - confirm
        // before removing.
        Eventloop.getCurrentEventloop();
        return Stage.ofCallable(executorService, () -> {
            // Typed list instead of a raw ArrayList, so the callable's result
            // matches Stage<List<LogFile>> without an unchecked conversion.
            List<LogFile> entries = new ArrayList<>();
            Files.createDirectories(dir);
            Files.walkFileTree(dir, new FileVisitor<Path>() {
                @Override // java.nio.file.FileVisitor
                public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
                    return FileVisitResult.CONTINUE;
                }

                @Override // java.nio.file.FileVisitor
                public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
                    AbstractLogFileSystem.PartitionAndFile parsed =
                            AbstractLogFileSystem.parse(path.getFileName().toString());
                    // parse() yields null for names that are not log files; those are skipped.
                    if (parsed != null && parsed.logPartition.equals(str)) {
                        entries.add(parsed.logFile);
                    }
                    return FileVisitResult.CONTINUE;
                }

                @Override // java.nio.file.FileVisitor
                public FileVisitResult visitFileFailed(Path path, IOException iOException) throws IOException {
                    // Best effort: an unreadable file must not abort the listing.
                    if (iOException != null) {
                        logger.error("visitFileFailed error", iOException);
                    }
                    return FileVisitResult.CONTINUE;
                }

                @Override // java.nio.file.FileVisitor
                public FileVisitResult postVisitDirectory(Path path, IOException iOException) throws IOException {
                    // Best effort: a failed directory iteration must not abort the listing.
                    if (iOException != null) {
                        logger.error("postVisitDirectory error", iOException);
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
            return entries;
        }).whenComplete(stageList.recordStats());
    }

    /**
     * Opens a log file for reading and exposes its content as a stream producer.
     *
     * @param str     log partition
     * @param logFile log file inside the partition
     * @param j       position in the file at which reading starts
     * @return stage completing with a producer of the file's bytes; the
     *         producer is registered in the read registry under
     *         {@code str + ":" + logFile + "@" + j}
     */
    @Override // io.datakernel.logfs.LogFileSystem
    public Stage<StreamProducerWithResult<ByteBuf, Void>> read(String str, LogFile logFile, long j) {
        return AsyncFile.openAsync(executorService, path(str, logFile), new OpenOption[]{StandardOpenOption.READ})
                .whenComplete(stageRead.recordStats())
                .thenApply(file -> StreamFileReader.readFile(file)
                        .withBufferSize(readBlockSize)
                        .withStartingPosition(j)
                        .with(streamReads.newEntry(str + ":" + logFile + "@" + j))
                        .with(streamReadStats)
                        .withEndOfStreamAsResult()
                        .withLateBinding());
    }

    /**
     * Opens a log file for writing and exposes it as a stream consumer.
     *
     * <p>The file is opened with {@code StreamFileWriter.CREATE_OPTIONS} and
     * the writer is configured with force-on-close enabled.
     *
     * @param str     log partition
     * @param logFile log file inside the partition
     * @return stage completing with a consumer that writes to the file; the
     *         consumer is registered in the write registry under
     *         {@code str + ":" + logFile}
     */
    @Override // io.datakernel.logfs.LogFileSystem
    public Stage<StreamConsumerWithResult<ByteBuf, Void>> write(String str, LogFile logFile) {
        return AsyncFile.openAsync(executorService, path(str, logFile), StreamFileWriter.CREATE_OPTIONS)
                .whenComplete(stageWrite.recordStats())
                .thenApply(file -> StreamFileWriter.create(file)
                        .withForceOnClose(true)
                        .withFlushAsResult()
                        .with(streamWrites.newEntry(str + ":" + logFile))
                        .with(streamWriteStats)
                        .withLateBinding());
    }

    /** Returns the event loop this instance was created with. */
    public Eventloop getEventloop() {
        return eventloop;
    }

    /** Returns the log directory as a string. */
    @JmxAttribute
    public String getDir() {
        return dir.toString();
    }

    /** Returns the buffer size currently used for new read streams. */
    @JmxAttribute
    public MemSize getReadBlockSize() {
        return readBlockSize;
    }

    /** Changes the buffer size used for subsequently created read streams. */
    @JmxAttribute
    public void setReadBlockSize(MemSize memSize) {
        this.readBlockSize = memSize;
    }

    /** Returns the statistics of {@link #list} operations. */
    @JmxAttribute
    public StageStats getStageList() {
        return stageList;
    }

    /** Returns the statistics of the file-open stage of {@link #read}. */
    @JmxAttribute
    public StageStats getStageRead() {
        return stageRead;
    }

    /** Returns the statistics of the file-open stage of {@link #write}. */
    @JmxAttribute
    public StageStats getStageWrite() {
        return stageWrite;
    }

    // NOTE(review): the raw return types of the four getters below are kept
    // as they are. Presumably a reflection-based JMX layer inspects generic
    // return types, so parameterizing them could change how the attributes
    // are exposed - confirm before generifying.

    /** Returns the registry of streams created by {@link #read}. */
    @JmxAttribute
    public StreamRegistry getStreamReads() {
        return streamReads;
    }

    /** Returns the registry of streams created by {@link #write}. */
    @JmxAttribute
    public StreamRegistry getStreamWrites() {
        return streamWrites;
    }

    /** Returns the statistics attached to all read streams. */
    @JmxAttribute
    public StreamStatsDetailed getStreamReadStats() {
        return streamReadStats;
    }

    /** Returns the statistics attached to all write streams. */
    @JmxAttribute
    public StreamStatsDetailed getStreamWriteStats() {
        return streamWriteStats;
    }
}
