package dev.getelements.elements.rt.remote.jeromq;

import com.google.common.collect.Multimaps;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import dev.getelements.elements.sdk.cluster.id.InstanceId;
import dev.getelements.elements.sdk.cluster.id.NodeId;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQMultiplexRouter.class */
/**
 * Routes messages between per-node frontend sockets and per-instance backend sockets.
 *
 * <p>For every {@link NodeId} opened through {@link #openRouteToNode(NodeId, String)} a
 * {@code ROUTER} socket (the "frontend") is bound to a local {@code inproc://} address. For every
 * {@link InstanceId} a {@code DEALER} socket (the "backend") is connected to the address supplied
 * by the caller. {@link #poll()} then shuttles messages in both directions:</p>
 *
 * <ul>
 *   <li>frontend to backend: the node id and a {@code FORWARD} command are prepended to the
 *       message before it is sent to the backend of the node's instance;</li>
 *   <li>backend to frontend: the response code is stripped, the target node id is read from the
 *       message and the response is delivered to that node's frontend.</li>
 * </ul>
 *
 * <p>The internal maps are plain, unsynchronized collections. NOTE(review): presumably all
 * methods are invoked from the single thread that owns the poller - confirm against the caller.</p>
 */
public class JeroMQMultiplexRouter {

    /** Event mask used for every socket registered with the poller (numerically {@code 5}). */
    private static final int POLL_EVENTS = ZMQ.Poller.POLLIN | ZMQ.Poller.POLLERR;

    private final InstanceId instanceId;
    private final Logger logger;
    private final ZContext zContext;
    private final ZMQ.Poller poller;

    /** Frontend poll items, keyed by the node they serve, in insertion order. */
    private final Map<NodeId, ZMQ.PollItem> frontends = new LinkedHashMap<>();

    /** Backend poll items, keyed by the instance they connect to, in insertion order. */
    private final Map<InstanceId, ZMQ.PollItem> backends = new LinkedHashMap<>();

    /** Live view of the backend poll items; reflects later changes to {@link #backends}. */
    private final Collection<ZMQ.PollItem> backendItems = this.backends.values();

    /** Every (node, connection) pair currently routed. */
    private final SortedSetMultimap<NodeId, JeroMQInstanceConnectionId> routingTable = TreeMultimap.create();

    /** Assigned in the constructor because {@link Stats} reads {@link #logger} when it is created. */
    private final Stats stats;

    /**
     * Bookkeeping of the logical routes and the error count, used only for debug logging.
     */
    public class Stats {
        private final SortedMap<NodeId, InstanceId> fRoutes = new TreeMap<>();
        private final SortedMap<InstanceId, JeroMQInstanceConnectionId> bRoutes = new TreeMap<>();
        private final JeroMQDebugCounter errorCounter = new JeroMQDebugCounter();

        /** Either {@link #doLog()} or a no-op, decided once from the logger's debug level. */
        private final Runnable log;

        private Stats() {
            this.log = JeroMQMultiplexRouter.this.logger.isDebugEnabled() ? this::doLog : () -> {
            };
        }

        /** Builds the full statistics report and writes it at debug level. */
        private void doLog() {
            final StringBuilder report = new StringBuilder();
            report.append("\nMultiplexer Stats for ").append(JeroMQMultiplexRouter.this.instanceId).append(":");
            report.append("\n  Errors: ").append(this.errorCounter);
            report.append("\n  Total Routes: ").append(JeroMQMultiplexRouter.this.routingTable.size());
            report.append("\n  Backend Logical Routes: ").append(this.bRoutes.size());
            report.append("\n  Frontend Logical Routes: ").append(this.fRoutes.size());

            report.append("\n  Frontend Routes:");
            this.fRoutes.forEach((nodeId, routedInstanceId) -> report
                    .append("\n    ")
                    .append(nodeTag(nodeId))
                    .append(nodeId)
                    .append(" -> ")
                    .append(routedInstanceId));

            report.append("\n  Backend Routes:");
            this.bRoutes.forEach((routedInstanceId, connectionId) -> report
                    .append("\n    ")
                    .append(isLocal(routedInstanceId) ? "L: " : "R: ")
                    .append(routedInstanceId)
                    .append(" -> ")
                    .append(connectionId));

            report.append("\n  Full Routing Table:");
            JeroMQMultiplexRouter.this.routingTable.forEach((nodeId, connectionId) -> report
                    .append("\n    ")
                    .append(nodeTag(nodeId))
                    .append(nodeId.getInstanceId())
                    .append(" -> ")
                    .append(nodeId)
                    .append(" -> ")
                    .append(connectionId));

            JeroMQMultiplexRouter.this.logger.debug("{}", report);
        }

