package org.onlab.netty;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.netty.InternalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onlab/netty/NettyMessagingService.class */
public class NettyMessagingService implements MessagingService {
    // Logger for this service; assigned in the constructors via LoggerFactory.getLogger(getClass()).
    private final Logger log;
    // Endpoint describing this node: stamped as the sender on outgoing messages, and its port is the one
    // the server side binds to in startAcceptingConnections().
    private final Endpoint localEp;
    // Registered message handlers, keyed by the string given to registerHandler() and looked up by the
    // type of each incoming non-reply message.
    private final ConcurrentMap<String, MessageHandler> handlers;
    // Source of ids for outgoing messages (incrementAndGet() per message).
    private final AtomicLong messageIdGenerator;
    // Futures of requests sent with sendAndReceive() that are still awaiting a reply, keyed by message id.
    private final Cache<Long, SettableFuture<byte[]>> responseFutures;
    // Pool of outbound channels keyed by remote endpoint; channels are borrowed per send.
    private final GenericKeyedObjectPool<Endpoint, Channel> channels;
    // Event loop groups, created in initEventLoopGroup(). serverGroup is passed as the first (parent) group
    // to the ServerBootstrap; clientGroup is passed as its second (child) group and is also used for
    // outbound connections.
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    // Channel implementations chosen together with the event loop groups (epoll or NIO).
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/onlab/netty/NettyMessagingService$InboundMessageDispatcher.class */
    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
        private InboundMessageDispatcher() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, InternalMessage internalMessage) throws Exception {
            String type = internalMessage.type();
            if (!type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
                MessageHandler messageHandler = NettyMessagingService.this.getMessageHandler(type);
                if (messageHandler != null) {
                    messageHandler.handle(internalMessage);
                    return;
                } else {
                    NettyMessagingService.this.log.debug("No handler registered for {}", type);
                    return;
                }
            }
            try {
                SettableFuture settableFuture = (SettableFuture) NettyMessagingService.this.responseFutures.getIfPresent(Long.valueOf(internalMessage.id()));
                if (settableFuture != null) {
                    settableFuture.set(internalMessage.payload());
                } else {
                    NettyMessagingService.this.log.warn("Received a reply for message id:[{}].  from {}. But was unable to locate the request handle", Long.valueOf(internalMessage.id()), internalMessage.sender());
                }
            } finally {
                NettyMessagingService.this.responseFutures.invalidate(Long.valueOf(internalMessage.id()));
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyMessagingService.this.log.error("Exception inside channel handling pipeline.", th);
            channelHandlerContext.close();
        }
    }

    /* loaded from: input_file:org/onlab/netty/NettyMessagingService$OnosCommunicationChannelFactory.class */
    private class OnosCommunicationChannelFactory implements KeyedPoolableObjectFactory<Endpoint, Channel> {
        private OnosCommunicationChannelFactory() {
        }

        public void activateObject(Endpoint endpoint, Channel channel) throws Exception {
        }

        public void destroyObject(Endpoint endpoint, Channel channel) throws Exception {
            channel.close();
        }

        public Channel makeObject(Endpoint endpoint) throws Exception {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32768);
            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8192);
            bootstrap.group(NettyMessagingService.this.clientGroup);
            bootstrap.channel(NettyMessagingService.this.clientChannelClass);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new OnosCommunicationChannelInitializer());
            return bootstrap.connect(endpoint.host(), endpoint.port()).sync().channel();
        }

        public void passivateObject(Endpoint endpoint, Channel channel) throws Exception {
        }

        public boolean validateObject(Endpoint endpoint, Channel channel) {
            return channel.isOpen();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onlab/netty/NettyMessagingService$OnosCommunicationChannelInitializer.class */
    public class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private OnosCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast("encoder", this.encoder).addLast("decoder", new MessageDecoder(NettyMessagingService.this)).addLast("handler", this.dispatcher);
        }
    }

    /**
     * Runnable that writes and flushes a single message on a channel; the channel is closed if the
     * write fails.
     */
    public static class WriteTask implements Runnable {
        private final Channel channel;
        private final InternalMessage message;

        /**
         * @param channel channel to write to
         * @param internalMessage message to write
         */
        public WriteTask(Channel channel, InternalMessage internalMessage) {
            this.message = internalMessage;
            this.channel = channel;
        }

        @Override
        public void run() {
            channel.writeAndFlush(message)
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }

    /**
     * Chooses the transport: tries the native epoll implementation first and, if anything at all
     * goes wrong while setting it up, logs a warning and falls back to NIO.
     */
    private void initEventLoopGroup() {
        try {
            useEpollTransport();
        } catch (Throwable th) {
            this.log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", th.getMessage());
            useNioTransport();
        }
    }

    /** Creates epoll event loop groups and selects the epoll channel classes. */
    private void useEpollTransport() {
        this.clientGroup = new EpollEventLoopGroup();
        this.serverGroup = new EpollEventLoopGroup();
        this.serverChannelClass = EpollServerSocketChannel.class;
        this.clientChannelClass = EpollSocketChannel.class;
    }

    /** Creates NIO event loop groups and selects the NIO channel classes. */
    private void useNioTransport() {
        this.clientGroup = new NioEventLoopGroup();
        this.serverGroup = new NioEventLoopGroup();
        this.serverChannelClass = NioServerSocketChannel.class;
        this.clientChannelClass = NioSocketChannel.class;
    }

    /**
     * Creates a messaging service that identifies itself with the given host and port.
     *
     * <p>Pending replies are tracked in a cache limited to 100000 entries whose entries expire
     * 10 seconds after being written. Whenever an entry is removed from that cache its future is
     * failed with a {@link TimeoutException}; this has no effect on a future that was already
     * completed with a reply.
     *
     * @param str host name of the local endpoint
     * @param i port of the local endpoint; also the port the service listens on once activated
     */
    public NettyMessagingService(String str, int i) {
        this.log = LoggerFactory.getLogger(getClass());
        this.handlers = new ConcurrentHashMap<>();
        this.messageIdGenerator = new AtomicLong(0L);
        this.responseFutures = CacheBuilder.newBuilder()
                .maximumSize(100000L)
                .expireAfterWrite(10L, TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
                    @Override
                    public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> removalNotification) {
                        removalNotification.getValue().setException(new TimeoutException("Timedout waiting for reply"));
                    }
                })
                .build();
        this.channels = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
        this.localEp = new Endpoint(str, i);
    }

    /**
     * Creates a messaging service on port 8080 using the local host name.
     */
    public NettyMessagingService() {
        this(8080);
    }

    /**
     * Creates a messaging service on the given port using the local host name.
     *
     * @param i port of the local endpoint
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the local host name
     *         cannot be resolved
     */
    public NettyMessagingService(int i) {
        // Delegate so the cache/pool/map setup lives in exactly one constructor.
        this(resolveLocalHostName(), i);
    }

    /** Returns the local host name, converting a resolution failure into a RuntimeException. */
    private static String resolveLocalHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the service: enables validation of pooled channels on both borrow and return, sets up
     * the event loop groups and begins accepting inbound connections.
     *
     * @throws Exception if the server socket cannot be bound
     */
    public void activate() throws Exception {
        // Validate pooled channels in both directions so closed ones are not reused.
        channels.setTestOnReturn(true);
        channels.setTestOnBorrow(true);

        // The event loop groups must exist before the server bootstrap uses them.
        initEventLoopGroup();
        startAcceptingConnections();
    }

    /**
     * Stops the service: closes the channel pool and requests a graceful shutdown of both event
     * loop groups.
     *
     * <p>The event loop groups are shut down even when closing the pool fails, and groups that
     * were never created (service not activated) are skipped.
     *
     * @throws Exception if closing the channel pool fails
     */
    public void deactivate() throws Exception {
        try {
            this.channels.close();
        } finally {
            // Always release the event loop threads, even if the pool could not be closed cleanly.
            if (this.serverGroup != null) {
                this.serverGroup.shutdownGracefully();
            }
            if (this.clientGroup != null) {
                this.clientGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Returns the endpoint this service identifies itself with.
     *
     * @return the local endpoint
     */
    public Endpoint localEp() {
        return localEp;
    }

    /**
     * Wraps the payload in a new message with a fresh id, the local endpoint as sender and the
     * given type, and sends it without registering for a reply.
     *
     * @param endpoint destination
     * @param str message type
     * @param bArr message payload
     * @throws IOException if the message cannot be handed to a channel
     */
    @Override
    public void sendAsync(Endpoint endpoint, String str, byte[] bArr) throws IOException {
        InternalMessage outbound = new InternalMessage.Builder(this)
                .withId(this.messageIdGenerator.incrementAndGet())
                .withSender(this.localEp)
                .withType(str)
                .withPayload(bArr)
                .build();
        sendAsync(endpoint, outbound);
    }

    /**
     * Borrows a channel to the endpoint from the pool, schedules the write of the message on that
     * channel's event loop and returns the channel to the pool.
     *
     * <p>The channel is returned exactly once, and only if one was actually borrowed, so a failed
     * borrow never hands {@code null} back to the pool.
     *
     * @param endpoint destination
     * @param internalMessage fully built message to send
     * @throws IOException if borrowing, scheduling or returning the channel fails; failures that
     *         are not already an {@code IOException} are wrapped in one
     */
    public void sendAsync(Endpoint endpoint, InternalMessage internalMessage) throws IOException {
        Channel channel = null;
        try {
            try {
                channel = this.channels.borrowObject(endpoint);
                channel.eventLoop().execute(new WriteTask(channel, internalMessage));
            } finally {
                // Nothing to give back when borrowObject itself failed.
                if (channel != null) {
                    this.channels.returnObject(endpoint, channel);
                }
            }
        } catch (IOException e) {
            throw e;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread before translating the failure.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Sends a request and returns a future for its reply.
     *
     * <p>The future is stored under the new message id before the message is sent. If sending
     * fails, the stored entry is invalidated and the failure is rethrown.
     *
     * @param endpoint destination
     * @param str message type
     * @param bArr request payload
     * @return future for the reply payload
     * @throws IOException if the request cannot be sent
     */
    @Override
    public ListenableFuture<byte[]> sendAndReceive(Endpoint endpoint, String str, byte[] bArr) throws IOException {
        final SettableFuture<byte[]> reply = SettableFuture.create();
        final long requestId = this.messageIdGenerator.incrementAndGet();
        this.responseFutures.put(requestId, reply);
        try {
            InternalMessage request = new InternalMessage.Builder(this)
                    .withId(requestId)
                    .withSender(this.localEp)
                    .withType(str)
                    .withPayload(bArr)
                    .build();
            sendAsync(endpoint, request);
        } catch (Exception e) {
            this.responseFutures.invalidate(requestId);
            throw e;
        }
        return reply;
    }

    /**
     * Registers a handler for a message type. If a handler is already registered for that type,
     * the existing one is kept and this call has no effect.
     *
     * @param str message type
     * @param messageHandler handler to invoke for messages of that type
     */
    @Override
    public void registerHandler(String str, MessageHandler messageHandler) {
        handlers.putIfAbsent(str, messageHandler);
    }

    /**
     * Removes the handler registered for a message type, if any.
     *
     * @param str message type
     */
    @Override
    public void unregisterHandler(String str) {
        handlers.remove(str);
    }

    /**
     * Looks up the handler registered for a message type.
     *
     * @param str message type
     * @return the registered handler, or {@code null} if there is none
     */
    public MessageHandler getMessageHandler(String str) {
        MessageHandler registered = handlers.get(str);
        return registered;
    }

    /**
     * Configures the server bootstrap and binds it to the local endpoint's port, blocking until
     * the bind has completed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    private void startAcceptingConnections() throws InterruptedException {
        ServerBootstrap acceptor = new ServerBootstrap()
                .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32768)
                .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8192)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .group(this.serverGroup, this.clientGroup)
                .channel(this.serverChannelClass)
                .childHandler(new OnosCommunicationChannelInitializer())
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        acceptor.bind(this.localEp.port()).sync();
    }
}
