package io.scalecube.services.transport.rsocket.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.transport.client.api.ClientChannel;
import io.scalecube.services.transport.client.api.ClientTransport;
import io.scalecube.transport.Address;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.tcp.TcpClient;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/client/RSocketClientTransport.class */
public class RSocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ConcurrentMap<Address, Mono<RSocket>> rSockets = new ConcurrentHashMap();
    private final ServiceMessageCodec codec;

    public RSocketClientTransport(ServiceMessageCodec serviceMessageCodec) {
        this.codec = serviceMessageCodec;
    }

    public ClientChannel create(Address address) {
        return new RSocketServiceClientAdapter(this.rSockets.computeIfAbsent(address, this::connect), this.codec);
    }

    private Mono<RSocket> connect(Address address) {
        CompletableFuture completableFuture = new CompletableFuture();
        RSocketFactory.connect().transport(createTcpClientTransport(address)).start().subscribe(rSocket -> {
            LOGGER.debug("Connected successfully on {}", address);
            rSocket.onClose().subscribe(r6 -> {
                this.rSockets.remove(address);
                LOGGER.debug("Connection closed on {} and removed from the pool", address);
            });
            completableFuture.complete(rSocket);
        }, th -> {
            LOGGER.warn("Connect failed on {}, cause: {}", address, th);
            this.rSockets.remove(address);
            completableFuture.completeExceptionally(th);
        });
        return Mono.fromFuture(completableFuture);
    }

    private TcpClientTransport createTcpClientTransport(Address address) {
        return TcpClientTransport.create(TcpClient.create(builder -> {
            builder.disablePool().host(address.host()).port(address.port()).afterNettyContextInit(nettyContext -> {
                nettyContext.addHandler(new ChannelInboundHandlerAdapter() { // from class: io.scalecube.services.transport.rsocket.client.RSocketClientTransport.1
                    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
                        RSocketClientTransport.this.rSockets.remove(address);
                        RSocketClientTransport.LOGGER.debug("Connection inactive on {} and removed from the pool", address);
                        channelHandlerContext.fireChannelInactive();
                    }
                });
            });
        }));
    }
}
