package io.scalecube.services.transport.rsocket;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ServerTransport;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServerTransport.class */
public class RSocketServerTransport implements ServerTransport {
    private static final int BOSS_THREADS_NUM = 1;
    private final ServiceMessageCodec codec;
    private final EventLoopGroup bossGroup;
    private final DelegatedLoopResources loopResources;
    private CloseableChannel server;
    private List<Connection> connections = new CopyOnWriteArrayList();
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);
    private static final DefaultThreadFactory BOSS_THREAD_FACTORY = new DefaultThreadFactory("rsocket-boss", true);

    public RSocketServerTransport(ServiceMessageCodec serviceMessageCodec, boolean z, EventLoopGroup eventLoopGroup) {
        this.codec = serviceMessageCodec;
        this.bossGroup = z ? new EpollEventLoopGroup(BOSS_THREADS_NUM, BOSS_THREAD_FACTORY) : new NioEventLoopGroup(BOSS_THREADS_NUM, BOSS_THREAD_FACTORY);
        this.loopResources = new DelegatedLoopResources(z, this.bossGroup, eventLoopGroup);
    }

    public Mono<InetSocketAddress> bind(InetSocketAddress inetSocketAddress, ServiceMethodRegistry serviceMethodRegistry) {
        return Mono.defer(() -> {
            TcpServer doOnConnection = TcpServer.create().runOn(this.loopResources).addressSupplier(() -> {
                return inetSocketAddress;
            }).doOnConnection(connection -> {
                LOGGER.info("Accepted connection on {}", connection.channel());
                connection.onDispose(() -> {
                    LOGGER.info("Connection closed on {}", connection.channel());
                    this.connections.remove(connection);
                });
                this.connections.add(connection);
            });
            return RSocketFactory.receive().frameDecoder(frame -> {
                return ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain());
            }).acceptor(new RSocketServiceAcceptor(this.codec, serviceMethodRegistry)).transport(() -> {
                return TcpServerTransport.create(doOnConnection);
            }).start().map(closeableChannel -> {
                this.server = closeableChannel;
                return closeableChannel;
            }).map((v0) -> {
                return v0.address();
            });
        });
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            ArrayList arrayList = new ArrayList();
            arrayList.add(FutureMono.from(this.bossGroup.shutdownGracefully()));
            this.connections.stream().collect(() -> {
                return arrayList;
            }, (list, connection) -> {
                connection.dispose();
                list.add(connection.onTerminate());
            }, (list2, list3) -> {
            });
            if (this.server != null) {
                this.server.dispose();
                arrayList.add(this.server.onClose());
            }
            return Mono.when(arrayList);
        });
    }
}