        /** Returns whether the given instance is the one this router belongs to. */
        private boolean isLocal(InstanceId candidate) {
            return JeroMQMultiplexRouter.this.instanceId.equals(candidate);
        }

        /** Returns the "LM: " / "LW: " / "RM: " / "RW: " prefix (local/remote, master/other) for a node. */
        private String nodeTag(NodeId nodeId) {
            return (isLocal(nodeId.getInstanceId()) ? "L" : "R") + (nodeId.isMaster() ? "M: " : "W: ");
        }

        /** Writes the statistics report if debug logging was enabled when this object was created. */
        public void log() {
            this.log.run();
        }

        /** Counts one routing error. */
        public void error() {
            this.errorCounter.increment();
        }

        /** Records a frontend route for the given node. */
        public void addRoute(NodeId nodeId) {
            this.fRoutes.put(nodeId, nodeId.getInstanceId());
        }

        /** Records a backend route for the given instance via the given connection. */
        public void addRoute(InstanceId instanceId, JeroMQInstanceConnectionId jeroMQInstanceConnectionId) {
            this.bRoutes.put(instanceId, jeroMQInstanceConnectionId);
        }

        /** Forgets the frontend route of the given node. */
        public void removeRoute(NodeId nodeId) {
            this.fRoutes.remove(nodeId);
        }

        /** Forgets the backend route of the given instance. */
        public void removeRoute(InstanceId instanceId) {
            this.bRoutes.remove(instanceId);
        }
    }

    /**
     * Creates a router for the given instance.
     *
     * @param instanceId the id of the local instance; used for logging and local bind addresses
     * @param zContext   the context from which frontend and backend sockets are created
     * @param poller     the poller with which those sockets are registered
     */
    public JeroMQMultiplexRouter(InstanceId instanceId, ZContext zContext, ZMQ.Poller poller) {
        this.instanceId = instanceId;
        this.logger = JeroMQRoutingServer.getLogger(getClass(), instanceId);
        this.poller = poller;
        this.zContext = zContext;
        // Must come after the logger assignment: the Stats constructor dereferences this.logger.
        this.stats = new Stats();
    }

    /**
     * Processes every readable socket once: first all backends (responses), then all frontends
     * (requests). Readiness is whatever the poll items currently report.
     */
    public void poll() {
        this.backendItems.forEach(this::routeToFrontend);
        this.frontends.forEach(this::routeToBackend);
    }

    /**
     * Receives one message from a readable backend and forwards it to the matching frontend.
     * Any exception is logged and otherwise swallowed so that one bad message does not stop polling.
     */
    private void routeToFrontend(ZMQ.PollItem pollItem) {
        if (!pollItem.isReadable()) {
            return;
        }

        try {
            final ZMsg response = ZMsg.recvMsg(pollItem.getSocket());
            final ZMsg identity = IdentityUtil.popIdentity(response);
            final JeroMQControlResponseCode code = JeroMQControlResponseCode.stripCode(response);
            switch (code) {
                case OK:
                    respondWithSuccess(response, identity);
                    break;
                default:
                    respondWithFailure(response, identity, code);
                    break;
            }
        } catch (Exception e) {
            this.logger.error("Caught exception routing incoming message.", e);
        }
    }

    /**
     * Delivers a successful response. The first frame is consumed as the target node id; the
     * {@code OK} code and the identity are then pushed back before sending.
     */
    private void respondWithSuccess(ZMsg response, ZMsg identity) {
        final NodeId target = NodeId.nodeIdFromBytes(response.removeFirst().getData());
        final ZMQ.Socket frontend = getFrontend(target);
        JeroMQControlResponseCode.OK.pushResponseCode(response);
        IdentityUtil.pushIdentity(response, identity);
        if (!response.send(frontend)) {
            this.logger.error("Failed to send: {}", frontend.errno());
        }
    }

    /**
     * Delivers a failure response. The target node id is read from the third frame; the first
     * three frames are restored in their original order before the code and identity are pushed.
     * The error counter is incremented whether or not delivery succeeds.
     */
    private void respondWithFailure(ZMsg response, ZMsg identity, JeroMQControlResponseCode code) {
        try {
            final ZFrame first = response.removeFirst();
            final ZFrame second = response.removeFirst();
            final ZFrame nodeIdFrame = response.removeFirst();
            final ZMQ.Socket frontend = getFrontend(NodeId.nodeIdFromBytes(nodeIdFrame.getData()));

            // Put the frames back exactly as they arrived.
            response.addFirst(nodeIdFrame);
            response.addFirst(second);
            response.addFirst(first);

            code.pushResponseCode(response);
            IdentityUtil.pushIdentity(response, identity);
            if (!response.send(frontend)) {
                this.logger.error("Failed to send: {}", frontend.errno());
            }
        } finally {
            this.stats.error();
        }
    }

