package io.activej.dataflow;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.MemSize;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.csp.queue.ChannelQueue;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandDownload;
import io.activej.dataflow.command.DataflowCommandExecute;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.di.BinarySerializerModule;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.TaskContext;
import io.activej.dataflow.node.Node;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.di.ResourceLocator;
import io.activej.eventloop.Eventloop;
import io.activej.net.AbstractServer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/dataflow/DataflowServer.class */
public final class DataflowServer extends AbstractServer<DataflowServer> {
    private final Map<StreamId, ChannelQueue<ByteBuf>> pendingStreams;
    private final Map<Class, CommandHandler> handlers;
    private final ByteBufsCodec<DataflowCommand, DataflowResponse> codec;
    private final BinarySerializerModule.BinarySerializerLocator serializers;
    private final ResourceLocator environment;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/activej/dataflow/DataflowServer$CommandHandler.class */
    public interface CommandHandler<I, O> {
        void onCommand(Messaging<I, O> messaging, I i);
    }

    /* loaded from: input_file:io/activej/dataflow/DataflowServer$DownloadCommandHandler.class */
    private class DownloadCommandHandler implements CommandHandler<DataflowCommandDownload, DataflowResponse> {
        private DownloadCommandHandler() {
        }

        @Override // io.activej.dataflow.DataflowServer.CommandHandler
        public void onCommand(Messaging<DataflowCommandDownload, DataflowResponse> messaging, DataflowCommandDownload dataflowCommandDownload) {
            if (DataflowServer.this.logger.isTraceEnabled()) {
                DataflowServer.this.logger.trace("Processing onDownload: {}, {}", dataflowCommandDownload, messaging);
            }
            StreamId streamId = dataflowCommandDownload.getStreamId();
            ChannelQueue channelQueue = (ChannelQueue) DataflowServer.this.pendingStreams.remove(streamId);
            if (channelQueue != null) {
                DataflowServer.this.logger.info("onDownload: transferring {}, pending downloads: {}", streamId, Integer.valueOf(DataflowServer.this.pendingStreams.size()));
            } else {
                channelQueue = new ChannelZeroBuffer();
                DataflowServer.this.pendingStreams.put(streamId, channelQueue);
                DataflowServer.this.logger.info("onDownload: waiting {}, pending downloads: {}", streamId, Integer.valueOf(DataflowServer.this.pendingStreams.size()));
                messaging.receive().whenException(() -> {
                    if (((ChannelQueue) DataflowServer.this.pendingStreams.remove(streamId)) != null) {
                        DataflowServer.this.logger.info("onDownload: removing {}, pending downloads: {}", streamId, Integer.valueOf(DataflowServer.this.pendingStreams.size()));
                    }
                });
            }
            ChannelConsumer sendBinaryStream = messaging.sendBinaryStream();
            channelQueue.getSupplier().streamTo(sendBinaryStream);
            sendBinaryStream.withAcknowledgement(promise -> {
                return promise.whenComplete((r5, th) -> {
                    if (th != null) {
                        DataflowServer.this.logger.warn("Exception occurred while trying to send data");
                    }
                    messaging.close();
                });
            });
        }
    }

    /* loaded from: input_file:io/activej/dataflow/DataflowServer$ExecuteCommandHandler.class */
    private class ExecuteCommandHandler implements CommandHandler<DataflowCommandExecute, DataflowResponse> {
        private ExecuteCommandHandler() {
        }

