package org.jetlang.remote.acceptor;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetlang.fibers.Fiber;
import org.jetlang.fibers.ThreadFiber;
import org.jetlang.remote.acceptor.Acceptor;
import org.jetlang.remote.core.ErrorHandler;
import org.jetlang.remote.core.JetlangRemotingInputStream;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.ReadTimeoutEvent;
import org.jetlang.remote.core.Serializer;
import org.jetlang.remote.core.SerializerFactory;
import org.jetlang.remote.core.SocketMessageStreamWriter;
import org.jetlang.remote.core.TcpSocket;

/* loaded from: input_file:org/jetlang/remote/acceptor/JetlangClientHandler.class */
public class JetlangClientHandler implements Acceptor.ClientHandler, ClientPublisher {
    private final SerializerAdapter ser;
    private final NewSessionHandler channels;
    private final Executor exec;
    private final JetlangSessionConfig config;
    private final FiberFactory fiberFactory;
    private final ErrorHandler errorHandler;
    private final AtomicBoolean running;
    private final Collection<ClientTcpSocket> clients;
    private final BufferedSerializer globalBuffer;
    private final Fiber globalSendFiber;

    /* loaded from: input_file:org/jetlang/remote/acceptor/JetlangClientHandler$FiberFactory.class */
    public interface FiberFactory {

        /* loaded from: input_file:org/jetlang/remote/acceptor/JetlangClientHandler$FiberFactory$ThreadFiberFactory.class */
        public static class ThreadFiberFactory implements FiberFactory {
            @Override // org.jetlang.remote.acceptor.JetlangClientHandler.FiberFactory
            public Fiber createGlobalSendFiber() {
                return new ThreadFiber();
            }

            @Override // org.jetlang.remote.acceptor.JetlangClientHandler.FiberFactory
            public Fiber createSendFiber(Socket socket) {
                return new ThreadFiber();
            }
        }

        Fiber createGlobalSendFiber();

        Fiber createSendFiber(Socket socket);
    }

    /* loaded from: input_file:org/jetlang/remote/acceptor/JetlangClientHandler$ReadTimeoutHandler.class */
    private static class ReadTimeoutHandler implements Runnable {
        private final JetlangStreamSession session;
        public boolean userLoggedOut;

