package io.activej.dataflow;

import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.exception.UnknownFormatException;
import io.activej.common.function.FunctionEx;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.net.IMessaging;
import io.activej.csp.net.Messaging;
import io.activej.csp.process.transformer.ChannelSupplierTransformer;
import io.activej.csp.process.transformer.ChannelTransformer;
import io.activej.csp.process.transformer.ChannelTransformers;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.dataflow.exception.DataflowException;
import io.activej.dataflow.exception.DataflowStacklessException;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.StreamSchema;
import io.activej.dataflow.inject.BinarySerializerModule;
import io.activej.dataflow.messaging.DataflowRequest;
import io.activej.dataflow.messaging.DataflowResponse;
import io.activej.dataflow.node.Node;
import io.activej.datastream.consumer.AbstractStreamConsumer;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.processor.transformer.StreamSupplierTransformer;
import io.activej.datastream.supplier.AbstractStreamSupplier;
import io.activej.datastream.supplier.StreamDataAcceptor;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.net.socket.tcp.TcpSocket;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractNioReactive;
import io.activej.reactor.ImplicitlyReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.net.SocketSettings;
import io.activej.reactor.nio.NioReactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/dataflow/DataflowClient.class */
public final class DataflowClient extends AbstractNioReactive {
    private static final Logger logger = LoggerFactory.getLogger(DataflowClient.class);
    private final SocketSettings socketSettings;
    private final ByteBufsCodec<DataflowResponse, DataflowRequest> codec;
    private final BinarySerializerModule.BinarySerializerLocator serializers;

    /* loaded from: input_file:io/activej/dataflow/DataflowClient$Session.class */
    public class Session extends ImplicitlyReactive implements AsyncCloseable {
        private final InetSocketAddress address;
        private final IMessaging<DataflowResponse, DataflowRequest> messaging;

        private Session(InetSocketAddress inetSocketAddress, ITcpSocket iTcpSocket) {
            this.address = inetSocketAddress;
            this.messaging = Messaging.create(iTcpSocket, DataflowClient.this.codec);
        }

        public Promise<Void> execute(long j, List<Node> list) {
            Reactive.checkInReactorThread(this);
            Promise map = DataflowClient.performHandshake(this.messaging).then(() -> {
                return this.messaging.send(new DataflowRequest.Execute(j, list)).mapException(IOException.class, iOException -> {
                    return new DataflowStacklessException("Failed to send command to " + this.address, iOException);
                });
            }).then(() -> {
                return this.messaging.receive().mapException(IOException.class, iOException -> {
                    return new DataflowStacklessException("Failed to receive response from " + this.address, iOException);
                });
            }).map(DataflowClient.expectResponse(DataflowResponse.Result.class));
            IMessaging<DataflowResponse, DataflowRequest> iMessaging = this.messaging;
            Objects.requireNonNull(iMessaging);
            return map.whenException(iMessaging::closeEx).whenResult(result -> {
                this.messaging.close();
                String error = result.error();
                if (error != null) {
                    throw new DataflowException("Error on remote server " + this.address + ": " + error);
                }
            }).toVoid();
        }

        public void closeEx(Exception exc) {
            Reactive.checkInReactorThread(this);
            this.messaging.closeEx(exc);
        }
    }

    /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter.class */
    public static class StreamTraceCounter<T> implements StreamSupplierTransformer<T, StreamSupplier<T>> {
        private final StreamId streamId;
        private final InetSocketAddress address;
        private int count = 0;
        private final StreamTraceCounter<T>.Input input = new Input();
        private final StreamTraceCounter<T>.Output output = new Output();

        /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter$Input.class */
        public final class Input extends AbstractStreamConsumer<T> {
            public Input() {
            }

            protected void onEndOfStream() {
                StreamTraceCounter.this.output.sendEndOfStream();
            }

            protected void onComplete() {
                DataflowClient.logger.info("Received {} items total from stream {}({})", new Object[]{Integer.valueOf(StreamTraceCounter.this.count), StreamTraceCounter.this.streamId, StreamTraceCounter.this.address});
            }
        }

        /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter$Output.class */
        public final class Output extends AbstractStreamSupplier<T> {
            static final /* synthetic */ boolean $assertionsDisabled;

            public Output() {
            }

            protected void onResumed() {
                StreamDataAcceptor dataAcceptor = getDataAcceptor();
                if (!$assertionsDisabled && dataAcceptor == null) {
                    throw new AssertionError();
                }
                StreamTraceCounter.this.input.resume(obj -> {
                    StreamTraceCounter streamTraceCounter = StreamTraceCounter.this;
                    int i = streamTraceCounter.count + 1;
                    streamTraceCounter.count = i;
                    if (i == 1 || StreamTraceCounter.this.count % 1000 == 0) {
                        DataflowClient.logger.info("Received {} items from stream {}({}): {}", new Object[]{Integer.valueOf(StreamTraceCounter.this.count), StreamTraceCounter.this.streamId, StreamTraceCounter.this.address, obj});
                    }
                    dataAcceptor.accept(obj);
                });
            }

            protected void onSuspended() {
                StreamTraceCounter.this.input.suspend();
            }

            static {
                $assertionsDisabled = !DataflowClient.class.desiredAssertionStatus();
            }
        }

