package com.github.quantranuk.protobuf.nio.impl;

import com.github.quantranuk.protobuf.nio.ProtoSerializer;
import com.github.quantranuk.protobuf.nio.ProtoServerSocketChannel;
import com.github.quantranuk.protobuf.nio.ProtoSocketChannel;
import com.github.quantranuk.protobuf.nio.handlers.ConnectionHandler;
import com.github.quantranuk.protobuf.nio.handlers.DisconnectionHandler;
import com.github.quantranuk.protobuf.nio.handlers.MessageReceivedHandler;
import com.github.quantranuk.protobuf.nio.handlers.MessageSendFailureHandler;
import com.github.quantranuk.protobuf.nio.handlers.MessageSentHandler;
import com.github.quantranuk.protobuf.nio.utils.NamedThreadFactory;
import com.google.protobuf.Message;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/quantranuk/protobuf/nio/impl/AsyncProtoServerSocketChannel.class */
/**
 * Server endpoint that accepts TCP connections through an
 * {@link AsynchronousServerSocketChannel} and wraps every accepted connection in an
 * {@link AsyncProtoSocketChannel}.
 *
 * <p>Handlers registered on this server are invoked for the events of every accepted
 * connection. Expected lifecycle: configure through the setters, {@link #init()},
 * {@link #start()}, and finally {@link #stop()}.
 *
 * <p>The buffer sizes, timeouts and serializer are read each time a connection is accepted,
 * so changing them only affects connections accepted afterwards.
 */
