package dev.getelements.elements.rt.remote.jeromq;

import dev.getelements.elements.rt.AsyncConnection;
import dev.getelements.elements.rt.AsyncConnectionPool;
import dev.getelements.elements.rt.AsyncConnectionService;
import dev.getelements.elements.rt.Connection;
import dev.getelements.elements.rt.exception.BaseException;
import dev.getelements.elements.rt.exception.InternalException;
import dev.getelements.elements.rt.remote.AsyncControlClient;
import dev.getelements.elements.rt.remote.InstanceConnectionService;
import dev.getelements.elements.rt.remote.InstanceStatus;
import dev.getelements.elements.sdk.cluster.id.InstanceId;
import dev.getelements.elements.sdk.cluster.id.NodeId;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQAsyncControlClient.class */
public class JeroMQAsyncControlClient implements AsyncControlClient {
    private static final int DEFAULT_MIN_CONNECTIONS = 1;
    private static final int DEFAULT_MAX_CONNECTIONS = 100;
    private final String instanceConnectAddress;
    private final JeroMQSecurity jeroMQSecurity;
    private final AsyncConnectionPool<ZContext, ZMQ.Socket> pool;
    private static final String POOL_NAME = JeroMQControlClient.class.getSimpleName();
    private static final Logger logger = LoggerFactory.getLogger(JeroMQAsyncControlClient.class);

