package dev.getelements.elements.remote.jeromq;

import dev.getelements.elements.rt.AsyncConnection;
import dev.getelements.elements.rt.AsyncConnectionGroup;
import dev.getelements.elements.rt.AsyncConnectionPool;
import dev.getelements.elements.rt.AsyncConnectionService;
import dev.getelements.elements.rt.PayloadReader;
import dev.getelements.elements.rt.PayloadWriter;
import dev.getelements.elements.rt.exception.InternalException;
import dev.getelements.elements.rt.remote.InstanceConnectionService;
import dev.getelements.elements.rt.remote.LocalInvocationDispatcher;
import dev.getelements.elements.rt.remote.Node;
import dev.getelements.elements.rt.remote.NodeLifecycle;
import dev.getelements.elements.rt.remote.NodeState;
import dev.getelements.elements.sdk.cluster.id.NodeId;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:dev/getelements/elements/remote/jeromq/JeroMQNode.class */
public class JeroMQNode implements Node {
    private static final Logger staticLogger = LoggerFactory.getLogger(JeroMQNode.class);
    private static final String OUTBOUND_ADDR_FORMAT = "inproc://node/%s/out";
    public static final String JEROMQ_NODE_MIN_CONNECTIONS = "dev.getelements.elements.remote.jeromq.node.min.connections";
    public static final String JEROMQ_NODE_MAX_CONNECTIONS = "dev.getelements.elements.remote.jeromq.node.max.connections";
    private final AtomicReference<NodeState> state = new AtomicReference<>(NodeState.READY);
    private final AtomicReference<NodeContext> context = new AtomicReference<>();
    private String name;
    private NodeId nodeId;
    private int minConnections;
    private int maxConnections;
    private LocalInvocationDispatcher invocationDispatcher;
    private PayloadReader payloadReader;
    private PayloadWriter payloadWriter;
    private NodeLifecycle nodeLifecycle;
    private AsyncConnectionService<ZContext, ZMQ.Socket> asyncConnectionService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/getelements/elements/remote/jeromq/JeroMQNode$NodeContext.class */
    public class NodeContext {
        private final Logger logger;
        private AsyncConnection<ZContext, ZMQ.Socket> frontendConnection;
        private AsyncConnectionGroup<ZContext, ZMQ.Socket> mainConnectionGroup;
        private AsyncConnectionPool<ZContext, ZMQ.Socket> outboundConnectionPool;
        private final AtomicInteger dispatcherCount = new AtomicInteger();
        private final ExecutorService dispatchExecutorService = Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName(String.format("%s %s.in #%d", getClass().getSimpleName(), JeroMQNode.this.getName(), Integer.valueOf(this.dispatcherCount.incrementAndGet())));
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                this.logger.error("Fatal Error: {}", thread2, th);
            });
            return thread;
        });

        private NodeContext() {
            this.logger = JeroMQNode.this.loggerForNode();
        }

        public void start(InstanceConnectionService.InstanceBinding instanceBinding) {
            CountDownLatch countDownLatch = new CountDownLatch(3);
            this.outboundConnectionPool = JeroMQNode.this.getAsyncConnectionService().allocatePool("JeroMQNode Outbound", JeroMQNode.this.getMinConnections(), JeroMQNode.this.getMaxConnections(), zContext -> {
                ZMQ.Socket createSocket = zContext.createSocket(SocketType.PUSH);
                createSocket.connect(JeroMQNode.this.getOutboundAddr());
                return createSocket;
            });
            JeroMQNode.this.getAsyncConnectionService().group(String.format("%s %s", JeroMQNode.class.getSimpleName(), JeroMQNode.this.name)).connection(zContext2 -> {
                ZMQ.Socket createSocket = zContext2.createSocket(SocketType.ROUTER);
                createSocket.bind(instanceBinding.getBindAddress());
                return createSocket;
            }, asyncConnection -> {
                this.frontendConnection = asyncConnection;
                asyncConnection.setEvents(new AsyncConnection.Event[]{AsyncConnection.Event.READ, AsyncConnection.Event.ERROR});
                asyncConnection.onRead(this::onFrontendRead);
                asyncConnection.onError(this::onFrontendError);
                countDownLatch.countDown();
            }).connection(zContext3 -> {
                ZMQ.Socket createSocket = zContext3.createSocket(SocketType.PULL);
                createSocket.bind(JeroMQNode.this.getOutboundAddr());
                return createSocket;
            }, asyncConnection2 -> {
                asyncConnection2.setEvents(new AsyncConnection.Event[]{AsyncConnection.Event.READ, AsyncConnection.Event.ERROR});
                asyncConnection2.onRead(this::onBackendRead);
                asyncConnection2.onError(this::onBackendError);
                countDownLatch.countDown();
            }).build(asyncConnectionGroup -> {
                this.mainConnectionGroup = asyncConnectionGroup;
                countDownLatch.countDown();
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new InternalException(e);
            }
        }

        private void onFrontendRead(AsyncConnection<ZContext, ZMQ.Socket> asyncConnection) {
            ZMsg recvMsg = ZMsg.recvMsg((ZMQ.Socket) asyncConnection.socket());
            this.dispatchExecutorService.submit(() -> {
                dispatch(recvMsg);
            });
        }

        private void onFrontendError(AsyncConnection<ZContext, ZMQ.Socket> asyncConnection) {
            JeroMQNode.this.state.set(NodeState.UNHEALTHY);
            this.logger.error("Frontend Connection Error {} - errno {}", asyncConnection, Integer.valueOf(((ZMQ.Socket) asyncConnection.socket()).errno()));
        }

        private void onBackendRead(AsyncConnection<ZContext, ZMQ.Socket> asyncConnection) {
            ZMsg.recvMsg((ZMQ.Socket) asyncConnection.socket()).send((ZMQ.Socket) this.frontendConnection.socket());
        }

        private void onBackendError(AsyncConnection<ZContext, ZMQ.Socket> asyncConnection) {
            JeroMQNode.this.state.set(NodeState.UNHEALTHY);
            this.logger.error("Backend Connection Error {} - errno {}", asyncConnection, Integer.valueOf(((ZMQ.Socket) asyncConnection.socket()).errno()));
        }

        public void stop() {
            this.mainConnectionGroup.close();
            this.outboundConnectionPool.close();
            this.dispatchExecutorService.shutdownNow();
            try {
                if (!this.dispatchExecutorService.awaitTermination(10L, TimeUnit.MINUTES)) {
                    this.logger.error("Terminating dispatchers timed out.");
                }
            } catch (InterruptedException e) {
                this.logger.error("Interrupted while shutting down Node Thread Pool", e);
            }
        }

        private void dispatch(ZMsg zMsg) {
            new JeroMQNodeInvocation(zMsg, JeroMQNode.this.getInvocationDispatcher(), JeroMQNode.this.getPayloadReader(), JeroMQNode.this.getPayloadWriter(), this.outboundConnectionPool).dispatch();
        }
    }

    public String getName() {
        return this.name;
    }

    public NodeId getNodeId() {
        return this.nodeId;
    }

    public NodeState getState() {
        return this.state.get();
    }

    public String getOutboundAddr() {
        return String.format(OUTBOUND_ADDR_FORMAT, getNodeId().asString());
    }

    public Node.Startup beginStartup() {
        final NodeContext nodeContext = new NodeContext();
        nodeContext.logger.info("Beginning startup.");
        if (this.state.compareAndSet(NodeState.READY, NodeState.STARTING) && this.context.compareAndSet(null, nodeContext)) {
            return new Node.Startup() { // from class: dev.getelements.elements.remote.jeromq.JeroMQNode.1
                public Node getNode() {
                    return JeroMQNode.this;
                }

                public void preStart() {
                    try {
                        check();
                        nodeContext.logger.info("Issuing pre-start command.");
                        JeroMQNode.this.getNodeLifecycle().nodePreStart(getNode());
                    } catch (Exception e) {
                        JeroMQNode.this.state.set(NodeState.UNHEALTHY);
                        nodeContext.logger.error("Caught excpetion issuing pre-start command.", e);
                        throw e;
                    }
                }

                public void start(InstanceConnectionService.InstanceBinding instanceBinding) {
                    try {
                        check();
                        nodeContext.logger.info("Issuing start command with binding {}.", instanceBinding);
                        nodeContext.start(instanceBinding);
                        JeroMQNode.this.state.set(NodeState.STARTED);
                    } catch (Exception e) {
                        JeroMQNode.this.state.set(NodeState.UNHEALTHY);
                        nodeContext.logger.error("Caught exception issuing start command.", e);
                        throw e;
                    }
                }

                public void postStart() {
                    try {
                        check();
                        nodeContext.logger.info("Issuing post-start command for node {}", JeroMQNode.this.nodeId);
                        JeroMQNode.this.getNodeLifecycle().nodePostStart(getNode());
                        JeroMQNode.this.state.set(NodeState.HEALTHY);
                    } catch (Exception e) {
                        JeroMQNode.this.state.set(NodeState.UNHEALTHY);
                        nodeContext.logger.error("Caught exception issuing post-start command.  Terminating node.", e);
                        throw e;
                    }
                }

                public void cancel() {
                    try {
                        nodeContext.stop();
                    } catch (Exception e) {
                        nodeContext.logger.error("Error Canceling startup.", e);
                    }
                    if (!JeroMQNode.this.context.compareAndSet(nodeContext, null)) {
                        nodeContext.logger.error("Inconsistent state.  Startup does not reflect current state.");
                    }
                    JeroMQNode.this.state.set(NodeState.READY);
                }

                private void check() {
                    if (JeroMQNode.this.context.get() != nodeContext) {
                        throw new IllegalStateException("Incorrect/mismatched startup routine.");
                    }
                }
            };
        }
        throw new IllegalStateException("Already started.");
    }

    public Node.Shutdown beginShutdown() {
        final NodeContext andSet = this.context.getAndSet(null);
        final Logger logger = andSet == null ? staticLogger : andSet.logger;
        return new Node.Shutdown() { // from class: dev.getelements.elements.remote.jeromq.JeroMQNode.2
            public void preStop() {
                try {
                    logger.info("Issuing NodeLifecycle pre-stop command.");
                    JeroMQNode.this.getNodeLifecycle().nodePreStop(JeroMQNode.this);
                    JeroMQNode.this.state.set(NodeState.STOPPING);
                } catch (Exception e) {
                    logger.error("Caught exception issuing pre-stop command.", e);
                }
            }

            public void stop() {
                try {
                    if (andSet != null) {
                        andSet.stop();
                    }
                    logger.info("Shutdown.  Issuing NodeLifecycle post-stop command.");
                    JeroMQNode.this.state.set(NodeState.STOPPED);
                } catch (Exception e) {
                    logger.error("Caught exception issuing stop command.", e);
                }
            }

            public void postStop() {
                try {
                    logger.info("Shutdown.  Issued NodeLifecycle stop command.");
                    JeroMQNode.this.getNodeLifecycle().nodePostStop(JeroMQNode.this);
                    JeroMQNode.this.state.set(NodeState.READY);
                } catch (Exception e) {
                    logger.error("Caught excpetion issuing post-stop command.", e);
                }
            }
        };
    }

    private Logger loggerForNode() {
        String name = JeroMQNode.class.getName();
        return getName() != null ? LoggerFactory.getLogger(String.format("%s.%s", name, getName())) : getNodeId() != null ? LoggerFactory.getLogger(String.format("%s.%s", name, getNodeId().asString())) : staticLogger;
    }

    public LocalInvocationDispatcher getInvocationDispatcher() {
        return this.invocationDispatcher;
    }

    @Inject
    public void setInvocationDispatcher(LocalInvocationDispatcher localInvocationDispatcher) {
        this.invocationDispatcher = localInvocationDispatcher;
    }

    public PayloadReader getPayloadReader() {
        return this.payloadReader;
    }

    @Inject
    public void setPayloadReader(PayloadReader payloadReader) {
        this.payloadReader = payloadReader;
    }

    public PayloadWriter getPayloadWriter() {
        return this.payloadWriter;
    }

    @Inject
    public void setPayloadWriter(PayloadWriter payloadWriter) {
        this.payloadWriter = payloadWriter;
    }

    public AsyncConnectionService<ZContext, ZMQ.Socket> getAsyncConnectionService() {
        return this.asyncConnectionService;
    }

    @Inject
    public void setAsyncConnectionService(AsyncConnectionService<ZContext, ZMQ.Socket> asyncConnectionService) {
        this.asyncConnectionService = asyncConnectionService;
    }

    @Inject
    public void setNodeId(NodeId nodeId) {
        this.nodeId = nodeId;
    }

    @Inject
    public void setName(@Named("dev.getelements.elements.rt.node.name") String str) {
        this.name = str;
    }

    public int getMinConnections() {
        return this.minConnections;
    }

    @Inject
    public void setMinConnections(@Named("dev.getelements.elements.remote.jeromq.node.min.connections") int i) {
        this.minConnections = i;
    }

    public int getMaxConnections() {
        return this.maxConnections;
    }

    @Inject
    public void setMaxConnections(@Named("dev.getelements.elements.remote.jeromq.node.max.connections") int i) {
        this.maxConnections = i;
    }

    public NodeLifecycle getNodeLifecycle() {
        return this.nodeLifecycle;
    }

    @Inject
    public void setNodeLifecycle(NodeLifecycle nodeLifecycle) {
        this.nodeLifecycle = nodeLifecycle;
    }
}
