package io.activej.dataflow;

import io.activej.async.process.AsyncCloseable;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.csp.queue.ChannelBuffer;
import io.activej.csp.queue.ChannelBufferWithFallback;
import io.activej.csp.queue.ChannelFileBuffer;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandDownload;
import io.activej.dataflow.command.DataflowCommandExecute;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.di.BinarySerializerModule;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.Node;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.processor.StreamSupplierTransformer;
import io.activej.eventloop.net.SocketSettings;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promise;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/dataflow/DataflowClient.class */
public final class DataflowClient {
    private static final Logger logger = LoggerFactory.getLogger(DataflowClient.class);
    private final Executor executor;
    private final Path secondaryPath;
    private final ByteBufsCodec<DataflowResponse, DataflowCommand> codec;
    private final BinarySerializerModule.BinarySerializerLocator serializers;
    private int bufferMinSize;
    private int bufferMaxSize;
    private final SocketSettings socketSettings = SocketSettings.createDefault();
    private final AtomicInteger secondaryId = new AtomicInteger(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));

    /* loaded from: input_file:io/activej/dataflow/DataflowClient$Session.class */
    public class Session implements AsyncCloseable {
        private final InetSocketAddress address;
        private final Messaging<DataflowResponse, DataflowCommand> messaging;

        private Session(InetSocketAddress inetSocketAddress, AsyncTcpSocketNio asyncTcpSocketNio) {
            this.address = inetSocketAddress;
            this.messaging = MessagingWithBinaryStreaming.create(asyncTcpSocketNio, DataflowClient.this.codec);
        }

        public Promise<Void> execute(Collection<Node> collection) {
            Promise send = this.messaging.send(new DataflowCommandExecute(new ArrayList(collection)));
            Messaging<DataflowResponse, DataflowCommand> messaging = this.messaging;
            Objects.requireNonNull(messaging);
            return send.then(messaging::receive).then(dataflowResponse -> {
                this.messaging.close();
                String error = dataflowResponse.getError();
                return error != null ? Promise.ofException(new Exception("Error on remote server " + this.address + ": " + error)) : Promise.complete();
            });
        }

        public void closeEx(@NotNull Throwable th) {
            this.messaging.closeEx(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter.class */
    public static class StreamTraceCounter<T> implements StreamSupplierTransformer<T, StreamSupplier<T>> {
        private final StreamId streamId;
        private final InetSocketAddress address;
        private int count;
        private final StreamTraceCounter<T>.Input input;
        private final StreamTraceCounter<T>.Output output;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter$Input.class */
        public final class Input extends AbstractStreamConsumer<T> {
            private Input() {
            }

            protected void onEndOfStream() {
                StreamTraceCounter.this.output.sendEndOfStream();
            }

            protected void onComplete() {
                DataflowClient.logger.info("Received {} items total from stream {}({})", new Object[]{Integer.valueOf(StreamTraceCounter.this.count), StreamTraceCounter.this.streamId, StreamTraceCounter.this.address});
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/activej/dataflow/DataflowClient$StreamTraceCounter$Output.class */
        public final class Output extends AbstractStreamSupplier<T> {
            static final /* synthetic */ boolean $assertionsDisabled;

            private Output() {
            }

            protected void onResumed() {
                StreamDataAcceptor dataAcceptor = getDataAcceptor();
                if (!$assertionsDisabled && dataAcceptor == null) {
                    throw new AssertionError();
                }
                StreamTraceCounter.this.input.resume(obj -> {
                    StreamTraceCounter.access$308(StreamTraceCounter.this);
                    if (StreamTraceCounter.this.count == 1 || StreamTraceCounter.this.count % 1000 == 0) {
                        DataflowClient.logger.info("Received {} items from stream {}({}): {}", new Object[]{Integer.valueOf(StreamTraceCounter.this.count), StreamTraceCounter.this.streamId, StreamTraceCounter.this.address, obj});
                    }
                    dataAcceptor.accept(obj);
                });
            }

            protected void onSuspended() {
                StreamTraceCounter.this.input.suspend();
            }

            static {
                $assertionsDisabled = !DataflowClient.class.desiredAssertionStatus();
            }
        }

        private StreamTraceCounter(StreamId streamId, InetSocketAddress inetSocketAddress) {
            this.count = 0;
            this.streamId = streamId;
            this.address = inetSocketAddress;
            this.input = new Input();
            this.output = new Output();
            Promise acknowledgement = this.input.getAcknowledgement();
            StreamTraceCounter<T>.Output output = this.output;
            Objects.requireNonNull(output);
            acknowledgement.whenException(output::closeEx);
            Promise endOfStream = this.output.getEndOfStream();
            StreamTraceCounter<T>.Input input = this.input;
            Objects.requireNonNull(input);
            Promise whenResult = endOfStream.whenResult(input::acknowledge);
            StreamTraceCounter<T>.Input input2 = this.input;
            Objects.requireNonNull(input2);
            whenResult.whenException(input2::closeEx);
        }

        /* renamed from: transform, reason: merged with bridge method [inline-methods] */
        public StreamSupplier<T> m1transform(StreamSupplier<T> streamSupplier) {
            streamSupplier.streamTo(this.input);
            return this.output;
        }

        static /* synthetic */ int access$308(StreamTraceCounter streamTraceCounter) {
            int i = streamTraceCounter.count;
            streamTraceCounter.count = i + 1;
            return i;
        }
    }

    public DataflowClient(Executor executor, Path path, ByteBufsCodec<DataflowResponse, DataflowCommand> byteBufsCodec, BinarySerializerModule.BinarySerializerLocator binarySerializerLocator) {
        this.executor = executor;
        this.secondaryPath = path;
        this.codec = byteBufsCodec;
        this.serializers = binarySerializerLocator;
    }

    public DataflowClient withBufferSizes(int i, int i2) {
        this.bufferMinSize = i;
        this.bufferMaxSize = i2;
        return this;
    }

    public <T> Promise<StreamSupplier<T>> download(InetSocketAddress inetSocketAddress, StreamId streamId, Class<T> cls) {
        return AsyncTcpSocketNio.connect(inetSocketAddress, 0L, this.socketSettings).then(asyncTcpSocketNio -> {
            MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(asyncTcpSocketNio, this.codec);
            return create.send(new DataflowCommandDownload(streamId)).map(r12 -> {
                return ((StreamSupplier) ((StreamSupplier) ((ChannelSupplier) create.receiveBinaryStream().transformWith(new ChannelBufferWithFallback((this.bufferMinSize == 0 && this.bufferMaxSize == 0) ? new ChannelZeroBuffer() : new ChannelBuffer(this.bufferMinSize, this.bufferMaxSize), () -> {
                    return ChannelFileBuffer.create(this.executor, this.secondaryPath.resolve(this.secondaryId.getAndIncrement() + ".bin"));
                }))).transformWith(ChannelDeserializer.create(this.serializers.get(cls)).withExplicitEndOfStream())).transformWith(new StreamTraceCounter(streamId, inetSocketAddress))).withEndOfStream(promise -> {
                    Objects.requireNonNull(create);
                    return promise.whenComplete(create::close);
                });
            });
        });
    }

    public Promise<Session> connect(InetSocketAddress inetSocketAddress) {
        return AsyncTcpSocketNio.connect(inetSocketAddress, 0L, this.socketSettings).map(asyncTcpSocketNio -> {
            return new Session(inetSocketAddress, asyncTcpSocketNio);
        });
    }
}