        public ReadTimeoutHandler(JetlangStreamSession jetlangStreamSession) {
            this.session = jetlangStreamSession;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.userLoggedOut) {
                return;
            }
            this.session.onReadTimeout(new ReadTimeoutEvent());
        }
    }

    public JetlangClientHandler(SerializerFactory serializerFactory, NewSessionHandler newSessionHandler, Executor executor, JetlangSessionConfig jetlangSessionConfig, FiberFactory fiberFactory, ErrorHandler errorHandler) {
        this(new SerializerAdapter(serializerFactory), newSessionHandler, executor, jetlangSessionConfig, fiberFactory, errorHandler);
    }

    public JetlangClientHandler(SerializerAdapter serializerAdapter, NewSessionHandler newSessionHandler, Executor executor, JetlangSessionConfig jetlangSessionConfig, FiberFactory fiberFactory, ErrorHandler errorHandler) {
        this.running = new AtomicBoolean(true);
        this.clients = new HashSet();
        this.ser = serializerAdapter;
        this.channels = newSessionHandler;
        this.exec = executor;
        this.config = jetlangSessionConfig;
        this.fiberFactory = fiberFactory;
        this.errorHandler = errorHandler;
        this.globalSendFiber = fiberFactory.createGlobalSendFiber();
        this.globalSendFiber.start();
        this.globalBuffer = serializerAdapter.createBuffered();
    }

    @Override // org.jetlang.remote.acceptor.Acceptor.ClientHandler
    public void startClient(Socket socket) {
        ClientTcpSocket clientTcpSocket = new ClientTcpSocket(new TcpSocket(socket, this.errorHandler));
        synchronized (this.clients) {
            if (!this.running.get()) {
                stopAndRemove(clientTcpSocket);
                return;
            }
            this.clients.add(clientTcpSocket);
            try {
                configureClientSocketAfterAccept(socket);
                this.exec.execute(createRunnable(clientTcpSocket));
            } catch (IOException e) {
                this.errorHandler.onException(e);
                stopAndRemove(clientTcpSocket);
            }
        }
    }

    @Override // org.jetlang.remote.acceptor.Acceptor.ClientHandler
    public void close() {
        this.globalSendFiber.dispose();
        synchronized (this.clients) {
            if (this.running.compareAndSet(true, false)) {
                Iterator<ClientTcpSocket> it = this.clients.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                this.clients.clear();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopAndRemove(ClientTcpSocket clientTcpSocket) {
        clientTcpSocket.close();
        synchronized (this.clients) {
            this.clients.remove(clientTcpSocket);
        }
    }

    public int clientCount() {
        int size;
        synchronized (this.clients) {
            size = this.clients.size();
        }
        return size;
    }

    @Override // org.jetlang.remote.acceptor.ClientPublisher
    public void publishToAllSubscribedClients(final String str, final Object obj) {
        this.globalSendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangClientHandler.1
            @Override // java.lang.Runnable
            public void run() {
                JetlangClientHandler.this.enqueueToAllSubscribedClients(str, JetlangClientHandler.this.globalBuffer.createArray(str, obj));
            }
        });
    }

    public void enqueueToAllSubscribedClients(String str, byte[] bArr) {
        synchronized (this.clients) {
            Iterator<ClientTcpSocket> it = this.clients.iterator();
            while (it.hasNext()) {
                it.next().publishIfSubscribed(str, bArr);
            }
        }
    }

    private Runnable createRunnable(final ClientTcpSocket clientTcpSocket) throws IOException {
        final TcpSocket socket = clientTcpSocket.getSocket();
        final Fiber createSendFiber = this.fiberFactory.createSendFiber(socket.getSocket());
        final Serializer createForSocket = this.ser.createForSocket(socket);
        final JetlangStreamSession jetlangStreamSession = new JetlangStreamSession(socket.getRemoteSocketAddress(), new SocketMessageStreamWriter(socket, this.ser.getCharset(), createForSocket.getWriter()), createSendFiber, this.errorHandler);
        return new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangClientHandler.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    try {
                        ReadTimeoutHandler readTimeoutHandler = new ReadTimeoutHandler(jetlangStreamSession);
                        clientTcpSocket.setSession(jetlangStreamSession);
                        JetlangClientHandler.this.channels.onNewSession(JetlangClientHandler.this, jetlangStreamSession);
                        jetlangStreamSession.startHeartbeat(JetlangClientHandler.this.config.getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS);
                        createSendFiber.start();
                        do {
                        } while (new JetlangRemotingInputStream(socket.getInputStream(), new JetlangRemotingProtocol(jetlangStreamSession, createForSocket.getReader(), JetlangClientHandler.this.ser.getCharset()), readTimeoutHandler).readFromStream());
                        createSendFiber.dispose();
                        JetlangClientHandler.this.stopAndRemove(clientTcpSocket);
                        jetlangStreamSession.onClose(new SessionCloseEvent());
                    } catch (IOException e) {
                        createSendFiber.dispose();
                        JetlangClientHandler.this.stopAndRemove(clientTcpSocket);
                        jetlangStreamSession.onClose(new SessionCloseEvent());
                    } catch (Exception e2) {
                        JetlangClientHandler.this.errorHandler.onException(e2);
                        createSendFiber.dispose();
                        JetlangClientHandler.this.stopAndRemove(clientTcpSocket);
                        jetlangStreamSession.onClose(new SessionCloseEvent());
                    }
                } catch (Throwable th) {
                    createSendFiber.dispose();
                    JetlangClientHandler.this.stopAndRemove(clientTcpSocket);
                    jetlangStreamSession.onClose(new SessionCloseEvent());
                    throw th;
                }
            }
        };
    }

    private void configureClientSocketAfterAccept(Socket socket) throws SocketException {
        socket.setTcpNoDelay(this.config.getTcpNoDelay());
        if (this.config.getReceiveBufferSize() > 0) {
            socket.setReceiveBufferSize(this.config.getReceiveBufferSize());
        }
        if (this.config.getSendBufferSize() > 0) {
            socket.setSendBufferSize(this.config.getSendBufferSize());
        }
        if (this.config.getReadTimeoutInMs() > 0) {
            socket.setSoTimeout(this.config.getReadTimeoutInMs());
        }
    }
}
