package io.reactivesocket.transport.tcp.client;

import io.netty.buffer.ByteBuf;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.transport.TransportClient;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameLogger;
import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec;
import io.reactivesocket.transport.tcp.TcpDuplexConnection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import java.net.SocketAddress;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.event.Level;
import rx.RxReactiveStreams;

/* loaded from: input_file:io/reactivesocket/transport/tcp/client/TcpTransportClient.class */
public class TcpTransportClient implements TransportClient {
    private final TcpClient<Frame, Frame> rxNettyClient;

    public TcpTransportClient(TcpClient<Frame, Frame> tcpClient) {
        this.rxNettyClient = tcpClient;
    }

    public Publisher<DuplexConnection> connect() {
        return RxReactiveStreams.toPublisher(this.rxNettyClient.createConnectionRequest().map(connection -> {
            return new TcpDuplexConnection(connection);
        }));
    }

    public TcpTransportClient configureClient(Function<TcpClient<Frame, Frame>, TcpClient<Frame, Frame>> function) {
        return new TcpTransportClient(function.apply(this.rxNettyClient));
    }

    public TcpTransportClient logReactiveSocketFrames(String str, Level level) {
        return configureClient(tcpClient -> {
            return tcpClient.addChannelHandlerLast("reactive-socket-frame-codec", () -> {
                return new ReactiveSocketFrameLogger(str, level);
            });
        });
    }

    public static TcpTransportClient create(SocketAddress socketAddress) {
        return new TcpTransportClient(_configureClient(TcpClient.newClient(socketAddress)));
    }

    public static TcpTransportClient create(TcpClient<ByteBuf, ByteBuf> tcpClient) {
        return new TcpTransportClient(_configureClient(tcpClient));
    }

    private static TcpClient<Frame, Frame> _configureClient(TcpClient<ByteBuf, ByteBuf> tcpClient) {
        return tcpClient.addChannelHandlerLast("length-codec", ReactiveSocketLengthCodec::new).addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new);
    }
}
