package org.jetlang.remote.client;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetlang.channels.Channel;
import org.jetlang.channels.ChannelSubscription;
import org.jetlang.channels.MemoryChannel;
import org.jetlang.channels.Subscribable;
import org.jetlang.channels.Subscriber;
import org.jetlang.core.Callback;
import org.jetlang.core.Disposable;
import org.jetlang.core.DisposingExecutor;
import org.jetlang.fibers.Fiber;
import org.jetlang.remote.acceptor.MessageStreamWriter;
import org.jetlang.remote.client.CloseEvent;
import org.jetlang.remote.core.CloseableChannel;
import org.jetlang.remote.core.ErrorHandler;
import org.jetlang.remote.core.HeartbeatEvent;
import org.jetlang.remote.core.JetlangRemotingInputStream;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.ReadTimeoutEvent;
import org.jetlang.remote.core.Serializer;
import org.jetlang.remote.core.SocketMessageStreamWriter;
import org.jetlang.remote.core.TcpSocket;

/* loaded from: input_file:org/jetlang/remote/client/JetlangTcpClient.class */
public class JetlangTcpClient implements JetlangClient {
    private MessageStreamWriter socket;
    private final Fiber sendFiber;
    private final JetlangClientConfig config;
    private final Serializer ser;
    private final ErrorHandler errorHandler;
    private static final Charset charset = Charset.forName("US-ASCII");
    private final SocketConnector socketConnector;
    private Disposable pendingConnect;
    private Disposable hbSchedule;
    private final CloseableChannel.Group channelsToClose = new CloseableChannel.Group();
    private final Map<String, RemoteSubscription> remoteSubscriptions = new LinkedHashMap();
    private final Channel<ConnectEvent> Connected = channel();
    private final Channel<CloseEvent> Closed = channel();
    private final Channel<ReadTimeoutEvent> ReadTimeout = channel();
    private final Channel<DeadMessageEvent> DeadMessage = channel();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CountDownLatch logoutLatch = new CountDownLatch(1);
    private final Channel<HeartbeatEvent> Heartbeat = channel();
    private AtomicInteger reqId = new AtomicInteger();
    private final Map<Integer, Req> pendingRequests = Collections.synchronizedMap(new HashMap());
    private final SocketWriter socketWriter = new SocketWriter() { // from class: org.jetlang.remote.client.JetlangTcpClient.1
        @Override // org.jetlang.remote.client.SocketWriter
        public <T> boolean send(String str, T t) {
            if (JetlangTcpClient.this.socket == null) {
                JetlangTcpClient.this.DeadMessage.publish(new DeadMessageEvent(str, t));
                return false;
            }
            try {
                JetlangTcpClient.this.socket.write(str, t);
                return true;
            } catch (IOException e) {
                JetlangTcpClient.this.DeadMessage.publish(new DeadMessageEvent(str, t));
                JetlangTcpClient.this.handleDisconnect(new CloseEvent.WriteException(e));
                return false;
            }
        }
    };
    private final Runnable connect = new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.2
        @Override // java.lang.Runnable
        public void run() {
            try {
                JetlangTcpClient.this.handleConnect(JetlangTcpClient.this.socketConnector.connect());
            } catch (Exception e) {
                JetlangTcpClient.this.errorHandler.onException(e);
                JetlangTcpClient.this.socket = null;
            }
        }
    };
    private final Runnable hb = new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.3
        @Override // java.lang.Runnable
        public void run() {
            try {
                if (JetlangTcpClient.this.socket != null) {
                    JetlangTcpClient.this.socket.writeByteAsInt(1);
                }
            } catch (IOException e) {
                JetlangTcpClient.this.handleDisconnect(new CloseEvent.WriteException(e));
            }
        }
    };
    private final Runnable onReadTimeout = new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.4
        @Override // java.lang.Runnable
        public void run() {
            JetlangTcpClient.this.ReadTimeout.publish(new ReadTimeoutEvent());
        }
    };
    private final JetlangRemotingProtocol.Handler protocolHandler = new JetlangRemotingProtocol.Handler() { // from class: org.jetlang.remote.client.JetlangTcpClient.6
        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onMessage(String str, Object obj) {
            JetlangTcpClient.this.publishData(str, obj);
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onSubscriptionRequest(String str) {
            JetlangTcpClient.this.errorHandler.onException(new IOException("SubscriptionNotSupported: " + str));
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onRequest(int i, String str, Object obj) {
            JetlangTcpClient.this.errorHandler.onException(new IOException("RequestNotSupported: " + str + " val: " + obj));
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onUnsubscribeRequest(String str) {
            JetlangTcpClient.this.errorHandler.onException(new IOException("UnsubscribeNotSupported: " + str));
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onHandlerException(Exception exc) {
            JetlangTcpClient.this.errorHandler.onException(exc);
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onHb() {
            JetlangTcpClient.this.Heartbeat.publish(new HeartbeatEvent());
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onLogout() {
            JetlangTcpClient.this.logoutLatch.countDown();
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onUnknownMessage(int i) {
            JetlangTcpClient.this.errorHandler.onException(new IOException(i + " not supported"));
        }

        @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
        public void onRequestReply(int i, String str, Object obj) {
            JetlangTcpClient.this.publishReply(i, obj);
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlang/remote/client/JetlangTcpClient$RemoteSubscription.class */
    public class RemoteSubscription<T> {
        private final CloseableChannel<T> channel;
        private final String topic;
        private boolean subscriptionSent = false;

        public RemoteSubscription(String str) {
            this.channel = JetlangTcpClient.this.channel();
            this.topic = str;
        }

        public Disposable subscribe(Subscribable<T> subscribable) {
            final Disposable subscribe = this.channel.subscribe(subscribable);
            JetlangTcpClient.this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.RemoteSubscription.1
                @Override // java.lang.Runnable
                public void run() {
                    if (RemoteSubscription.this.subscriptionSent) {
                        return;
                    }
                    RemoteSubscription.this.subscriptionSent = JetlangTcpClient.this.sendSubscription(RemoteSubscription.this.topic, 3);
                }
            });
            return new Disposable() { // from class: org.jetlang.remote.client.JetlangTcpClient.RemoteSubscription.2
                public void dispose() {
                    subscribe.dispose();
                    JetlangTcpClient.this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.RemoteSubscription.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            RemoteSubscription.this.unsubscribeIfNecessary();
                        }
                    });
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void unsubscribeIfNecessary() {
            synchronized (JetlangTcpClient.this.remoteSubscriptions) {
                if (this.channel.subscriptionCount() == 0 && !this.channel.isClosed()) {
                    this.channel.close();
                    JetlangTcpClient.this.channelsToClose.remove(this.channel);
                    if (this.subscriptionSent) {
                        JetlangTcpClient.this.sendSubscription(this.topic, 5);
                    }
                    JetlangTcpClient.this.remoteSubscriptions.remove(this.topic);
                }
            }
        }

        public void publish(T t) {
            this.channel.publish(t);
        }

        public void onConnect() {
            this.subscriptionSent = JetlangTcpClient.this.sendSubscription(this.topic, 3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlang/remote/client/JetlangTcpClient$Req.class */
    public class Req<T> {
        final DisposingExecutor fiber;
        final Callback<T> cb;
        private final AtomicBoolean disposed;

        public Req(DisposingExecutor disposingExecutor, Callback<T> callback, AtomicBoolean atomicBoolean) {
            this.fiber = disposingExecutor;
            this.cb = callback;
            this.disposed = atomicBoolean;
        }

        public void onReply(final T t) {
            if (this.disposed.get()) {
                return;
            }
            this.fiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.Req.1
                @Override // java.lang.Runnable
                public void run() {
                    if (Req.this.disposed.compareAndSet(false, true)) {
                        Req.this.cb.onMessage(t);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> CloseableChannel<T> channel() {
        return this.channelsToClose.add(new MemoryChannel());
    }

    public JetlangTcpClient(SocketConnector socketConnector, Fiber fiber, JetlangClientConfig jetlangClientConfig, Serializer serializer, ErrorHandler errorHandler) {
        this.socketConnector = socketConnector;
        this.sendFiber = fiber;
        this.config = jetlangClientConfig;
        this.ser = serializer;
        this.errorHandler = errorHandler;
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public <T> Disposable subscribe(String str, Subscribable<T> subscribable) {
        RemoteSubscription remoteSubscription;
        Disposable subscribe;
        synchronized (this.remoteSubscriptions) {
            if (this.remoteSubscriptions.containsKey(str)) {
                remoteSubscription = this.remoteSubscriptions.get(str);
            } else {
                remoteSubscription = new RemoteSubscription(str);
                this.remoteSubscriptions.put(str, remoteSubscription);
            }
            subscribe = remoteSubscription.subscribe(subscribable);
        }
        return subscribe;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishData(String str, Object obj) {
        RemoteSubscription remoteSubscription;
        synchronized (this.remoteSubscriptions) {
            remoteSubscription = this.remoteSubscriptions.get(str);
        }
        if (remoteSubscription != null) {
            remoteSubscription.publish(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishReply(int i, Object obj) {
        Req remove = this.pendingRequests.remove(Integer.valueOf(i));
        if (remove != null) {
            remove.onReply(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean sendSubscription(String str, int i) {
        if (this.socket == null) {
            return false;
        }
        try {
            this.socket.writeSubscription(i, str, charset);
            return true;
        } catch (IOException e) {
            handleDisconnect(new CloseEvent.WriteException(e));
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeIfNeeded(CloseEvent closeEvent) {
        if (this.socket != null) {
            this.socket.tryClose();
            this.socket = null;
            if (this.hbSchedule != null) {
                this.hbSchedule.dispose();
            }
            if (this.closed.get()) {
                this.Closed.publish(new CloseEvent.GracefulDisconnect());
            } else {
                this.Closed.publish(closeEvent);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleConnect(Socket socket) throws IOException {
        this.pendingConnect.dispose();
        this.pendingConnect = null;
        this.socket = new SocketMessageStreamWriter(new TcpSocket(socket, this.errorHandler), charset, this.ser.getWriter());
        synchronized (this.remoteSubscriptions) {
            Iterator<RemoteSubscription> it = this.remoteSubscriptions.values().iterator();
            while (it.hasNext()) {
                it.next().onConnect();
            }
        }
        final InputStream inputStream = socket.getInputStream();
        new Thread(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.5
            @Override // java.lang.Runnable
            public void run() {
                JetlangRemotingInputStream jetlangRemotingInputStream = new JetlangRemotingInputStream(inputStream, new JetlangRemotingProtocol(JetlangTcpClient.this.protocolHandler, JetlangTcpClient.this.ser.getReader(), JetlangTcpClient.charset), JetlangTcpClient.this.onReadTimeout);
                try {
                    JetlangTcpClient.this.Connected.publish(new ConnectEvent());
                    do {
                    } while (jetlangRemotingInputStream.readFromStream());
                } catch (IOException e) {
                    JetlangTcpClient.this.handleReadExceptionOnSendFiber(e);
                }
            }
        }, getClass().getSimpleName()).start();
        if (this.config.getHeartbeatIntervalInMs() > 0) {
            this.hbSchedule = this.sendFiber.scheduleWithFixedDelay(this.hb, this.config.getHeartbeatIntervalInMs(), this.config.getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleReadExceptionOnSendFiber(final IOException iOException) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.7
            @Override // java.lang.Runnable
            public void run() {
                JetlangTcpClient.this.handleDisconnect(new CloseEvent.ReadException(iOException));
            }
        });
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public void start() {
        this.pendingConnect = this.sendFiber.scheduleWithFixedDelay(this.connect, this.config.getInitialConnectDelayInMs(), this.config.getReconnectDelayInMs(), TimeUnit.MILLISECONDS);
        this.sendFiber.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleDisconnect(CloseEvent closeEvent) {
        closeIfNeeded(closeEvent);
        if (this.pendingConnect != null || this.closed.get() || this.config.getReconnectDelayInMs() <= 0) {
            return;
        }
        this.pendingConnect = this.sendFiber.scheduleWithFixedDelay(this.connect, this.config.getReconnectDelayInMs(), this.config.getReconnectDelayInMs(), TimeUnit.MILLISECONDS);
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public <T> Disposable subscribe(String str, DisposingExecutor disposingExecutor, Callback<T> callback) {
        return subscribe(str, new ChannelSubscription(disposingExecutor, callback));
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public LogoutResult close(final boolean z) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (!this.closed.compareAndSet(false, true)) {
            throw new RuntimeException("Already closed.");
        }
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.8
            @Override // java.lang.Runnable
            public void run() {
                if (JetlangTcpClient.this.socket != null && z) {
                    try {
                        JetlangTcpClient.this.socket.writeByteAsInt(2);
                        atomicBoolean.set(JetlangTcpClient.this.logoutLatch.await(JetlangTcpClient.this.config.getLogoutLatchTimeout(), JetlangTcpClient.this.config.getLogoutLatchTimeoutUnit()));
                    } catch (Exception e) {
                        JetlangTcpClient.this.errorHandler.onException(e);
                    }
                }
                if (JetlangTcpClient.this.pendingConnect != null) {
                    JetlangTcpClient.this.pendingConnect.dispose();
                }
                JetlangTcpClient.this.closeIfNeeded(new CloseEvent.GracefulDisconnect());
                countDownLatch.countDown();
                JetlangTcpClient.this.sendFiber.dispose();
                JetlangTcpClient.this.channelsToClose.closeAndClear();
            }
        });
        return new LogoutResult(atomicBoolean, countDownLatch);
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public <T> Disposable request(final String str, final Object obj, final DisposingExecutor disposingExecutor, final Callback<T> callback, final Callback<TimeoutControls> callback2, int i, TimeUnit timeUnit) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final int incrementAndGet = this.reqId.incrementAndGet();
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.9
            @Override // java.lang.Runnable
            public void run() {
                if (atomicBoolean.get() || JetlangTcpClient.this.socket == null) {
                    return;
                }
                JetlangTcpClient.this.pendingRequests.put(Integer.valueOf(incrementAndGet), new Req(disposingExecutor, callback, atomicBoolean));
                try {
                    JetlangTcpClient.this.socket.writeRequest(incrementAndGet, str, obj);
                } catch (IOException e) {
                    JetlangTcpClient.this.pendingRequests.remove(Integer.valueOf(incrementAndGet));
                    JetlangTcpClient.this.handleDisconnect(new CloseEvent.WriteException(e));
                }
            }
        });
        if (i <= 0 || callback == null) {
            return new Disposable() { // from class: org.jetlang.remote.client.JetlangTcpClient.12
                public void dispose() {
                    atomicBoolean.set(true);
                }
            };
        }
        final Disposable schedule = this.sendFiber.schedule(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.10
            @Override // java.lang.Runnable
            public void run() {
                if (atomicBoolean.get()) {
                    return;
                }
                callback2.onMessage(new TimeoutControls() { // from class: org.jetlang.remote.client.JetlangTcpClient.10.1
                    @Override // org.jetlang.remote.client.TimeoutControls
                    public void cancelRequest() {
                        atomicBoolean.set(true);
                        JetlangTcpClient.this.pendingRequests.remove(Integer.valueOf(incrementAndGet));
                    }
                });
            }
        }, i, timeUnit);
        return new Disposable() { // from class: org.jetlang.remote.client.JetlangTcpClient.11
            public void dispose() {
                atomicBoolean.set(true);
                schedule.dispose();
            }
        };
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public Subscriber<CloseEvent> getCloseChannel() {
        return this.Closed;
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public Subscriber<ReadTimeoutEvent> getReadTimeoutChannel() {
        return this.ReadTimeout;
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public Subscriber<ConnectEvent> getConnectChannel() {
        return this.Connected;
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public Subscriber<DeadMessageEvent> getDeadMessageChannel() {
        return this.DeadMessage;
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public <T> void publish(String str, T t) {
        publish(str, t, null);
    }

    public <T> void publish(final String str, final T t, final Runnable runnable) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.13
            @Override // java.lang.Runnable
            public void run() {
                if (!JetlangTcpClient.this.socketWriter.send(str, t) || runnable == null) {
                    return;
                }
                runnable.run();
            }
        });
    }

    @Override // org.jetlang.remote.client.JetlangClient
    public void execOnSendThread(final Callback<SocketWriter> callback) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.client.JetlangTcpClient.14
            @Override // java.lang.Runnable
            public void run() {
                callback.onMessage(JetlangTcpClient.this.socketWriter);
            }
        });
    }
}
