package io.reactivesocket.transport.tcp.server;

import io.netty.buffer.ByteBuf;
import io.reactivesocket.Frame;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameLogger;
import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec;
import io.reactivesocket.transport.tcp.TcpDuplexConnection;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.event.Level;
import rx.Observable;
import rx.RxReactiveStreams;

/* loaded from: input_file:io/reactivesocket/transport/tcp/server/TcpTransportServer.class */
public class TcpTransportServer implements TransportServer {
    private final TcpServer<Frame, Frame> rxNettyServer;

    /* loaded from: input_file:io/reactivesocket/transport/tcp/server/TcpTransportServer$Started.class */
    private class Started implements TransportServer.StartedServer {
        private Started() {
        }

        public SocketAddress getServerAddress() {
            return TcpTransportServer.this.rxNettyServer.getServerAddress();
        }

        public int getServerPort() {
            return TcpTransportServer.this.rxNettyServer.getServerPort();
        }

        public void awaitShutdown() {
            TcpTransportServer.this.rxNettyServer.awaitShutdown();
        }

        public void awaitShutdown(long j, TimeUnit timeUnit) {
            TcpTransportServer.this.rxNettyServer.awaitShutdown(j, timeUnit);
        }

        public void shutdown() {
            TcpTransportServer.this.rxNettyServer.shutdown();
        }
    }

    private TcpTransportServer(TcpServer<Frame, Frame> tcpServer) {
        this.rxNettyServer = tcpServer;
    }

    public TransportServer.StartedServer start(final TransportServer.ConnectionAcceptor connectionAcceptor) {
        this.rxNettyServer.start(new ConnectionHandler<Frame, Frame>() { // from class: io.reactivesocket.transport.tcp.server.TcpTransportServer.1
            public Observable<Void> handle(Connection<Frame, Frame> connection) {
                return RxReactiveStreams.toObservable(connectionAcceptor.apply(new TcpDuplexConnection(connection)));
            }
        });
        return new Started();
    }

    public TcpTransportServer configureServer(Function<TcpServer<Frame, Frame>, TcpServer<Frame, Frame>> function) {
        return new TcpTransportServer(function.apply(this.rxNettyServer));
    }

    public TcpTransportServer logReactiveSocketFrames(String str, Level level) {
        return configureServer(tcpServer -> {
            return tcpServer.addChannelHandlerLast("reactive-socket-frame-codec", () -> {
                return new ReactiveSocketFrameLogger(str, level);
            });
        });
    }

    public static TcpTransportServer create() {
        return create((TcpServer<ByteBuf, ByteBuf>) TcpServer.newServer());
    }

    public static TcpTransportServer create(int i) {
        return create((TcpServer<ByteBuf, ByteBuf>) TcpServer.newServer(i));
    }

    public static TcpTransportServer create(SocketAddress socketAddress) {
        return create((TcpServer<ByteBuf, ByteBuf>) TcpServer.newServer(socketAddress));
    }

    public static TcpTransportServer create(TcpServer<ByteBuf, ByteBuf> tcpServer) {
        return new TcpTransportServer(configure(tcpServer));
    }

    private static TcpServer<Frame, Frame> configure(TcpServer<ByteBuf, ByteBuf> tcpServer) {
        return tcpServer.addChannelHandlerLast("line-codec", ReactiveSocketLengthCodec::new).addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new);
    }
}