public class AsyncProtoServerSocketChannel implements ProtoServerSocketChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProtoServerSocketChannel.class);
    private static final int DEFAULT_BUFFER_SIZE = 8192;
    private static final long DEFAULT_WRITE_TIMEOUT_MILLIS = 10000;

    private final SocketAddress serverSocketAddress;
    private final int serverPort;
    private final List<ConnectionHandler> connectionHandlers = new CopyOnWriteArrayList<>();
    private final List<DisconnectionHandler> disconnectionHandlers = new CopyOnWriteArrayList<>();
    private final List<MessageReceivedHandler> messageReceivedHandlers = new CopyOnWriteArrayList<>();
    private final List<MessageSentHandler> messageSentHandlers = new CopyOnWriteArrayList<>();
    private final List<MessageSendFailureHandler> messageSendFailureHandlers = new CopyOnWriteArrayList<>();
    /** Currently connected clients, keyed by their remote address. */
    private final Map<SocketAddress, ProtoSocketChannel> socketChannels = new ConcurrentHashMap<>();
    private boolean isInitialized = false;
    private int readBufferSize = DEFAULT_BUFFER_SIZE;
    private int writeBufferSize = DEFAULT_BUFFER_SIZE;
    // NOTE(review): 0 presumably means "no read timeout" - confirm against AsyncProtoSocketChannel.
    private long readTimeoutMillis = 0;
    private long writeTimeoutMillis = DEFAULT_WRITE_TIMEOUT_MILLIS;
    private AsynchronousServerSocketChannel serverSocketChannel;
    private ExecutorService acceptExecutor;
    private ExecutorService readExecutor;
    private ExecutorService writeExecutor;
    private ProtoSerializer serializer;

    /**
     * Creates a server that will listen on the wildcard address at the given port.
     * Nothing is opened or bound until {@link #init()} and {@link #start()} are called.
     *
     * @param serverPort the TCP port to listen on
     * @throws IllegalArgumentException if the port is rejected by {@link InetSocketAddress}
     */
    public AsyncProtoServerSocketChannel(int serverPort) {
        this.serverPort = serverPort;
        this.serverSocketAddress = new InetSocketAddress(serverPort);
    }

    /**
     * Creates the accept/read/write executors and opens (but does not bind) the server socket
     * channel. Calling this method again after a successful initialization is a no-op.
     *
     * @throws IllegalStateException if the server socket channel cannot be opened; in that case
     *         the executors created here are shut down and a later call may retry
     */
    @PostConstruct
    public void init() {
        if (this.isInitialized) {
            return;
        }
        String namePrefix = AsyncProtoServerSocketChannel.class.getSimpleName();
        ExecutorService acceptor = Executors.newSingleThreadExecutor(new NamedThreadFactory(namePrefix + "-Acceptor-" + this.serverPort));
        ExecutorService reader = Executors.newSingleThreadExecutor(new NamedThreadFactory(namePrefix + "-Reader-" + this.serverPort));
        ExecutorService writer = Executors.newSingleThreadExecutor(new NamedThreadFactory(namePrefix + "-Writer-" + this.serverPort));
        try {
            this.serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(acceptor));
        } catch (IOException e) {
            // Do not leak the freshly created executors when the channel cannot be opened.
            acceptor.shutdown();
            reader.shutdown();
            writer.shutdown();
            throw new IllegalStateException("Unable to open server socket channel", e);
        }
        this.acceptExecutor = acceptor;
        this.readExecutor = reader;
        this.writeExecutor = writer;
        // Only mark as initialized once everything succeeded, so a failed attempt can be retried.
        this.isInitialized = true;
    }

    /**
     * Binds the server socket to the configured port and starts accepting connections.
     * Initializes the server first if {@link #init()} has not been called yet.
     *
     * @throws IOException if the server socket cannot be bound
     * @throws IllegalStateException if the lazy initialization fails
     */
    @Override
    public void start() throws IOException {
        init();
        this.serverSocketChannel.bind(this.serverSocketAddress);
        LOGGER.info("Bind to port {}", this.serverPort);
        this.acceptExecutor.execute(this::acceptNewConnection);
    }

    /**
     * Issues one asynchronous accept. The completion handler re-arms the accept after every
     * outcome for as long as the server socket channel is open.
     */
    private void acceptNewConnection() {
        if (!this.serverSocketChannel.isOpen()) {
            return;
        }
        this.serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel acceptedChannel, Object attachment) {
                try {
                    registerAcceptedConnection(acceptedChannel);
                } finally {
                    // Always re-arm: a failure while setting up one client must not stop the server.
                    acceptNewConnection();
                }
            }

            @Override
            public void failed(Throwable cause, Object attachment) {
                if (!serverSocketChannel.isOpen()) {
                    // The pending accept was cancelled because the server socket was closed.
                    LOGGER.debug("Stopped accepting connections at port {}", serverPort, cause);
                    return;
                }
                LOGGER.error("Unable to accept new connection at port {}", serverPort, cause);
                acceptNewConnection();
            }
        });
    }

    /**
     * Wraps a freshly accepted socket, registers it, notifies the connection handlers and starts
     * reading from it.
     *
     * <p>The connection is registered before the handlers run so that a handler may already send
     * to the new client. If any step throws, the failure is logged, the registration is undone
     * and the accepted socket is closed.
     *
     * @param acceptedChannel the socket channel returned by the accept operation
     */
    private void registerAcceptedConnection(AsynchronousSocketChannel acceptedChannel) {
        SocketAddress registeredAddress = null;
        try {
            final SocketAddress remoteAddress = getRemoteAddress(acceptedChannel);
            LOGGER.info("Accepted connection from {}", remoteAddress);
            AsyncProtoSocketChannel protoChannel = createProtobufSocketChannel(acceptedChannel, remoteAddress);
            this.socketChannels.put(remoteAddress, protoChannel);
            registeredAddress = remoteAddress;
            this.connectionHandlers.forEach(handler -> handler.onConnected(remoteAddress));
            protoChannel.startReading();
        } catch (RuntimeException e) {
            LOGGER.error("Unable to set up accepted connection at port {}", this.serverPort, e);
            if (registeredAddress != null) {
                this.socketChannels.remove(registeredAddress);
            }
            closeQuietly(acceptedChannel);
        }
    }

    /** Closes a socket whose setup failed, logging instead of propagating any I/O error. */
    private void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.warn("Unable to close socket channel accepted at port {}", this.serverPort, e);
        }
    }

    /**
     * Builds the per-connection channel: copies this server's configuration into it and wires
     * its events to the handlers registered on this server.
     *
     * @param socketChannel the accepted socket
     * @param remoteAddress the remote address of the accepted socket
     * @return the configured and initialized channel; reading has not been started yet
     */
    private AsyncProtoSocketChannel createProtobufSocketChannel(AsynchronousSocketChannel socketChannel, SocketAddress remoteAddress) {
        AsyncProtoSocketChannel protoChannel = new AsyncProtoSocketChannel(remoteAddress);
        protoChannel.setReadBufferSize(this.readBufferSize);
        protoChannel.setWriteBufferSize(this.writeBufferSize);
        protoChannel.setReadExecutor(this.readExecutor);
        protoChannel.setWriteExecutor(this.writeExecutor);
        protoChannel.setReadTimeoutMillis(this.readTimeoutMillis);
        protoChannel.setWriteTimeoutMillis(this.writeTimeoutMillis);
        protoChannel.setSocketChannel(socketChannel);
        protoChannel.addDisconnectionHandler(address -> {
            LOGGER.info("Disconnected from {}", address);
            // Unregister first so handlers observe the client as no longer connected.
            this.socketChannels.remove(address);
            this.disconnectionHandlers.forEach(handler -> handler.onDisconnected(address));
        });
        protoChannel.addMessageReceivedHandler((address, message) ->
                this.messageReceivedHandlers.forEach(handler -> handler.onMessageReceived(address, message)));
        protoChannel.addMessageSentHandler((address, message) ->
                this.messageSentHandlers.forEach(handler -> handler.onMessageSent(address, message)));
        protoChannel.addMessageSendFailureHandler((address, message, cause) ->
                this.messageSendFailureHandlers.forEach(handler -> handler.onMessageSendFailure(address, message, cause)));
        protoChannel.setSerializer(this.serializer);
        protoChannel.init();
        return protoChannel;
    }

    /**
     * Returns the remote address of an accepted socket.
     *
     * @throws IllegalStateException if the address cannot be read or the socket is not connected
     */
    private SocketAddress getRemoteAddress(AsynchronousSocketChannel socketChannel) {
        SocketAddress remoteAddress;
        try {
            remoteAddress = socketChannel.getRemoteAddress();
        } catch (IOException e) {
            throw new IllegalStateException("Unable to get Remote Address from socket channel", e);
        }
        if (remoteAddress == null) {
            // A null key would be rejected by the connection map; fail with a clear message instead.
            throw new IllegalStateException("Unable to get Remote Address from socket channel: not connected");
        }
        return remoteAddress;
    }

    /**
     * Disconnects every client, closes the server socket and shuts down the executors.
     * Safe to call when {@link #init()} never ran or failed; a failure to disconnect one client
     * is logged and does not prevent the remaining cleanup.
     */
    @Override
    @PreDestroy
    public void stop() {
        for (Map.Entry<SocketAddress, ProtoSocketChannel> client : this.socketChannels.entrySet()) {
            try {
                client.getValue().disconnect();
            } catch (RuntimeException e) {
                LOGGER.error("Unable to disconnect {} from port {}", client.getKey(), this.serverPort, e);
            }
        }
        this.socketChannels.clear();
        if (this.serverSocketChannel != null) {
            try {
                this.serverSocketChannel.close();
            } catch (IOException e) {
                LOGGER.error("Unable to close server socket channel at port {}", this.serverPort, e);
            }
        }
        shutdownIfRunning(this.acceptExecutor);
        shutdownIfRunning(this.readExecutor);
        shutdownIfRunning(this.writeExecutor);
    }

    /** Shuts the executor down unless it was never created or is already shut down. */
    private static void shutdownIfRunning(ExecutorService executor) {
        if (executor != null && !executor.isShutdown()) {
            executor.shutdown();
        }
    }

    /**
     * Sends a message to one connected client.
     *
     * <p>If no client is connected at the given address (for example because it disconnected
     * just before this call), nothing is sent: a warning is logged and the registered
     * {@link MessageSendFailureHandler}s are notified with an {@link IllegalStateException}.
     *
     * @param socketAddress the remote address of the target client
     * @param message the message to send
     */
    @Override
    public void sendMessage(SocketAddress socketAddress, Message message) {
        // ConcurrentHashMap rejects null keys, so treat a null address as "not connected".
        ProtoSocketChannel target = socketAddress == null ? null : this.socketChannels.get(socketAddress);
        if (target == null) {
            IllegalStateException cause = new IllegalStateException("No client connected at " + socketAddress);
            LOGGER.warn("Unable to send message: no client connected at {}", socketAddress);
            this.messageSendFailureHandlers.forEach(handler -> handler.onMessageSendFailure(socketAddress, message, cause));
            return;
        }
        target.sendMessage(message);
    }

    /**
     * Sends a message to every currently connected client.
     *
     * @param message the message to send
     */
    @Override
    public void sendMessageToAll(Message message) {
        this.socketChannels.values().forEach(client -> client.sendMessage(message));
    }

    /**
     * Returns a read-only view of the remote addresses of the connected clients. The view is
     * backed by the connection map, so it reflects later connects and disconnects.
     */
    @Override
    public Collection<SocketAddress> getConnectedAddresses() {
        return Collections.unmodifiableCollection(this.socketChannels.keySet());
    }

    /** Returns whether a client with the given remote address is currently registered. */
    @Override
    public boolean isConnected(SocketAddress socketAddress) {
        return this.socketChannels.containsKey(socketAddress);
    }

    /** Registers a handler notified when a client connection is accepted. */
    @Override
    public void addConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandlers.add(connectionHandler);
    }

    /** Unregisters a handler previously added with {@link #addConnectionHandler}. */
    @Override
    public void removeConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandlers.remove(connectionHandler);
    }

    /** Registers a handler notified when a client connection is disconnected. */
    @Override
    public void addDisconnectionHandler(DisconnectionHandler disconnectionHandler) {
        this.disconnectionHandlers.add(disconnectionHandler);
    }

    /** Unregisters a handler previously added with {@link #addDisconnectionHandler}. */
    @Override
    public void removeDisconnectionHandler(DisconnectionHandler disconnectionHandler) {
        this.disconnectionHandlers.remove(disconnectionHandler);
    }

    /** Registers a handler notified when a message is received from any client. */
    @Override
    public void addMessageReceivedHandler(MessageReceivedHandler messageReceivedHandler) {
        this.messageReceivedHandlers.add(messageReceivedHandler);
    }

    /** Unregisters a handler previously added with {@link #addMessageReceivedHandler}. */
    @Override
    public void removeMessageReceivedHandler(MessageReceivedHandler messageReceivedHandler) {
        this.messageReceivedHandlers.remove(messageReceivedHandler);
    }

    /** Registers a handler notified when a message has been sent to a client. */
    @Override
    public void addMessageSentHandler(MessageSentHandler messageSentHandler) {
        this.messageSentHandlers.add(messageSentHandler);
    }

    /** Unregisters a handler previously added with {@link #addMessageSentHandler}. */
    @Override
    public void removeMessageSentHandler(MessageSentHandler messageSentHandler) {
        this.messageSentHandlers.remove(messageSentHandler);
    }

    /** Registers a handler notified when sending a message to a client fails. */
    @Override
    public void addMessageSendFailureHandler(MessageSendFailureHandler messageSendFailureHandler) {
        this.messageSendFailureHandlers.add(messageSendFailureHandler);
    }

    /** Unregisters a handler previously added with {@link #addMessageSendFailureHandler}. */
    @Override
    public void removeMessageSendFailureHandler(MessageSendFailureHandler messageSendFailureHandler) {
        this.messageSendFailureHandlers.remove(messageSendFailureHandler);
    }

    /** Sets the read buffer size passed to connections accepted after this call. */
    public void setReadBufferSize(int readBufferSize) {
        this.readBufferSize = readBufferSize;
    }

    /** Sets the write buffer size passed to connections accepted after this call. */
    public void setWriteBufferSize(int writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    /** Sets the read timeout, in milliseconds, passed to connections accepted after this call. */
    public void setReadTimeoutMillis(long readTimeoutMillis) {
        this.readTimeoutMillis = readTimeoutMillis;
    }

    /** Sets the write timeout, in milliseconds, passed to connections accepted after this call. */
    public void setWriteTimeoutMillis(long writeTimeoutMillis) {
        this.writeTimeoutMillis = writeTimeoutMillis;
    }

    /** Sets the serializer passed to connections accepted after this call. */
    public void setSerializer(ProtoSerializer serializer) {
        this.serializer = serializer;
    }
}
