package io.servicetalk.tcp.netty.internal;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCounted;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelSet;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import io.servicetalk.transport.netty.internal.NettyServerContext;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/servicetalk/tcp/netty/internal/TcpServerBinder.class */
/**
 * Static utility that binds a Netty {@link ServerBootstrap} to a local address and exposes the
 * outcome of the bind as a {@link Single} of {@link ServerContext}.
 *
 * <p>The class is not instantiable; all functionality is offered through {@link #bind}.
 */
public final class TcpServerBinder {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpServerBinder.class);

    private TcpServerBinder() {
        // No instances.
    }

    /**
     * Configures a {@link ServerBootstrap} and binds it to {@code socketAddress}.
     *
     * <p>Note that {@link ServerBootstrap#bind(SocketAddress)} is invoked as part of this call, i.e. before the
     * returned {@link Single} is subscribed to. Subscribing only attaches a listener to the resulting
     * {@link ChannelFuture}; cancelling the subscription calls {@link ChannelFuture#cancel(boolean)} on it.
     *
     * <p>For every accepted child channel {@code biFunction} is applied to the channel and to the observer obtained
     * from {@code readOnlyTcpServerConfig.transportObserver().onNewConnection()}. If a {@code connectionAcceptor} is
     * supplied, its {@link ConnectionAcceptor#accept(ConnectionContext)} result is concatenated in front of the
     * connection before it is handed to {@code consumer}. If that chain fails, the child channel is closed via
     * {@link ChannelCloseUtils#close(Channel, Throwable)}.
     *
     * @param socketAddress the address to listen on; converted with {@link BuilderUtils#toNettyAddress}.
     * @param readOnlyTcpServerConfig the server configuration (channel options, backlog, transport observer).
     * @param z the value applied to {@link ChannelOption#AUTO_READ} on accepted (child) channels.
     * @param executionContext supplies the I/O executor whose event loop group drives the server, and the executor
     * used for the {@link ChannelSet} and for subscribing to the {@code connectionAcceptor}.
     * @param connectionAcceptor optional filter applied to each new connection, or {@code null} for none.
     * @param biFunction creates the connection for an accepted {@link Channel}.
     * @param consumer receives each successfully created (and, if applicable, accepted) connection.
     * @param <T> the type of {@link ConnectionContext} produced for each accepted channel.
     * @return a {@link Single} that succeeds with the {@link ServerContext} once the bind completes without a cause,
     * or fails with the bind failure cause.
     * @throws NullPointerException if {@code biFunction} or {@code consumer} is {@code null}.
     * @throws IllegalStateException if the I/O executor exposes no event loop group.
     */
    public static <T extends ConnectionContext> Single<ServerContext> bind(SocketAddress socketAddress, final ReadOnlyTcpServerConfig readOnlyTcpServerConfig, boolean z, final ExecutionContext executionContext, @Nullable final ConnectionAcceptor connectionAcceptor, final BiFunction<Channel, ConnectionObserver, Single<T>> biFunction, final Consumer<T> consumer) {
        Objects.requireNonNull(biFunction, "biFunction");
        Objects.requireNonNull(consumer, "consumer");
        final SocketAddress nettyAddress = BuilderUtils.toNettyAddress(socketAddress);
        final EventLoopAwareNettyIoExecutor ioExecutor =
                EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor());
        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        configure(readOnlyTcpServerConfig, z, serverBootstrap, ioExecutor.eventLoopGroup(), nettyAddress.getClass());

        final ChannelSet channelSet = new ChannelSet(executionContext.executor());

        // Handler for the listening (parent) channel: records every accepted child channel.
        serverBootstrap.handler(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                // Only Channel objects are expected here; release anything reference counted before failing.
                if (obj instanceof ReferenceCounted) {
                    try {
                        throw new IllegalArgumentException("Unexpected ReferenceCounted msg in 'accept' pipeline: " + obj);
                    } finally {
                        ((ReferenceCounted) obj).release();
                    }
                }
                if (obj instanceof Channel && !channelSet.addIfAbsent((Channel) obj)) {
                    LOGGER.warn("Channel ({}) not added to ChannelSet", obj);
                }
                channelHandlerContext.fireChannelRead(obj);
            }
        });

        // Initializer for each accepted (child) channel: build the connection and hand it to the consumer.
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) {
                Single<T> connection =
                        biFunction.apply(channel, readOnlyTcpServerConfig.transportObserver().onNewConnection());
                if (connectionAcceptor != null) {
                    // The acceptor is invoked lazily (defer) and subscribed via executionContext.executor();
                    // only when it completes is the connection itself emitted.
                    connection = connection.flatMap(context ->
                            Single.<T>defer(() -> connectionAcceptor.accept(context).concat(Single.succeeded(context)))
                                    .subscribeOn(executionContext.executor()));
                }
                connection.beforeOnError(cause -> {
                    // Resolve the remote address only when it is actually going to be logged.
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Failed to create a connection for remote address {}", channel.remoteAddress(), cause);
                    }
                    ChannelCloseUtils.close(channel, cause);
                }).subscribe(consumer);
            }
        });

        final ChannelFuture bindFuture = serverBootstrap.bind(nettyAddress);
        return new SubscribableSingle<ServerContext>() {
            @Override
            protected void handleSubscribe(SingleSource.Subscriber<? super ServerContext> subscriber) {
                subscriber.onSubscribe(() -> bindFuture.cancel(true));
                bindFuture.addListener((ChannelFuture future) -> {
                    final Channel channel = future.channel();
                    final Throwable cause = future.cause();
                    if (cause == null) {
                        subscriber.onSuccess(
                                NettyServerContext.wrap(channel, channelSet, connectionAcceptor, executionContext));
                    } else {
                        ChannelCloseUtils.close(channel, cause);
                        subscriber.onError(cause);
                    }
                });
            }
        };
    }

    /**
     * Applies the event loop group, channel type, channel options and allocator to {@code serverBootstrap}.
     *
     * @param readOnlyTcpServerConfig source of the child channel options and the accept backlog.
     * @param z the value applied to {@link ChannelOption#AUTO_READ} on child channels.
     * @param serverBootstrap the bootstrap to configure.
     * @param eventLoopGroup the event loop group to run on; must not be {@code null}.
     * @param cls the concrete address type, used to select the server channel implementation.
     * @throws IllegalStateException if {@code eventLoopGroup} is {@code null}.
     */
    private static void configure(ReadOnlyTcpServerConfig readOnlyTcpServerConfig, boolean z, ServerBootstrap serverBootstrap, @Nullable EventLoopGroup eventLoopGroup, Class<? extends SocketAddress> cls) {
        if (eventLoopGroup == null) {
            throw new IllegalStateException("IoExecutor must be specified before building");
        }
        serverBootstrap.group(eventLoopGroup);
        serverBootstrap.channel(BuilderUtils.serverChannel(eventLoopGroup, cls));

        // User supplied options are applied to accepted (child) channels.
        for (@SuppressWarnings("rawtypes") Map.Entry<ChannelOption, Object> entry
                : readOnlyTcpServerConfig.options().entrySet()) {
            // The config stores raw ChannelOption keys; the value type is not checked here.
            @SuppressWarnings("unchecked")
            final ChannelOption<Object> option = entry.getKey();
            serverBootstrap.childOption(option, entry.getValue());
        }

        // AUTO_READ is set after the user options, so the explicit flag overrides any configured value.
        serverBootstrap.childOption(ChannelOption.AUTO_READ, z);
        serverBootstrap.option(ChannelOption.SO_BACKLOG, readOnlyTcpServerConfig.backlog());

        // Both the listening channel and the accepted channels use the same pooled allocator.
        final PooledByteBufAllocator allocator = CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR;
        serverBootstrap.option(ChannelOption.ALLOCATOR, allocator);
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
    }
}
