package io.activej.launchers.dataflow;

import io.activej.config.Config;
import io.activej.config.converter.ConfigConverters;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.process.frame.FrameFormats;
import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.DataflowServer;
import io.activej.dataflow.graph.StreamSchema;
import io.activej.dataflow.graph.Task;
import io.activej.dataflow.inject.BinarySerializerModule;
import io.activej.dataflow.inject.DataflowModule;
import io.activej.dataflow.inject.DatasetIdModule;
import io.activej.dataflow.inject.SortingExecutor;
import io.activej.dataflow.messaging.DataflowRequest;
import io.activej.dataflow.messaging.DataflowResponse;
import io.activej.dataflow.node.StreamSorterStorageFactory;
import io.activej.datastream.processor.transformer.sort.IStreamSorterStorage;
import io.activej.datastream.processor.transformer.sort.StreamSorterStorage;
import io.activej.inject.Injector;
import io.activej.inject.annotation.Eager;
import io.activej.inject.annotation.Named;
import io.activej.inject.annotation.Provides;
import io.activej.inject.module.AbstractModule;
import io.activej.launchers.initializers.Initializers;
import io.activej.promise.Promise;
import io.activej.reactor.Reactor;
import io.activej.reactor.nio.NioReactor;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/launchers/dataflow/DataflowServerModule.class */
public final class DataflowServerModule extends AbstractModule {
    private static final Logger logger = LoggerFactory.getLogger(DataflowServerModule.class);

    private DataflowServerModule() {
    }

    public static DataflowServerModule create() {
        return new DataflowServerModule();
    }

    protected void configure() {
        install(DataflowModule.create());
        install(DatasetIdModule.create());
    }

    @Provides
    Executor executor(Config config) {
        return ConfigConverters.getExecutor(config);
    }

    @Provides
    @Eager
    @SortingExecutor
    Executor sortingExecutor(Config config) {
        return ConfigConverters.getExecutor(config.getChild("sortingExecutor"));
    }

    @Provides
    DataflowServer server(@Named("Dataflow") NioReactor nioReactor, Config config, ByteBufsCodec<DataflowRequest, DataflowResponse> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator, Injector injector) {
        return (DataflowServer) DataflowServer.builder(nioReactor, byteBufsCodec, binarySerializerLocator, injector).initialize(Initializers.ofAbstractServer(config.getChild("dataflow.server"))).build();
    }

    @Provides
    @Eager
    DataflowClient client(@Named("Dataflow") NioReactor nioReactor, ByteBufsCodec<DataflowResponse, DataflowRequest> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator) {
        return DataflowClient.create(nioReactor, byteBufsCodec, binarySerializerLocator);
    }

    @Provides
    @Eager
    StreamSorterStorageFactory storageFactory(@Named("Dataflow") final Reactor reactor, final Executor executor, final BinarySerializerModule.BinarySerializerLocator binarySerializerLocator, Config config) throws IOException {
        Path path = (Path) config.get(ConfigConverters.ofPath(), "dataflow.sortDir", (Object) null);
        final Path createTempDirectory = path == null ? Files.createTempDirectory("dataflow-sort-dir", new FileAttribute[0]) : path;
        return new StreamSorterStorageFactory() { // from class: io.activej.launchers.dataflow.DataflowServerModule.1
            int index;
            static final /* synthetic */ boolean $assertionsDisabled;

            public <T> IStreamSorterStorage<T> create(StreamSchema<T> streamSchema, Task task, Promise<Void> promise) {
                Path path2 = createTempDirectory;
                long taskId = task.getTaskId();
                this.index++;
                return StreamSorterStorage.create(reactor, executor, streamSchema.createSerializer(binarySerializerLocator), FrameFormats.lz4(), path2.resolve(taskId + "_" + path2));
            }

            public <T> Promise<Void> cleanup(IStreamSorterStorage<T> iStreamSorterStorage) {
                if (!$assertionsDisabled && !(iStreamSorterStorage instanceof StreamSorterStorage)) {
                    throw new AssertionError();
                }
                StreamSorterStorage streamSorterStorage = (StreamSorterStorage) iStreamSorterStorage;
                return Promise.ofBlocking(executor, () -> {
                    try {
                        Files.deleteIfExists(streamSorterStorage.getPath());
                    } catch (IOException e) {
                        DataflowServerModule.logger.warn("Could not delete sort storage directory: {}", streamSorterStorage.getPath(), e);
                    }
                });
            }

            static {
                $assertionsDisabled = !DataflowServerModule.class.desiredAssertionStatus();
            }
        };
    }
}
