package sk.neuromancer.protobuf.nio.impl;

import com.google.protobuf.GeneratedMessage;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sk.neuromancer.protobuf.nio.ProtoSocketChannel;
import sk.neuromancer.protobuf.nio.handlers.ConnectionHandler;
import sk.neuromancer.protobuf.nio.handlers.DisconnectionHandler;
import sk.neuromancer.protobuf.nio.handlers.MessageReceivedHandler;
import sk.neuromancer.protobuf.nio.handlers.MessageSendFailureHandler;
import sk.neuromancer.protobuf.nio.handlers.MessageSentHandler;
import sk.neuromancer.protobuf.nio.utils.DefaultSetting;
import sk.neuromancer.protobuf.nio.utils.NamedThreadFactory;

/* loaded from: input_file:sk/neuromancer/protobuf/nio/impl/AsyncProtoSocketChannel.class */
public class AsyncProtoSocketChannel implements ProtoSocketChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProtoSocketChannel.class);
    private final SocketAddress socketAddress;
    private AsynchronousSocketChannel socketChannel;
    private SocketChannelReader reader;
    private SocketChannelWriter writer;
    private ExecutorService readExecutor;
    private ExecutorService writeExecutor;
    private AsynchronousChannelGroup channelGroup;
    private final List<ConnectionHandler> connectionHandlers = new CopyOnWriteArrayList();
    private final List<DisconnectionHandler> disconnectionHandlers = new CopyOnWriteArrayList();
    private final List<MessageReceivedHandler> messageReceivedHandlers = new CopyOnWriteArrayList();
    private final List<MessageSentHandler> messageSentHandlers = new CopyOnWriteArrayList();
    private final List<MessageSendFailureHandler> messageSendFailureHandlers = new CopyOnWriteArrayList();
    private int readBufferSize = 8192;
    private int writeBufferSize = 8192;
    private int maxMessageWriteQueueSize = DefaultSetting.MAX_WRITE_MESSAGE_QUEUE_SIZE;
    private long readTimeoutMillis = 0;
    private long writeTimeoutMillis = 10000;
    private boolean isInitialized = false;
    private boolean isShuttingDown = false;
    private boolean isInjectedReadExecutor = false;
    private boolean isInjectedWriteExecutor = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:sk/neuromancer/protobuf/nio/impl/AsyncProtoSocketChannel$MessageReadCompletionHandler.class */
    public class MessageReadCompletionHandler implements CompletionHandler<Long, GeneratedMessage> {
        private MessageReadCompletionHandler() {
        }

        @Override // java.nio.channels.CompletionHandler
        public void completed(Long l, GeneratedMessage generatedMessage) {
            AsyncProtoSocketChannel.this.messageReceivedHandlers.forEach(messageReceivedHandler -> {
                messageReceivedHandler.onMessageReceived(AsyncProtoSocketChannel.this.socketAddress, generatedMessage);
            });
        }

        @Override // java.nio.channels.CompletionHandler
        public void failed(Throwable th, GeneratedMessage generatedMessage) {
            if (AsyncProtoSocketChannel.this.isShuttingDown) {
                return;
            }
            AsyncProtoSocketChannel.LOGGER.debug("Unable to read from " + String.valueOf(AsyncProtoSocketChannel.this.socketAddress), th);
            AsyncProtoSocketChannel.this.disconnect();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:sk/neuromancer/protobuf/nio/impl/AsyncProtoSocketChannel$MessageWriteCompletionHandler.class */
    public class MessageWriteCompletionHandler implements CompletionHandler<Long, GeneratedMessage> {
        private MessageWriteCompletionHandler() {
        }

        @Override // java.nio.channels.CompletionHandler
        public void completed(Long l, GeneratedMessage generatedMessage) {
            AsyncProtoSocketChannel.this.messageSentHandlers.forEach(messageSentHandler -> {
                messageSentHandler.onMessageSent(AsyncProtoSocketChannel.this.socketAddress, generatedMessage);
            });
        }

        @Override // java.nio.channels.CompletionHandler
        public void failed(Throwable th, GeneratedMessage generatedMessage) {
            AsyncProtoSocketChannel.this.messageSendFailureHandlers.forEach(messageSendFailureHandler -> {
                messageSendFailureHandler.onMessageSendFailure(AsyncProtoSocketChannel.this.socketAddress, generatedMessage, th);
            });
        }
    }

    public AsyncProtoSocketChannel(SocketAddress socketAddress) {
        this.socketAddress = socketAddress;
    }

    public AsyncProtoSocketChannel(String str, int i) {
        this.socketAddress = new InetSocketAddress(str, i);
    }

    @PostConstruct
    public void init() {
        if (this.isInitialized) {
            return;
        }
        this.isInitialized = true;
        if (this.readExecutor == null) {
            this.readExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(AsyncProtoSocketChannel.class.getSimpleName() + "-Reader"));
        }
        if (this.writeExecutor == null) {
            this.writeExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(AsyncProtoSocketChannel.class.getSimpleName() + "-Writer"));
        }
        if (this.socketChannel == null) {
            try {
                this.channelGroup = AsynchronousChannelGroup.withThreadPool(this.readExecutor);
                this.socketChannel = AsynchronousSocketChannel.open(this.channelGroup);
            } catch (IOException e) {
                throw new IllegalStateException("Unable to open socket channel", e);
            }
        }
        this.reader = new SocketChannelReader(this.socketChannel, this.socketAddress, this.readTimeoutMillis, this.readBufferSize, this.readExecutor, new MessageReadCompletionHandler());
        this.writer = new SocketChannelWriter(this.socketChannel, this.writeTimeoutMillis, this.writeBufferSize, this.maxMessageWriteQueueSize, this.writeExecutor, new MessageWriteCompletionHandler());
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void connect() {
        try {
            this.socketChannel.connect(this.socketAddress).get();
            LOGGER.debug("Connected to " + String.valueOf(this.socketAddress));
            this.connectionHandlers.forEach(connectionHandler -> {
                connectionHandler.onConnected(this.socketAddress);
            });
            this.reader.start();
        } catch (InterruptedException e) {
            LOGGER.debug("Interrupted while connecting to " + String.valueOf(this.socketAddress), e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            LOGGER.error("An error has occurred while trying connect to " + String.valueOf(this.socketAddress), e2);
            disconnect();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startReading() {
        this.reader.start();
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    @PreDestroy
    public void disconnect() {
        this.isShuttingDown = true;
        if (this.reader != null) {
            this.reader.stop();
        }
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            } catch (IOException e) {
                LOGGER.error("Unable to close socket channel to " + String.valueOf(this.socketAddress), e);
            }
        }
        LOGGER.debug("Disconnected from " + String.valueOf(this.socketAddress));
        this.disconnectionHandlers.forEach(disconnectionHandler -> {
            disconnectionHandler.onDisconnected(this.socketAddress);
        });
        if (!this.isInjectedReadExecutor) {
            this.readExecutor.shutdown();
        }
        if (this.isInjectedWriteExecutor) {
            return;
        }
        this.writeExecutor.shutdown();
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void sendMessage(GeneratedMessage generatedMessage) {
        if (!this.socketChannel.isOpen()) {
            throw new IllegalStateException("Socket channel " + String.valueOf(this.socketAddress) + " is closed");
        }
        this.writer.addToWriteQueue(generatedMessage);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void addConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandlers.add(connectionHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void removeConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandlers.remove(connectionHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void addDisconnectionHandler(DisconnectionHandler disconnectionHandler) {
        this.disconnectionHandlers.add(disconnectionHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void removeDisconnectionHandler(DisconnectionHandler disconnectionHandler) {
        this.disconnectionHandlers.remove(disconnectionHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void addMessageReceivedHandler(MessageReceivedHandler messageReceivedHandler) {
        this.messageReceivedHandlers.add(messageReceivedHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void removeMessageReceivedHandler(MessageReceivedHandler messageReceivedHandler) {
        this.messageReceivedHandlers.remove(messageReceivedHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void addMessageSentHandler(MessageSentHandler messageSentHandler) {
        this.messageSentHandlers.add(messageSentHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void removeMessageSentHandler(MessageSentHandler messageSentHandler) {
        this.messageSentHandlers.remove(messageSentHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void addMessageSendFailureHandler(MessageSendFailureHandler messageSendFailureHandler) {
        this.messageSendFailureHandlers.add(messageSendFailureHandler);
    }

    @Override // sk.neuromancer.protobuf.nio.ProtoSocketChannel
    public void removeMessageSendFailureHandler(MessageSendFailureHandler messageSendFailureHandler) {
        this.messageSendFailureHandlers.remove(messageSendFailureHandler);
    }

    public void setReadBufferSize(int i) {
        this.readBufferSize = i;
    }

    public void setWriteBufferSize(int i) {
        this.writeBufferSize = i;
    }

    public void setMaxMessageWriteQueueSize(int i) {
        this.maxMessageWriteQueueSize = i;
    }

    public void setReadTimeoutMillis(long j) {
        this.readTimeoutMillis = j;
    }

    public void setWriteTimeoutMillis(long j) {
        this.writeTimeoutMillis = j;
    }

    public void setSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel) {
        this.socketChannel = asynchronousSocketChannel;
    }

    public void setReadExecutor(ExecutorService executorService) {
        validateSingleThreadedPool(executorService);
        this.readExecutor = executorService;
        this.isInjectedReadExecutor = executorService != null;
    }

    public void setWriteExecutor(ExecutorService executorService) {
        validateSingleThreadedPool(executorService);
        this.writeExecutor = executorService;
        this.isInjectedWriteExecutor = executorService != null;
    }

    private static void validateSingleThreadedPool(ExecutorService executorService) {
        if ((executorService instanceof ThreadPoolExecutor) && ((ThreadPoolExecutor) executorService).getMaximumPoolSize() != 1) {
            throw new IllegalStateException("This class can only support single-threaded pool");
        }
    }
}