        @Override // io.activej.dataflow.DataflowServer.CommandHandler
        public void onCommand(Messaging<DataflowCommandExecute, DataflowResponse> messaging, DataflowCommandExecute dataflowCommandExecute) {
            TaskContext taskContext = new TaskContext(DataflowServer.this.environment);
            try {
                Iterator<Node> it = dataflowCommandExecute.getNodes().iterator();
                while (it.hasNext()) {
                    it.next().createAndBind(taskContext);
                }
                taskContext.execute().whenComplete((r8, th) -> {
                    if (th == null) {
                        DataflowServer.this.logger.info("Task executed successfully: {}", dataflowCommandExecute);
                    } else {
                        DataflowServer.this.logger.error("Failed to execute task: {}", dataflowCommandExecute, th);
                    }
                    sendResponse(messaging, th);
                });
                messaging.receive().whenException(() -> {
                    if (taskContext.isExecuted()) {
                        return;
                    }
                    DataflowServer.this.logger.error("Client disconnected. Canceling task: {}", dataflowCommandExecute);
                    taskContext.cancel();
                });
            } catch (Exception e) {
                DataflowServer.this.logger.error("Failed to createAndBind task: {}", dataflowCommandExecute, e);
                sendResponse(messaging, e);
            }
        }

        private void sendResponse(Messaging<DataflowCommandExecute, DataflowResponse> messaging, @Nullable Throwable th) {
            String str = null;
            if (th != null) {
                str = th.getClass().getSimpleName() + ": " + th.getMessage();
            }
            Promise send = messaging.send(new DataflowResponse(str));
            Objects.requireNonNull(messaging);
            send.whenComplete(messaging::close);
        }
    }

    public DataflowServer(Eventloop eventloop, ByteBufsCodec<DataflowCommand, DataflowResponse> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator, ResourceLocator resourceLocator) {
        super(eventloop);
        this.pendingStreams = new HashMap();
        this.handlers = new HashMap();
        this.handlers.put(DataflowCommandDownload.class, new DownloadCommandHandler());
        this.handlers.put(DataflowCommandExecute.class, new ExecuteCommandHandler());
        this.codec = byteBufsCodec;
        this.serializers = binarySerializerLocator;
        this.environment = resourceLocator;
    }

    public <T> StreamConsumer<T> upload(StreamId streamId, Class<T> cls) {
        ChannelSerializer withExplicitEndOfStream = ChannelSerializer.create(this.serializers.get(cls)).withInitialBufferSize(MemSize.kilobytes(256L)).withAutoFlushInterval(Duration.ZERO).withExplicitEndOfStream();
        ChannelQueue<ByteBuf> remove = this.pendingStreams.remove(streamId);
        if (remove == null) {
            remove = new ChannelZeroBuffer<>();
            this.pendingStreams.put(streamId, remove);
            this.logger.info("onUpload: waiting {}, pending downloads: {}", streamId, Integer.valueOf(this.pendingStreams.size()));
        } else {
            this.logger.info("onUpload: transferring {}, pending downloads: {}", streamId, Integer.valueOf(this.pendingStreams.size()));
        }
        withExplicitEndOfStream.getOutput().set(remove.getConsumer());
        withExplicitEndOfStream.getAcknowledgement().whenException(() -> {
            ChannelQueue<ByteBuf> remove2 = this.pendingStreams.remove(streamId);
            if (remove2 != null) {
                this.logger.info("onUpload: removing {}, pending downloads: {}", streamId, Integer.valueOf(this.pendingStreams.size()));
                remove2.close();
            }
        });
        return withExplicitEndOfStream;
    }

    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(asyncTcpSocket, this.codec);
        create.receive().whenResult(dataflowCommand -> {
            if (dataflowCommand != null) {
                doRead(create, dataflowCommand);
            } else {
                this.logger.warn("unexpected end of stream");
                create.close();
            }
        }).whenException(th -> {
            this.logger.error("received error while trying to read", th);
            create.close();
        });
    }

    private void doRead(Messaging<DataflowCommand, DataflowResponse> messaging, DataflowCommand dataflowCommand) {
        CommandHandler commandHandler = this.handlers.get(dataflowCommand.getClass());
        if (commandHandler != null) {
            commandHandler.onCommand(messaging, dataflowCommand);
        } else {
            messaging.close();
            this.logger.error("missing handler for {}", dataflowCommand);
        }
    }

    protected void onClose(SettablePromise<Void> settablePromise) {
        ArrayList arrayList = new ArrayList(this.pendingStreams.values());
        this.pendingStreams.clear();
        arrayList.forEach((v0) -> {
            v0.close();
        });
        settablePromise.set((Object) null);
    }
}