    /**
     * Looks up the frontend socket of a node.
     *
     * @throws JeroMQUnroutableNodeException if no frontend is registered for the node
     */
    private ZMQ.Socket getFrontend(NodeId nodeId) {
        final ZMQ.PollItem frontend = this.frontends.get(nodeId);
        if (frontend == null) {
            throw new JeroMQUnroutableNodeException(nodeId);
        }
        return frontend.getSocket();
    }

    /**
     * Receives one message from a readable frontend and forwards it to the backend of the node's
     * instance, prefixed with the node id and the {@code FORWARD} command. On failure an error
     * message is sent back on the frontend socket.
     */
    private void routeToBackend(NodeId nodeId, ZMQ.PollItem pollItem) {
        if (!pollItem.isReadable()) {
            return;
        }

        final ZMQ.Socket frontend = pollItem.getSocket();
        try {
            // NOTE(review): the backend is resolved before the message is received, so when the
            // lookup throws, the pending message is left unread on the frontend - confirm intended.
            final ZMQ.Socket backend = getBackend(nodeId.getInstanceId()).getSocket();
            final ZMsg request = ZMsg.recvMsg(frontend);
            final ZMsg identity = IdentityUtil.popIdentity(request);
            request.addFirst(new ZFrame(nodeId.asBytes()));
            JeroMQRoutingCommand.FORWARD.pushCommand(request);
            IdentityUtil.pushIdentity(request, identity);
            if (!request.send(backend)) {
                this.logger.error("Failed to send: {}", backend.errno());
            }
        } catch (JeroMQControlException e) {
            this.logger.error("No such instance for node {}", nodeId, e);
            if (!JeroMQRoutingServer.exceptionError(this.logger, e.getCode(), e).send(frontend)) {
                this.logger.error("Failed to send: {}", frontend.errno());
            }
        } catch (Exception e) {
            this.logger.error("Caught exception routing outgoing message to {}", nodeId, e);
            if (!JeroMQRoutingServer.exceptionError(this.logger, e).send(frontend)) {
                this.logger.error("Failed to send: {}", frontend.errno());
            }
        }
    }

    /**
     * Looks up the backend poll item of an instance.
     *
     * @throws JeroMQUnroutableInstanceException if no backend is registered for the instance
     */
    private ZMQ.PollItem getBackend(InstanceId instanceId) {
        final ZMQ.PollItem backend = this.backends.get(instanceId);
        if (backend == null) {
            throw new JeroMQUnroutableInstanceException(instanceId);
        }
        return backend;
    }

    /**
     * Opens a route to a node through the given instance connection address.
     *
     * <p>A frontend socket is created for the node if none exists yet. If the (node, connection)
     * pair was not routed before, the backend of the node's instance (created on demand) is
     * connected to {@code str} and the frontend is bound to the local bind address.</p>
     *
     * @param nodeId the node to route to
     * @param str    the connect address of the instance hosting the node
     * @return the local bind address for this node and connection
     */
    public String openRouteToNode(NodeId nodeId, String str) {
        final JeroMQInstanceConnectionId connectionId = new JeroMQInstanceConnectionId(str);
        this.logger.info("Opening route to node {} via {}", nodeId, connectionId);

        final ZMQ.PollItem frontend = this.frontends.computeIfAbsent(nodeId, key -> {
            final ZMQ.Socket router = this.zContext.createSocket(SocketType.ROUTER);
            return this.poller.getItem(this.poller.register(router, POLL_EVENTS));
        });

        final String localBindAddress = getLocalBindAddress(nodeId, connectionId);

        // put() returns false when this exact route already exists; nothing to wire up then.
        if (this.routingTable.put(nodeId, connectionId)) {
            final ZMQ.PollItem backend = this.backends.computeIfAbsent(nodeId.getInstanceId(), backendInstanceId -> {
                final ZMQ.Socket dealer = this.zContext.createSocket(SocketType.DEALER);
                this.stats.addRoute(backendInstanceId, connectionId);
                return this.poller.getItem(this.poller.register(dealer, POLL_EVENTS));
            });
            backend.getSocket().connect(str);
            frontend.getSocket().bind(localBindAddress);
            this.stats.addRoute(nodeId);
        }

        return localBindAddress;
    }

