package dev.getelements.elements.rt.remote.jeromq;

import dev.getelements.elements.sdk.cluster.id.InstanceId;
import dev.getelements.elements.sdk.cluster.id.NodeId;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQDemultiplexRouter.class */
public class JeroMQDemultiplexRouter {
    private final InstanceId instanceId;
    private final Logger logger;
    private final ZContext zContext;
    private final ZMQ.Poller poller;
    private final ZMQ.PollItem frontend;
    private final AtomicLong socketUniqueIds = new AtomicLong();
    private final Map<NodeId, ZMQ.PollItem> backends = new LinkedHashMap();
    private final Stats stats = new Stats();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQDemultiplexRouter$Stats.class */
    public class Stats {
        private final SortedMap<NodeId, String> routes = new TreeMap();
        private final JeroMQDebugCounter errorCounter = new JeroMQDebugCounter();
        private final Runnable log;

        private Stats() {
            this.log = JeroMQDemultiplexRouter.this.logger.isDebugEnabled() ? this::doLog : () -> {
            };
        }

        private void doLog() {
            JeroMQDemultiplexRouter.this.logger.debug("Demultiplexer Stats:");
            JeroMQDemultiplexRouter.this.logger.debug("  Errors {}", this.errorCounter);
            JeroMQDemultiplexRouter.this.logger.debug("  Total Routes: {}", Integer.valueOf(this.routes.size()));
            JeroMQDemultiplexRouter.this.logger.debug("  Routes:");
            this.routes.forEach((nodeId, str) -> {
                JeroMQDemultiplexRouter.this.logger.debug("  Node {} (master:{}) -> {} @{})", new Object[]{nodeId, Boolean.valueOf(nodeId.isMaster()), str, JeroMQDemultiplexRouter.this.backends.get(nodeId)});
            });
        }

        public void log() {
            this.log.run();
        }

        public void error() {
            this.errorCounter.increment();
        }

        public void addRoute(NodeId nodeId, String str) {
            this.routes.put(nodeId, str);
        }

        public void removeRoute(NodeId nodeId) {
            this.routes.remove(nodeId);
        }
    }

    public JeroMQDemultiplexRouter(InstanceId instanceId, ZContext zContext, ZMQ.Poller poller, ZMQ.PollItem pollItem) {
        this.instanceId = instanceId;
        this.logger = JeroMQRoutingServer.getLogger(getClass(), instanceId);
        this.zContext = zContext;
        this.poller = poller;
        this.frontend = pollItem;
    }

    public void poll() {
        this.backends.forEach(this::routeToFrontend);
    }

    public String openBinding(NodeId nodeId) {
        if (!Objects.equals(this.instanceId, nodeId.getInstanceId())) {
            this.logger.error("Attempting to open node binding for remote node {}", nodeId);
            throw new JeroMQControlException(JeroMQControlResponseCode.INTERNAL_ERROR);
        }
        this.logger.info("Opening binding for node {}", nodeId);
        if (this.backends.containsKey(nodeId)) {
            throw new JeroMQControlException(JeroMQControlResponseCode.DUPLICATE_NODE_BINDING);
        }
        ZMQ.Socket createSocket = this.zContext.createSocket(SocketType.DEALER);
        ZMQ.PollItem item = this.poller.getItem(this.poller.register(createSocket, 5));
        String localBindAddress = getLocalBindAddress(nodeId);
        createSocket.connect(localBindAddress);
        this.stats.addRoute(nodeId, localBindAddress);
        if (this.backends.putIfAbsent(nodeId, item) == null) {
            this.logger.info("Opening binding for node {} with connect address {}", nodeId, localBindAddress);
            return localBindAddress;
        }
        this.logger.error("Server has duplicate node binding: {}", nodeId);
        this.poller.unregister(createSocket);
        createSocket.close();
        throw new JeroMQControlException(JeroMQControlResponseCode.INTERNAL_ERROR);
    }

    public void closeBindingForNode(NodeId nodeId) {
        try {
            if (!Objects.equals(this.instanceId, nodeId.getInstanceId())) {
                this.logger.error("Attempting to close node binding for remote node.");
                throw new JeroMQControlException(JeroMQControlResponseCode.INTERNAL_ERROR);
            }
            ZMQ.PollItem remove = this.backends.remove(nodeId);
            if (remove == null) {
                this.logger.warn("No such binding {}", nodeId);
                throw new JeroMQControlException(JeroMQControlResponseCode.NO_SUCH_NODE_BINDING);
            }
            this.logger.info("Removed binding for node {}.", nodeId);
            ZMQ.Socket socket = remove.getSocket();
            this.poller.unregister(socket);
            socket.close();
            this.logger.info("Closed binding for node {}.", nodeId);
            this.stats.removeRoute(nodeId);
        } catch (Throwable th) {
            this.stats.removeRoute(nodeId);
            throw th;
        }
    }

    public void forward(ZMsg zMsg, ZMsg zMsg2) {
        ZMQ.Socket socket = getSocket(NodeId.nodeIdFromBytes(zMsg.removeFirst().getData()));
        IdentityUtil.pushIdentity(zMsg, zMsg2);
        if (zMsg.send(socket)) {
            return;
        }
        this.logger.error("Failed to send: {}", Integer.valueOf(socket.errno()));
    }

    private ZMQ.Socket getSocket(NodeId nodeId) {
        ZMQ.PollItem pollItem = this.backends.get(nodeId);
        if (pollItem == null) {
            throw new JeroMQUnroutableNodeException(nodeId);
        }
        return pollItem.getSocket();
    }

    private void routeToFrontend(NodeId nodeId, ZMQ.PollItem pollItem) {
        if (pollItem.isReadable()) {
            ZMQ.Socket socket = pollItem.getSocket();
            ZMQ.Socket socket2 = this.frontend.getSocket();
            ZMsg recvMsg = ZMsg.recvMsg(socket);
            ZMsg popIdentity = IdentityUtil.popIdentity(recvMsg);
            recvMsg.addFirst(new ZFrame(nodeId.asBytes()));
            JeroMQControlResponseCode.OK.pushResponseCode(recvMsg);
            IdentityUtil.pushIdentity(recvMsg, popIdentity);
            if (recvMsg.send(socket2)) {
                return;
            }
            this.logger.error("Failed to send: {}", Integer.valueOf(socket2.errno()));
        }
    }

    public Collection<NodeId> getConnectedNodeIds() {
        return Collections.unmodifiableCollection(this.backends.keySet());
    }

    public String getLocalBindAddress(NodeId nodeId) {
        return String.format("inproc://demux/%s/%s?uid=%016X", this.instanceId, nodeId.asString(), Long.valueOf(this.socketUniqueIds.incrementAndGet()));
    }

    public void log() {
        this.stats.log();
    }
}
