package io.scalecube.transport;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/transport/TransportImpl.class */
public final class TransportImpl implements Transport {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);
    /** Settings given to the public constructor; read for the bind port, connect timeout and emulator flag. */
    private final TransportConfig config;
    /** Event-loop resources ("sc-cluster-io") used by both the TCP server and the TCP clients; disposed by stop(). */
    private final LoopResources loopResources;
    /** Processor behind listen(); every decoded inbound message is published through it. */
    private final DirectProcessor<Message> messagesSubject;
    /** Sink of {@link #messagesSubject}; fed by onMessage() and completed by stop(). */
    private final FluxSink<Message> messageSink;
    /** Cached connect operations keyed by remote address; entries are removed on disconnect or connect failure. */
    private final Map<Address, Mono<? extends Connection>> connections;
    /** Handler instance appended to the end of both the inbound and the outbound pipeline. */
    private final ExceptionHandler exceptionHandler;
    /** Pipeline initializer registered on the server bootstrap under the name "inbound". */
    private final InboundChannelInitializer inboundPipeline;
    /** Pipeline initializer registered on the client bootstrap under the name "outbound". */
    private final OutboundChannelInitializer outboundPipeline;
    /** Completed at the end of the stop() sequence; isStopped() reports its disposed state. */
    private final MonoProcessor<Void> onClose;
    /** Network emulator consulted by send0(); null until the transport has been bound. */
    private final NetworkEmulator networkEmulator;
    /** Address this transport is bound to; null until the transport has been bound. */
    private final Address address;
    /** Bound server handle; null until the transport has been bound. */
    private final DisposableServer server;

    /* loaded from: input_file:io/scalecube/transport/TransportImpl$InboundChannelInitializer.class */
    /**
     * Configures the pipeline of every accepted (server-side) channel: a length-field frame
     * decoder followed by the transport's shared exception handler.
     */
    private final class InboundChannelInitializer implements BiConsumer<ConnectionObserver, Channel> {
        private InboundChannelInitializer() {
        }

        /**
         * Appends the inbound handlers to the given channel's pipeline.
         *
         * @param connectionObserver connection observer supplied by the bootstrap (unused)
         * @param channel channel whose pipeline is configured
         */
        @Override
        public void accept(ConnectionObserver connectionObserver, Channel channel) {
            // Decoder arguments, in order: max frame length 32768, length field at offset 0,
            // length field 2 bytes wide, no length adjustment, strip the 2 length bytes.
            // NOTE(review): the outbound side prepends a 2-byte length, which can describe frames
            // larger than 32768 bytes - confirm senders never exceed this limit.
            ChannelHandler frameDecoder = new LengthFieldBasedFrameDecoder(32768, 0, 2, 0, 2);
            channel.pipeline()
                .addLast(frameDecoder)
                .addLast(exceptionHandler);
        }
    }

    /* loaded from: input_file:io/scalecube/transport/TransportImpl$OutboundChannelInitializer.class */
    /**
     * Configures the pipeline of every outgoing (client-side) channel: a 2-byte length-field
     * prepender followed by the transport's shared exception handler.
     */
    private final class OutboundChannelInitializer implements BiConsumer<ConnectionObserver, Channel> {
        private OutboundChannelInitializer() {
        }

        /**
         * Appends the outbound handlers to the given channel's pipeline.
         *
         * @param connectionObserver connection observer supplied by the bootstrap (unused)
         * @param channel channel whose pipeline is configured
         */
        @Override
        public void accept(ConnectionObserver connectionObserver, Channel channel) {
            // Each outgoing frame is prefixed with its length encoded in 2 bytes.
            ChannelHandler lengthPrepender = new LengthFieldPrepender(2);
            channel.pipeline()
                .addLast(lengthPrepender)
                .addLast(exceptionHandler);
        }
    }

    /**
     * Creates an unbound transport. The bind-specific fields (address, server, network emulator)
     * stay {@code null}; they are only populated on the instance produced by {@link #bind0()}.
     *
     * @param transportConfig transport settings; must not be {@code null}
     * @throws NullPointerException if {@code transportConfig} is {@code null}
     */
    public TransportImpl(TransportConfig transportConfig) {
        this.config = Objects.requireNonNull(transportConfig);
        this.loopResources = LoopResources.create("sc-cluster-io", 1, 1, true);
        this.messagesSubject = DirectProcessor.create();
        this.messageSink = this.messagesSubject.sink();
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.connections = new ConcurrentHashMap<>();
        this.exceptionHandler = new ExceptionHandler();
        this.inboundPipeline = new InboundChannelInitializer();
        this.outboundPipeline = new OutboundChannelInitializer();
        this.onClose = MonoProcessor.create();
        // Not bound yet: no emulator, address or server.
        this.networkEmulator = null;
        this.address = null;
        this.server = null;
    }

    /**
     * Creates the bound counterpart of an existing transport. Bind-specific state comes from the
     * arguments; everything else is shared (same instances) with {@code transportImpl}.
     *
     * @param address address the server is bound to; must not be {@code null}
     * @param disposableServer bound server handle; must not be {@code null}
     * @param networkEmulator emulator used on the send path; must not be {@code null}
     * @param transportImpl transport whose shared state is reused
     * @throws NullPointerException if any of the first three arguments is {@code null}
     */
    private TransportImpl(Address address, DisposableServer disposableServer, NetworkEmulator networkEmulator, TransportImpl transportImpl) {
        // Bind-specific state.
        this.address = Objects.requireNonNull(address);
        this.server = Objects.requireNonNull(disposableServer);
        this.networkEmulator = Objects.requireNonNull(networkEmulator);

        // State shared with the originating transport.
        this.config = transportImpl.config;
        this.loopResources = transportImpl.loopResources;
        this.messagesSubject = transportImpl.messagesSubject;
        this.messageSink = transportImpl.messageSink;
        this.connections = transportImpl.connections;
        this.exceptionHandler = transportImpl.exceptionHandler;
        this.inboundPipeline = transportImpl.inboundPipeline;
        this.outboundPipeline = transportImpl.outboundPipeline;
        this.onClose = transportImpl.onClose;
    }

    /**
     * Binds the TCP server on the configured port and, on success, produces the bound transport.
     * The outcome of the bind attempt is logged either way.
     *
     * @return a mono emitting the bound transport, or failing with the bind error
     */
    public Mono<Transport> bind0() {
        return newTcpServer()
            .handle(this::onMessage)
            .bind()
            .doOnSuccess(bound ->
                LOGGER.info("Bound cluster transport on {}:{}", bound.host(), bound.port()))
            .doOnError(cause ->
                LOGGER.error("Failed to bind cluster transport on port={}, cause: {}", config.getPort(), cause))
            .map(this::onBind);
    }

    /**
     * Returns the address this transport is bound to.
     *
     * @return the bound address; {@code null} on an instance that has not been bound
     */
    @Override
    public Address address() {
        return address;
    }

    /**
     * Tells whether this transport has been stopped.
     *
     * @return {@code true} once the {@code onClose} processor reports itself as disposed
     */
    @Override
    public boolean isStopped() {
        return onClose.isDisposed();
    }

    /**
     * Returns the network emulator attached to this transport.
     *
     * @return the emulator; {@code null} on an instance that has not been bound
     */
    @Override
    public NetworkEmulator networkEmulator() {
        return networkEmulator;
    }

    /**
     * Stops the transport. On subscription, unless already stopped, it completes the message
     * sink, closes the server, triggers closing of all connections, disposes the loop resources
     * and finally completes {@code onClose}.
     *
     * @return the {@code onClose} mono, which completes when the shutdown sequence has finished
     */
    @Override
    public final Mono<Void> stop() {
        return Mono.defer(() -> {
            if (!onClose.isDisposed()) {
                // No more messages for listeners.
                messageSink.complete();
                // The shutdown runs as its own subscription; callers only observe onClose.
                closeServer()
                    .then(closeConnections())
                    .doOnTerminate(loopResources::dispose)
                    .doOnTerminate(onClose::onComplete)
                    .subscribe();
            }
            return onClose;
        });
    }

    /**
     * Returns the stream of messages received by this transport.
     *
     * @return the message processor wrapped with {@code onBackpressureBuffer()}
     */
    @Override
    public final Flux<Message> listen() {
        Flux<Message> buffered = messagesSubject.onBackpressureBuffer();
        return buffered;
    }

    /**
     * Sends a message to the given address, connecting first if no connection is cached.
     * Failures are logged at debug level and propagated to the subscriber.
     *
     * @param address destination address
     * @param message message to send
     * @return a mono completing when the send has finished, or failing with the send error
     */
    @Override
    public Mono<Void> send(Address address, Message message) {
        Mono<Void> delivery =
            getOrConnect(address)
                .flatMap(connection -> send0(connection, message, address))
                .then();
        return delivery.doOnError(cause ->
            LOGGER.debug("Failed to send {} to {}, cause: {}", message, address, cause));
    }

    /**
     * Server-side connection handler: decodes every received frame into a {@link Message} and
     * publishes it to the message sink.
     *
     * @param nettyInbound inbound side of the accepted connection
     * @param nettyOutbound outbound side of the accepted connection (unused)
     * @return a mono that terminates when the inbound stream terminates
     */
    private Mono<Void> onMessage(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
        return nettyInbound
            .receive()
            // Each buffer is retained here; toMessage() wraps it in a release-on-close stream.
            .retain()
            .map(this::toMessage)
            .doOnNext(messageSink::next)
            .then();
    }

    /**
     * Deserializes a message from the given buffer. The buffer is released when the wrapping
     * stream is closed, which happens on success and on failure alike.
     *
     * @param byteBuf buffer holding one serialized message
     * @return the decoded message
     * @throws DecoderException if deserialization or closing the stream fails
     */
    private Message toMessage(ByteBuf byteBuf) {
        // Second constructor argument = releaseOnClose: closing the stream releases byteBuf.
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true)) {
            return MessageCodec.deserialize(stream);
        } catch (Exception e) {
            throw new DecoderException(e);
        }
    }

    /**
     * Builds the bound transport for a successfully bound server.
     *
     * @param disposableServer the bound server
     * @return a transport sharing this instance's state, with address, server and emulator set
     */
    private TransportImpl onBind(DisposableServer disposableServer) {
        InetSocketAddress socketAddress = disposableServer.address();
        Address boundAddress = Address.create(socketAddress.getHostString(), socketAddress.getPort());
        NetworkEmulator emulator = new NetworkEmulator(boundAddress, config.isUseNetworkEmulator());
        return new TransportImpl(boundAddress, disposableServer, emulator, this);
    }

    /**
     * Writes one message to an established connection. Before serialization the message passes
     * through the network emulator's {@code tryFail} and {@code tryDelay} steps.
     *
     * @param connection connection to write to
     * @param message message to send; its sender must be set
     * @param address destination address, handed to the network emulator
     * @return a mono completing when the outbound send completes
     * @throws NullPointerException if {@code message.sender()} is {@code null}
     */
    private Mono<? extends Void> send0(Connection connection, Message message, Address address) {
        Objects.requireNonNull(message.sender(), "sender must be not null");

        NettyOutbound outbound =
            connection.outbound().options(sendOptions -> sendOptions.flushOnEach());

        Mono<ByteBuf> payload =
            Mono.just(message)
                .flatMap(candidate -> networkEmulator.tryFail(candidate, address))
                .flatMap(candidate -> networkEmulator.tryDelay(candidate, address))
                .map(this::toByteBuf);

        return outbound.send(payload).then();
    }

    /**
     * Serializes a message into a newly allocated buffer.
     *
     * @param message message to serialize
     * @return the buffer holding the serialized message; ownership passes to the caller
     * @throws EncoderException if serialization fails (the buffer is released first)
     */
    private ByteBuf toByteBuf(Message message) {
        final ByteBuf payload = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream target = new ByteBufOutputStream(payload);
            MessageCodec.serialize(message, target);
        } catch (Exception e) {
            // Nothing will consume the buffer after a failure, so give it back here.
            payload.release();
            throw new EncoderException(e);
        }
        return payload;
    }

    /**
     * Returns the connection to the given address, starting a connect operation if none is
     * cached for that address yet.
     *
     * @param address remote address
     * @return a mono emitting the connection, or failing with the connect error
     */
    private Mono<Connection> getOrConnect(Address address) {
        return Mono.create(sink ->
            // computeIfAbsent makes sure a single cached connect mono exists per address;
            // its outcome is forwarded to this call's sink.
            connections
                .computeIfAbsent(address, this::connect0)
                .subscribe(sink::success, sink::error));
    }

    /**
     * Creates the cached connect operation for an address. The address is evicted from the
     * connection map when the connection is closed or when connecting fails.
     *
     * @param address remote address to connect to
     * @return a cached mono emitting the established connection
     */
    private Mono<? extends Connection> connect0(Address address) {
        TcpClient client =
            newTcpClient(address)
                .doOnDisconnected(closed -> {
                    LOGGER.debug("Disconnected from: {} {}", address, closed.channel());
                    connections.remove(address);
                })
                .doOnConnected(opened ->
                    LOGGER.debug("Connected to {}: {}", address, opened.channel()));

        return client
            .connect()
            .doOnError(cause -> {
                LOGGER.warn("Failed to connect to remote address {}, cause: {}", address, cause);
                connections.remove(address);
            })
            .cache();
    }

    /**
     * Disposes the bound server, if there is one. A failure while closing is logged and
     * suppressed.
     *
     * @return a mono completing when the server is disposed; empty when no server is bound
     */
    private Mono<Void> closeServer() {
        return Mono.defer(() -> {
            if (server == null) {
                // Never bound: nothing to close.
                return Mono.empty();
            }
            server.dispose();
            return server
                .onDispose()
                .doOnError(cause -> LOGGER.warn("Failed to close server: " + cause))
                .onErrorResume(cause -> Mono.empty());
        });
    }

    /**
     * Triggers disposal of every cached connection. Each connection is closed through its own
     * subscription, so the returned mono does not wait for the connections to finish closing.
     * A failure is logged per connection.
     *
     * @return a mono completing once disposal has been triggered for all cached connections
     */
    private Mono<Void> closeConnections() {
        return Mono.fromRunnable(() -> {
            for (Mono<? extends Connection> pending : connections.values()) {
                pending
                    .doOnNext(connection -> connection.dispose())
                    .flatMap(connection -> connection.onDispose())
                    // No value consumer is needed; only failures are reported.
                    .subscribe(null, cause -> LOGGER.warn("Failed to close connection: " + cause));
            }
        });
    }

    /**
     * Builds the TCP server definition: shared loop resources, socket options, the configured
     * port, and the inbound pipeline initializer registered as "inbound".
     *
     * @return the configured, not yet bound, TCP server
     */
    private TcpServer newTcpServer() {
        return TcpServer.create()
            .runOn(loopResources)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .addressSupplier(() -> new InetSocketAddress(config.getPort()))
            .bootstrap(serverBootstrap ->
                BootstrapHandlers.updateConfiguration(serverBootstrap, "inbound", inboundPipeline));
    }

    /**
     * Builds a TCP client for the given address: a new connection per connect, shared loop
     * resources, socket options, the configured connect timeout, and the outbound pipeline
     * initializer registered as "outbound".
     *
     * @param address remote address the client will connect to
     * @return the configured, not yet connected, TCP client
     */
    private TcpClient newTcpClient(Address address) {
        return TcpClient.create(ConnectionProvider.newConnection())
            .runOn(loopResources)
            .host(address.host())
            .port(address.port())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())
            .bootstrap(clientBootstrap ->
                BootstrapHandlers.updateConfiguration(clientBootstrap, "outbound", outboundPipeline));
    }
}