    /**
     * Closes the routes of every node of the given instance that go through the given connection.
     * The instance's backend is closed only when none of those nodes has a remaining route.
     *
     * @param instanceId the instance whose node routes are closed
     * @param str        the connect address identifying the connection to close
     */
    public void closeRoutesViaInstance(InstanceId instanceId, String str) {
        final JeroMQInstanceConnectionId connectionId = new JeroMQInstanceConnectionId(str);

        // Snapshot the keys first: doCloseFrontend mutates the frontends map.
        final List<NodeId> nodeIds = this.frontends.keySet().stream()
                .filter(candidate -> candidate.getInstanceId().equals(instanceId))
                .collect(Collectors.toList());

        if (nodeIds.isEmpty()) {
            this.logger.warn("No nodes found for instance {}. Nothing to close.", instanceId);
            return;
        }

        // Deliberately not short-circuiting: every frontend must be processed.
        boolean allClosed = true;
        for (NodeId candidate : nodeIds) {
            allClosed &= doCloseFrontend(candidate, connectionId);
        }

        if (allClosed) {
            doCloseBackend(instanceId);
        }
    }

    /**
     * Removes one (node, connection) route. The node's frontend socket is closed and forgotten only
     * when no route to the node remains; otherwise it is kept and merely unbound from the address
     * that belonged to the removed route.
     *
     * @return {@code true} if no route to the node remains afterwards
     */
    private boolean doCloseFrontend(NodeId nodeId, JeroMQInstanceConnectionId jeroMQInstanceConnectionId) {
        this.frontends.compute(nodeId, (key, frontend) -> {
            if (frontend == null) {
                this.logger.warn("No route to node {} via {}", nodeId, jeroMQInstanceConnectionId);
                return null;
            }

            final boolean removed = this.routingTable.remove(nodeId, jeroMQInstanceConnectionId);
            if (!removed) {
                this.logger.warn("No route to node {} via {}", nodeId, jeroMQInstanceConnectionId);
            }

            if (!this.routingTable.containsKey(nodeId)) {
                this.logger.debug("Removing {}.", nodeId);
                doClose(frontend, String.format("Node %s", nodeId));
                this.stats.removeRoute(nodeId);
                return null;
            }

            if (removed) {
                final String localBindAddress = getLocalBindAddress(nodeId, jeroMQInstanceConnectionId);
                frontend.getSocket().unbind(localBindAddress);
                this.logger.debug("Other routes exist for {}. Deferring removal. Unbinding {}", nodeId, localBindAddress);
            }

            // Returning null here would drop the mapping while the socket is still open and
            // registered, leaking it and making the remaining routes unreachable.
            return frontend;
        });
        return !this.routingTable.containsKey(nodeId);
    }

    /** Closes and forgets the backend of an instance; the stats entry is removed even on failure. */
    private void doCloseBackend(InstanceId instanceId) {
        try {
            doClose(this.backends.remove(instanceId), String.format("Instance %s", instanceId));
        } finally {
            this.stats.removeRoute(instanceId);
        }
    }

    /**
     * Unregisters the item's socket from the poller and closes it. A {@code null} item is logged as
     * an error; an exception while closing is logged and not propagated.
     */
    private void doClose(ZMQ.PollItem pollItem, Object obj) {
        if (pollItem == null) {
            this.logger.error("No socket for index {} {}", pollItem, obj);
            return;
        }

        final ZMQ.Socket socket = pollItem.getSocket();
        this.logger.info("Closing socket for sockets[{}] for {}", pollItem, obj);
        this.poller.unregister(socket);
        try {
            socket.close();
        } catch (Exception e) {
            this.logger.error("Error closing socket for {}", obj, e);
        }
    }

    /**
     * Builds the {@code inproc://} address a node's frontend is bound to for a given connection.
     *
     * @param nodeId                     the node served by the frontend
     * @param jeroMQInstanceConnectionId the connection the route goes through
     * @return an address of the form {@code inproc://<instanceId>/mux/<nodeId>?<connectionId>}
     */
    public String getLocalBindAddress(NodeId nodeId, JeroMQInstanceConnectionId jeroMQInstanceConnectionId) {
        return String.format("inproc://%s/mux/%s?%s", this.instanceId, nodeId.asString(), jeroMQInstanceConnectionId);
    }

    /** Writes the routing statistics at debug level, if debug logging was enabled at construction. */
    public void log() {
        this.stats.log();
    }

    /**
     * Returns the current routing table.
     *
     * @return an unmodifiable view of the (node, connection) routes
     */
    public SortedSetMultimap<NodeId, JeroMQInstanceConnectionId> getRoutingTable() {
        return Multimaps.unmodifiableSortedSetMultimap(this.routingTable);
    }
}