        private StreamTraceCounter(StreamId streamId, InetSocketAddress inetSocketAddress) {
            this.streamId = streamId;
            this.address = inetSocketAddress;
            Promise acknowledgement = this.input.getAcknowledgement();
            StreamTraceCounter<T>.Output output = this.output;
            Objects.requireNonNull(output);
            acknowledgement.whenException(output::closeEx);
            Promise acknowledgement2 = this.output.getAcknowledgement();
            StreamTraceCounter<T>.Input input = this.input;
            Objects.requireNonNull(input);
            Promise whenResult = acknowledgement2.whenResult(input::acknowledge);
            StreamTraceCounter<T>.Input input2 = this.input;
            Objects.requireNonNull(input2);
            whenResult.whenException(input2::closeEx);
        }

        /* renamed from: transform, reason: merged with bridge method [inline-methods] */
        public StreamSupplier<T> m1transform(StreamSupplier<T> streamSupplier) {
            streamSupplier.streamTo(this.input);
            return this.output;
        }
    }

    private DataflowClient(NioReactor nioReactor, ByteBufsCodec<DataflowResponse, DataflowRequest> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator) {
        super(nioReactor);
        this.socketSettings = SocketSettings.defaultInstance();
        this.codec = byteBufsCodec;
        this.serializers = binarySerializerLocator;
    }

    public static DataflowClient create(NioReactor nioReactor, ByteBufsCodec<DataflowResponse, DataflowRequest> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator) {
        return new DataflowClient(nioReactor, byteBufsCodec, binarySerializerLocator);
    }

    public <T> StreamSupplier<T> download(InetSocketAddress inetSocketAddress, StreamId streamId, StreamSchema<T> streamSchema, ChannelTransformer<ByteBuf, ByteBuf> channelTransformer) {
        Reactive.checkInReactorThread(this);
        return StreamSuppliers.ofPromise(TcpSocket.connect(this.reactor, inetSocketAddress, 0L, this.socketSettings).mapException(IOException.class, iOException -> {
            return new DataflowStacklessException("Failed to connect to " + inetSocketAddress, iOException);
        }).then(tcpSocket -> {
            Messaging create = Messaging.create(tcpSocket, this.codec);
            return performHandshake(create).then(() -> {
                return create.send(new DataflowRequest.Download(streamId)).mapException(IOException.class, iOException2 -> {
                    return new DataflowException("Failed to download from " + inetSocketAddress, iOException2);
                });
            }).map(r12 -> {
                return ((StreamSupplier) ((StreamSupplier) ((ChannelSupplier) create.receiveBinaryStream().transformWith(channelTransformer)).transformWith((ChannelSupplierTransformer) ChannelDeserializer.builder(streamSchema.createSerializer(this.serializers)).withExplicitEndOfStream().build())).transformWith(new StreamTraceCounter(streamId, inetSocketAddress))).withEndOfStream(promise -> {
                    Promise mapException = promise.mapException(exc -> {
                        return ((exc instanceof IOException) || (exc instanceof UnknownFormatException) || (exc instanceof TruncatedDataException)) ? new DataflowStacklessException("Error when downloading from " + inetSocketAddress, exc) : new DataflowException("Error when downloading from " + inetSocketAddress, exc);
                    });
                    Objects.requireNonNull(create);
                    return mapException.whenComplete(create::close);
                });
            });
        }));
    }

    public <T> StreamSupplier<T> download(InetSocketAddress inetSocketAddress, StreamId streamId, StreamSchema<T> streamSchema) {
        Reactive.checkInReactorThread(this);
        return download(inetSocketAddress, streamId, streamSchema, ChannelTransformers.identity());
    }

    private static <T extends DataflowResponse> FunctionEx<DataflowResponse, T> expectResponse(Class<T> cls) {
        return dataflowResponse -> {
            if (cls.isAssignableFrom(dataflowResponse.getClass())) {
                return dataflowResponse;
            }
            throw new DataflowException("Unexpected response: " + dataflowResponse);
        };
    }

    public Promise<Session> connect(InetSocketAddress inetSocketAddress) {
        Reactive.checkInReactorThread(this);
        return TcpSocket.connect(this.reactor, inetSocketAddress, 0L, this.socketSettings).map(tcpSocket -> {
            return new Session(inetSocketAddress, tcpSocket);
        }).mapException(exc -> {
            return new DataflowException("Could not connect to " + inetSocketAddress, exc);
        });
    }

    public static Promise<Void> performHandshake(IMessaging<DataflowResponse, DataflowRequest> iMessaging) {
        Promise send = iMessaging.send(new DataflowRequest.Handshake(DataflowServer.VERSION));
        Objects.requireNonNull(iMessaging);
        return send.then(iMessaging::receive).map(expectResponse(DataflowResponse.Handshake.class)).mapException(IOException.class, (v1) -> {
            return new DataflowStacklessException(v1);
        }).whenResult(handshake -> {
            DataflowResponse.HandshakeFailure handshakeFailure = handshake.handshakeFailure();
            if (handshakeFailure != null) {
                throw new DataflowException(String.format("Handshake failed: %s. Minimal allowed version: %s", handshakeFailure.message(), handshakeFailure.minimalVersion()));
            }
        }).toVoid();
    }
}
