package io.datakernel.stream.processor;

import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.file.AsyncFile;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerModifier;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.file.StreamFileReader;
import io.datakernel.stream.file.StreamFileWriter;
import io.datakernel.util.MemSize;
import io.datakernel.util.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/processor/StreamSorterStorageImpl.class */
public final class StreamSorterStorageImpl<T> implements StreamSorterStorage<T> {
    public static final String DEFAULT_FILE_PATTERN = "%d";
    public static final MemSize DEFAULT_SORTER_BLOCK_SIZE = MemSize.kilobytes(256);
    private static final AtomicInteger PARTITION = new AtomicInteger();
    private final ExecutorService executorService;
    private final BufferSerializer<T> serializer;
    private final Path path;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private String filePattern = DEFAULT_FILE_PATTERN;
    private MemSize readBlockSize = DEFAULT_SORTER_BLOCK_SIZE;
    private MemSize writeBlockSize = DEFAULT_SORTER_BLOCK_SIZE;
    private int compressionLevel = 0;

    private StreamSorterStorageImpl(ExecutorService executorService, BufferSerializer<T> bufferSerializer, Path path) {
        this.executorService = executorService;
        this.serializer = bufferSerializer;
        this.path = path;
    }

    public static <T> StreamSorterStorageImpl<T> create(ExecutorService executorService, BufferSerializer<T> bufferSerializer, Path path) {
        Preconditions.checkArgument(!path.getFileName().toString().contains(DEFAULT_FILE_PATTERN));
        try {
            Files.createDirectories(path, new FileAttribute[0]);
            return new StreamSorterStorageImpl<>(executorService, bufferSerializer, path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public StreamSorterStorageImpl<T> withFilePattern(String str) {
        Preconditions.checkArgument(!str.contains(DEFAULT_FILE_PATTERN));
        this.filePattern = str;
        return this;
    }

    public StreamSorterStorageImpl<T> withReadBlockSize(MemSize memSize) {
        this.readBlockSize = memSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withWriteBlockSize(MemSize memSize) {
        this.writeBlockSize = memSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withCompressionLevel(int i) {
        this.compressionLevel = i;
        return this;
    }

    private Path partitionPath(int i) {
        return this.path.resolve(String.format(this.filePattern, Integer.valueOf(i)));
    }

    @Override // io.datakernel.stream.processor.StreamSorterStorage
    public Stage<StreamConsumerWithResult<T, Integer>> write() {
        int incrementAndGet = PARTITION.incrementAndGet();
        return AsyncFile.openAsync(this.executorService, partitionPath(incrementAndGet), StreamFileWriter.CREATE_OPTIONS).thenApply(asyncFile -> {
            return StreamTransformer.idenity().with((StreamProducerModifier) StreamBinarySerializer.create(this.serializer)).with((StreamProducerModifier) StreamByteChunker.create(this.writeBlockSize.map(l -> {
                return Long.valueOf(l.longValue() / 2);
            }), this.writeBlockSize)).with((StreamProducerModifier) StreamLZ4Compressor.create(this.compressionLevel)).with((StreamProducerModifier) StreamByteChunker.create(this.writeBlockSize.map(l2 -> {
                return Long.valueOf(l2.longValue() / 2);
            }), this.writeBlockSize)).applyTo((StreamConsumerWithResult) StreamFileWriter.create(asyncFile).withFlushAsResult()).thenApply(r3 -> {
                return Integer.valueOf(incrementAndGet);
            }).withLateBinding();
        });
    }

    @Override // io.datakernel.stream.processor.StreamSorterStorage
    public Stage<StreamProducerWithResult<T, Void>> read(int i) {
        return AsyncFile.openAsync(this.executorService, partitionPath(i), new OpenOption[]{StandardOpenOption.READ}).thenApply(asyncFile -> {
            return StreamFileReader.readFile(asyncFile).withBufferSize(this.readBlockSize).with(StreamLZ4Decompressor.create()).with(StreamBinaryDeserializer.create(this.serializer)).withEndOfStreamAsResult().withLateBinding();
        });
    }

    @Override // io.datakernel.stream.processor.StreamSorterStorage
    public Stage<Void> cleanup(List<Integer> list) {
        return Stage.ofCallable(this.executorService, () -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Path partitionPath = partitionPath(((Integer) it.next()).intValue());
                try {
                    Files.delete(partitionPath);
                } catch (IOException e) {
                    this.logger.warn("Could not delete {} : {}", partitionPath, e.toString());
                }
            }
            return null;
        });
    }
}