    public JeroMQAsyncControlClient(AsyncConnectionService<ZContext, ZMQ.Socket> asyncConnectionService, String str, JeroMQSecurity jeroMQSecurity) {
        this(asyncConnectionService, str, jeroMQSecurity, DEFAULT_MIN_CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
    }

    public JeroMQAsyncControlClient(AsyncConnectionService<ZContext, ZMQ.Socket> asyncConnectionService, String str, JeroMQSecurity jeroMQSecurity, int i, int i2) {
        this.instanceConnectAddress = str;
        this.jeroMQSecurity = jeroMQSecurity;
        this.pool = asyncConnectionService.allocatePool(POOL_NAME, i, i2, zContext -> {
            ZMQ.Socket open = JeroMQControlClient.open(jeroMQSecurity, zContext);
            open.connect(str);
            return open;
        });
    }

    public AsyncControlClient.Request getInstanceStatus(AsyncControlClient.ResponseConsumer<InstanceStatus> responseConsumer) {
        logger.debug("Getting instance status.");
        return dispatch(() -> {
            ZMsg zMsg = new ZMsg();
            JeroMQRoutingCommand.GET_INSTANCE_STATUS.pushCommand(zMsg);
            return zMsg;
        }, response -> {
            responseConsumer.accept(response.map(JeroMQInstanceStatus::new));
        });
    }

    public AsyncControlClient.Request openRouteToNode(NodeId nodeId, String str, AsyncControlClient.ResponseConsumer<String> responseConsumer) {
        logger.debug("Opening route to node {} at {}", nodeId, str);
        return dispatch(() -> {
            ZMsg zMsg = new ZMsg();
            JeroMQRoutingCommand.OPEN_ROUTE_TO_NODE.pushCommand(zMsg);
            zMsg.add(nodeId.asBytes());
            zMsg.add(str.getBytes(JeroMQRoutingServer.CHARSET));
            return zMsg;
        }, response -> {
            responseConsumer.accept(response.map(zMsg -> {
                return zMsg.getFirst().getString(JeroMQRoutingServer.CHARSET);
            }));
        });
    }

    public AsyncControlClient.Request closeRoutesViaInstance(InstanceId instanceId, String str, AsyncControlClient.ResponseConsumer<Void> responseConsumer) {
        logger.debug("Closing all routes for instance {}", instanceId);
        return dispatch(() -> {
            ZMsg zMsg = new ZMsg();
            JeroMQRoutingCommand.CLOSE_ROUTES_VIA_INSTANCE.pushCommand(zMsg);
            zMsg.add(instanceId.asBytes());
            zMsg.add(str.getBytes(JeroMQRoutingServer.CHARSET));
            return zMsg;
        }, response -> {
            responseConsumer.accept(response.map(zMsg -> {
                return (Void) null;
            }));
        });
    }

    public AsyncControlClient.Request openBinding(NodeId nodeId, AsyncControlClient.ResponseConsumer<InstanceConnectionService.InstanceBinding> responseConsumer) {
        logger.debug("Opening binding for {}", nodeId);
        return dispatch(() -> {
            ZMsg zMsg = new ZMsg();
            JeroMQRoutingCommand.OPEN_BINDING_FOR_NODE.pushCommand(zMsg);
            zMsg.add(nodeId.asBytes());
            return zMsg;
        }, (connection, response) -> {
            responseConsumer.accept(response.map(zMsg -> {
                return new JeroMQInstanceBinding((ZContext) connection.context(), nodeId, this.instanceConnectAddress, this.jeroMQSecurity, zMsg.remove().getString(JeroMQRoutingServer.CHARSET));
            }));
        });
    }

    public AsyncControlClient.Request closeBinding(NodeId nodeId, AsyncControlClient.ResponseConsumer<Void> responseConsumer) {
        logger.debug("Closing binding for {}", nodeId);
        return dispatch(() -> {
            ZMsg zMsg = new ZMsg();
            JeroMQRoutingCommand.CLOSE_BINDING_FOR_NODE.pushCommand(zMsg);
            zMsg.add(nodeId.asBytes());
            return zMsg;
        }, response -> {
            responseConsumer.accept(response.map(zMsg -> {
                return null;
            }));
        });
    }

    private <T> AsyncControlClient.Request dispatch(Supplier<ZMsg> supplier, AsyncControlClient.ResponseConsumer<ZMsg> responseConsumer) {
        return dispatch(supplier, (connection, response) -> {
            responseConsumer.accept(response);
        });
    }

    private <T> AsyncControlClient.Request dispatch(Supplier<ZMsg> supplier, BiConsumer<Connection<ZContext, ZMQ.Socket>, AsyncControlClient.Response<ZMsg>> biConsumer) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        StackTraceElement[] stackTrace = logger.isDebugEnabled() ? new Throwable().getStackTrace() : new StackTraceElement[0];
        this.pool.acquireNextAvailableConnection(asyncConnection -> {
            asyncConnection.setEvents(new AsyncConnection.Event[]{AsyncConnection.Event.WRITE, AsyncConnection.Event.ERROR, AsyncConnection.Event.READ});
            asyncConnection.onError(asyncConnection -> {
                try {
                    if (atomicBoolean.compareAndSet(true, false)) {
                        int errno = ((ZMQ.Socket) asyncConnection.socket()).errno();
                        biConsumer.accept(asyncConnection, () -> {
                            throw remap(stackTrace, () -> {
                                return new InternalException("ZMQ Errno: " + errno);
                            });
                        });
                    }
                } finally {
                    asyncConnection.close();
                }
            });
            asyncConnection.onClose(asyncConnection2 -> {
                if (atomicBoolean.compareAndSet(true, false)) {
                    biConsumer.accept(asyncConnection2, () -> {
                        throw remap(stackTrace, () -> {
                            return new InternalException("Connection closed.");
                        });
                    });
                }
            });
            asyncConnection.onRecycle(asyncConnection3 -> {
                if (atomicBoolean.compareAndSet(true, false)) {
                    biConsumer.accept(asyncConnection3, () -> {
                        throw remap(stackTrace, () -> {
                            return new InternalException("Connection recycled.");
                        });
                    });
                }
            });
            asyncConnection.onRead(asyncConnection4 -> {
                try {
                    if (atomicBoolean.compareAndSet(true, false)) {
                        try {
                            ZMsg recv = JeroMQControlClient.recv((ZMQ.Socket) asyncConnection4.socket());
                            biConsumer.accept(asyncConnection4, () -> {
                                return recv;
                            });
                        } catch (BaseException e) {
                            biConsumer.accept(asyncConnection4, () -> {
                                throw remap(stackTrace, e);
                            });
                        } catch (JeroMQControlException e2) {
                            biConsumer.accept(asyncConnection4, () -> {
                                throw remap(stackTrace, e2);
                            });
                        } catch (Exception e3) {
                            biConsumer.accept(asyncConnection4, () -> {
                                throw remap(stackTrace, e3);
                            });
                        }
                    } else {
                        logger.info("Dropping message that was canceled.");
                    }
                } finally {
                    asyncConnection4.recycle();
                }
            });
            asyncConnection.onWrite(asyncConnection5 -> {
                if (!atomicBoolean.get()) {
                    asyncConnection5.recycle();
                    return;
                }
                ZMsg zMsg = (ZMsg) supplier.get();
                JeroMQControlClient.trace(zMsg, stackTrace);
                if (JeroMQControlClient.send(zMsg, (ZMQ.Socket) asyncConnection5.socket())) {
                    asyncConnection5.setEvents(new AsyncConnection.Event[]{AsyncConnection.Event.READ, AsyncConnection.Event.ERROR});
                }
            });
        });
        return () -> {
            atomicBoolean.set(false);
        };
    }

    private static InternalException remap(StackTraceElement[] stackTraceElementArr, Exception exc) {
        return remap(stackTraceElementArr, () -> {
            return new InternalException(exc);
        });
    }

    private static RuntimeException remap(StackTraceElement[] stackTraceElementArr, JeroMQControlException jeroMQControlException) {
        return remap(stackTraceElementArr, () -> {
            try {
                return (RuntimeException) jeroMQControlException.getClass().getConstructor(jeroMQControlException.getClass()).newInstance(jeroMQControlException);
            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                return remap(stackTraceElementArr, () -> {
                    return new InternalException(e);
                });
            }
        });
    }

    private static BaseException remap(StackTraceElement[] stackTraceElementArr, BaseException baseException) {
        return remap(stackTraceElementArr, () -> {
            try {
                return (BaseException) baseException.getClass().getConstructor(Throwable.class).newInstance(baseException);
            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                return remap(stackTraceElementArr, () -> {
                    return new InternalException(e);
                });
            }
        });
    }

    private static <T extends RuntimeException> T remap(StackTraceElement[] stackTraceElementArr, Supplier<T> supplier) {
        T t = supplier.get();
        t.setStackTrace(stackTraceElementArr);
        return t;
    }

    public void close() {
        this.pool.close();
    }
}
