package dev.getelements.elements.rt.remote.jeromq;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import dev.getelements.elements.rt.exception.InternalException;
import dev.getelements.elements.rt.remote.AsyncControlClient;
import dev.getelements.elements.rt.remote.InstanceConnectionService;
import dev.getelements.elements.rt.remote.InstanceDiscoveryService;
import dev.getelements.elements.rt.remote.InstanceHostInfo;
import dev.getelements.elements.rt.remote.InstanceStatus;
import dev.getelements.elements.rt.remote.RemoteInvoker;
import dev.getelements.elements.rt.util.ReadWriteGuard;
import dev.getelements.elements.rt.util.ReentrantReadWriteGuard;
import dev.getelements.elements.sdk.Subscription;
import dev.getelements.elements.sdk.cluster.id.InstanceId;
import dev.getelements.elements.sdk.cluster.id.NodeId;
import dev.getelements.elements.sdk.util.AsyncPublisher;
import dev.getelements.elements.sdk.util.ConcurrentLockedPublisher;
import dev.getelements.elements.sdk.util.Monitor;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;

/* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQInstanceConnectionService.class */
public class JeroMQInstanceConnectionService implements InstanceConnectionService {
    /** Name of the injected setting that supplies the cluster bind address (see {@link #setBindAddress(String)}). */
    public static final String JEROMQ_CLUSTER_BIND_ADDRESS = "dev.getelements.elements.rt.remote.jeromq.bind.addr";
    /** Name of the injected setting that supplies {@link #refreshIntervalInSeconds}. */
    public static final String JEROMQ_CONNECTION_SERVICE_REFRESH_INTERVAL_SECONDS = "dev.getelements.elements.rt.remote.jeromq.connection.service.refresh.interval.sec";
    /** Delay, in seconds, between two runs of the periodic status log task. */
    private static final long REPORT_INTERVAL_SECONDS = 15;
    /** Period, in seconds, of the background refresh task scheduled when a context starts. */
    private static final long REFRESH_INTERVAL_SECONDS = 5;
    private static final Logger logger = LoggerFactory.getLogger(JeroMQInstanceConnectionService.class);
    // Injected collaborators and settings; each one is assigned through its @Inject setter below.
    private InstanceId instanceId;
    private ZContext zContext;
    private String bindAddress;
    // Despite the name, refresh() passes this value as the wait timeout of the blocking refresh.
    private long refreshIntervalInSeconds;
    private Provider<RemoteInvoker> remoteInvokerProvider;
    private InstanceDiscoveryService instanceDiscoveryService;
    private AsyncControlClient.Factory asyncControlClientFactory;
    private JeroMQSecurity securityChain;
    // Held by start() and stop() while they swap the running context.
    private final Lock lock = new ReentrantLock();
    // The running context; null while the service is not started. Read without the lock by the delegating methods.
    private volatile InstanceConnectionContext context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/getelements/elements/rt/remote/jeromq/JeroMQInstanceConnectionService$InstanceConnectionContext.class */
    public class InstanceConnectionContext {
        // Thread that executes server(); created in start() and joined in stop().
        private Thread server;
        // Discovery subscriptions created in start() and released in stop().
        private Subscription onDiscover;
        private Subscription onUndiscover;
        // Control client opened against internalBindAddress in start().
        private AsyncControlClient localControlClient;
        // Handle of the periodic refreshAsync task; cancelled in stop().
        private ScheduledFuture<?> refreshScheduledFuture;
        // "inproc://control/<instanceId>", computed once in the constructor.
        private final String internalBindAddress;
        // Publishers backing subscribeToConnect / subscribeToDisconnect.
        private final AsyncPublisher<InstanceConnectionService.InstanceConnection> onConnect;
        private final AsyncPublisher<InstanceConnectionService.InstanceConnection> onDisconnect;
        // NOTE(review): decompiler-emitted stand-in for the compiler's assertion flag; assigned in the static block.
        static final /* synthetic */ boolean $assertionsDisabled;
        // Polled by the routing server through running::get; set to false in stop().
        private final AtomicBoolean running = new AtomicBoolean(true);
        // Single daemon thread used for periodic tasks and as the dispatcher for publishers and control clients.
        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName(JeroMQInstanceConnectionService.class.getSimpleName() + " refresher.");
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                JeroMQInstanceConnectionService.logger.error("Error in scheduler.", th);
            });
            return thread;
        });
        // Rendezvous between the server thread and start(): a null exchange means the server came up, otherwise the failure.
        private final Exchanger<Exception> exceptionExchanger = new Exchanger<>();
        // Guards pending and active; its condition is signalled after connect/disconnect publications and in drain().
        private ReadWriteGuard rwGuard = new ReentrantReadWriteGuard();
        // Hosts with an in-flight control request (status fetch or route opening).
        private final Map<InstanceHostInfo, AsyncControlClient.Request> pending = new HashMap();
        // Hosts with an established connection.
        private final BiMap<InstanceHostInfo, JeroMQInstanceConnection> active = HashBiMap.create();
        // Inverse view of active (connection -> host); removing through either view updates both.
        private final BiMap<JeroMQInstanceConnection, InstanceHostInfo> rActiveConnections = this.active.inverse();

        /**
         * Creates a context bound to the enclosing service's instance id.
         *
         * <p>Computes the in-process control address and creates the connect/disconnect publishers. Both
         * publishers share the guard's write lock and dispatch through the context's scheduler.</p>
         */
        private InstanceConnectionContext() {
            this.internalBindAddress = String.format("inproc://control/%s", JeroMQInstanceConnectionService.this.instanceId);
            // Diamond instead of a raw instantiation: with a raw constructor the parameter types are erased
            // and the overloaded scheduler::submit reference has no usable functional target type.
            this.onConnect = new ConcurrentLockedPublisher<>(this.rwGuard.getWriteLock(), this.scheduler::submit);
            this.onDisconnect = new ConcurrentLockedPublisher<>(this.rwGuard.getWriteLock(), this.scheduler::submit);
        }

        /**
         * Brings the context up: opens the local control client, subscribes to discovery events, launches the
         * routing server thread, schedules the periodic refresh and waits for the server thread's startup
         * hand-off before scheduling the periodic status log.
         *
         * @throws InternalException if the server thread reported a checked startup failure
         * @throws RuntimeException  if the server thread reported an unchecked startup failure (rethrown as-is)
         */
        public void start() {
            this.localControlClient = JeroMQInstanceConnectionService.this
                    .getAsyncControlClientFactory()
                    .open(getInternalBindAddress())
                    .withDispatch(this.scheduler::submit);

            this.onDiscover = JeroMQInstanceConnectionService.this
                    .getInstanceDiscoveryService()
                    .subscribeToDiscovery(this::createNewConnectionIfAbsent);
            this.onUndiscover = JeroMQInstanceConnectionService.this
                    .getInstanceDiscoveryService()
                    .subscribeToUndiscovery(this::disconnect);

            final Thread serverThread = new Thread(this::server);
            this.server = serverThread;
            serverThread.setDaemon(true);
            serverThread.setName(JeroMQInstanceConnectionService.class.getSimpleName() + " server.");
            serverThread.setUncaughtExceptionHandler((failedThread, cause) ->
                    JeroMQInstanceConnectionService.logger.error("Error running InstanceConnectionService", cause));
            serverThread.start();

            this.refreshScheduledFuture = this.scheduler.scheduleAtFixedRate(
                    this::refreshAsync,
                    0L,
                    JeroMQInstanceConnectionService.REFRESH_INTERVAL_SECONDS,
                    TimeUnit.SECONDS);

            // Blocks until the server thread hands over either null (started) or its startup failure.
            final Exception startupFailure = exchangeException(null);

            if (startupFailure instanceof RuntimeException) {
                throw (RuntimeException) startupFailure;
            }

            if (startupFailure != null) {
                throw new InternalException(startupFailure);
            }

            this.scheduler.scheduleWithFixedDelay(
                    this::logStatus,
                    0L,
                    JeroMQInstanceConnectionService.REPORT_INTERVAL_SECONDS,
                    TimeUnit.SECONDS);
        }

        /**
         * Logs, at debug level, the hosts known to the discovery service together with the hosts that are
         * currently pending and active in this context.
         *
         * <p>The guard tests the debug level because that is the level the message is emitted at. Testing
         * the info level made the method build all three strings, and take the read lock twice, even when
         * the message was going to be discarded.</p>
         */
        private void logStatus() {
            if (!JeroMQInstanceConnectionService.logger.isDebugEnabled()) {
                return;
            }

            final String knownHosts = JeroMQInstanceConnectionService.this
                    .getInstanceDiscoveryService()
                    .getKnownHosts()
                    .stream()
                    .map(Object::toString)
                    .collect(Collectors.joining(","));

            final String activeHosts = (String) this.rwGuard.computeRO(guard ->
                    this.active.keySet().stream().map(Object::toString).collect(Collectors.joining(",")));

            final String pendingHosts = (String) this.rwGuard.computeRO(guard ->
                    this.pending.keySet().stream().map(Object::toString).collect(Collectors.joining(",")));

            JeroMQInstanceConnectionService.logger.debug(
                    "\nKnown Hosts [{}]\nPending[{}]\nActive [{}]",
                    knownHosts, pendingHosts, activeHosts);
        }

        /**
         * Starts connecting to the given host unless it is already pending or active. Any exception raised
         * while doing so is logged and not propagated.
         *
         * @param instanceHostInfo the discovered host
         */
        public void createNewConnectionIfAbsent(InstanceHostInfo instanceHostInfo) {
            try {
                final boolean alreadyTracked = (Boolean) this.rwGuard.computeRO(guard ->
                        Boolean.valueOf(this.pending.containsKey(instanceHostInfo) || this.active.containsKey(instanceHostInfo)));

                if (alreadyTracked) {
                    return;
                }

                createNewConnection(instanceHostInfo);
            } catch (Exception e) {
                JeroMQInstanceConnectionService.logger.error("Caught exception creating host from address {}", instanceHostInfo.getConnectAddress(), e);
            }
        }

        /**
         * Registers a pending status request for the given host unless one is already registered.
         *
         * <p>Under the write lock, a control client is opened against the host's connect address and its
         * instance status is requested. On success the flow continues in
         * {@code openRouteToMasterNode}. On failure the client is closed and the pending entry removed.</p>
         *
         * @param instanceHostInfo the host to connect to
         */
        private void createNewConnection(InstanceHostInfo instanceHostInfo) {
            this.rwGuard.rw(condition -> {
                this.pending.compute(instanceHostInfo, (hostInfo, existingRequest) -> {
                    if (existingRequest != null) {
                        // A request is already in flight for this host; keep it.
                        return existingRequest;
                    }

                    final String connectAddress = hostInfo.getConnectAddress();
                    final AsyncControlClient statusClient = JeroMQInstanceConnectionService.this
                            .getAsyncControlClientFactory()
                            .open(connectAddress)
                            .withDispatch(this.scheduler::execute);

                    JeroMQInstanceConnectionService.logger.debug("Fetching instance status for {}", instanceHostInfo);

                    return statusClient.getInstanceStatus(response -> {
                        try {
                            final InstanceStatus instanceStatus = (InstanceStatus) response.get();
                            JeroMQInstanceConnectionService.logger.debug("Got status. {} -> {}", instanceHostInfo, instanceStatus);
                            openRouteToMasterNode(hostInfo, instanceStatus, statusClient);
                        } catch (Exception e) {
                            // The exception is passed to the logger so the failure cause is not lost.
                            JeroMQInstanceConnectionService.logger.debug("Failed to get instance status from {}. Closing.", connectAddress, e);
                            statusClient.close();
                            // The parameter name must differ from the enclosing lambda's "condition";
                            // Java forbids a nested lambda from re-declaring it.
                            this.rwGuard.rw(guard -> {
                                this.pending.remove(hostInfo);
                            });
                        }
                    });
                });
            });
        }

        /**
         * Asks the local control client to open a route to the remote instance's master node and records
         * the resulting request as the pending request for the host.
         *
         * <p>When the response arrives, the connection is activated on success, or the pending entry is
         * dropped on failure. The remote control client is closed in every case.</p>
         *
         * @param instanceHostInfo   the remote host
         * @param instanceStatus     the status previously fetched from that host
         * @param asyncControlClient the client that fetched the status; closed once the route response is handled
         */
        private void openRouteToMasterNode(InstanceHostInfo instanceHostInfo, InstanceStatus instanceStatus, AsyncControlClient asyncControlClient) {
            final NodeId masterNodeId = NodeId.forMasterNode(instanceStatus.getInstanceId());
            final String connectAddress = instanceHostInfo.getConnectAddress();
            JeroMQInstanceConnectionService.logger.info("Opening route to master node @{}", instanceHostInfo);

            final AsyncControlClient.Request routeRequest = this.localControlClient.openRouteToNode(masterNodeId, connectAddress, response -> {
                try {
                    final String masterConnectAddress = (String) response.get();
                    JeroMQInstanceConnectionService.logger.info("Obtained master node connect address {}", masterConnectAddress);
                    addInstanceConnection(instanceHostInfo, instanceStatus, masterConnectAddress);
                } catch (Exception e) {
                    // The exception is passed to the logger so the failure cause is not lost.
                    JeroMQInstanceConnectionService.logger.warn("Failed to open route to master node {} -> {}", masterNodeId, connectAddress, e);
                    this.rwGuard.rw(guard -> {
                        this.pending.remove(instanceHostInfo);
                    });
                } finally {
                    // Single cleanup point in place of three separate close() calls.
                    asyncControlClient.close();
                }
            });

            // Replaces the status request stored for this host with the route request.
            this.rwGuard.rw(guard -> {
                this.pending.put(instanceHostInfo, routeRequest);
            });
        }

        /**
         * Moves a host from pending to active and publishes the connection to connect subscribers.
         *
         * <p>If the host already has an active connection it is reused. Otherwise a remote invoker is
         * obtained from the provider, started against {@code str} and wrapped in a new connection. After
         * publication the guard's condition is signalled.</p>
         *
         * @param instanceHostInfo the remote host
         * @param instanceStatus   the status fetched from that host
         * @param str              the connect address returned when the route to the master node was opened
         */
        private void addInstanceConnection(InstanceHostInfo instanceHostInfo, InstanceStatus instanceStatus, String str) {
            final AsyncPublisher<InstanceConnectionService.InstanceConnection> publisher = this.onConnect;

            final JeroMQInstanceConnection connection = (JeroMQInstanceConnection) this.rwGuard.computeRW(guard -> {
                this.pending.remove(instanceHostInfo);
                return (JeroMQInstanceConnection) this.active.compute(instanceHostInfo, (hostInfo, existing) -> {
                    JeroMQInstanceConnectionService.logger.info("Activating connection for {}", hostInfo);

                    if (existing == null) {
                        final RemoteInvoker invoker = (RemoteInvoker) JeroMQInstanceConnectionService.this.getRemoteInvokerProvider().get();
                        invoker.start(str);
                        return new JeroMQInstanceConnection(
                                instanceStatus.getInstanceId(),
                                invoker,
                                JeroMQInstanceConnectionService.this.getzContext(),
                                getInternalBindAddress(),
                                hostInfo,
                                JeroMQInstanceConnectionService.this.getSecurityChain(),
                                this::disconnect);
                    }

                    return existing;
                });
            });

            publisher.publishAsync(connection, published -> {
                this.rwGuard.getCondition().signalAll();
            });
        }

        /**
         * Shuts the context down: releases the discovery subscriptions, cancels the periodic refresh,
         * drains pending and active connections, closes the local control client, stops the scheduler
         * and finally stops and joins the routing server thread.
         *
         * <p>An interrupt received while waiting never cancels the remaining shutdown steps, and the
         * calling thread's interrupt status is restored before this method returns or throws.</p>
         *
         * @throws InternalException if the calling thread is interrupted while joining the server thread
         */
        public void stop() {
            this.onDiscover.unsubscribe();
            this.onUndiscover.unsubscribe();
            this.refreshScheduledFuture.cancel(true);
            drain();
            this.localControlClient.close();

            boolean interrupted = false;
            this.scheduler.shutdown();

            try {
                if (!this.scheduler.awaitTermination(1L, TimeUnit.MINUTES)) {
                    JeroMQInstanceConnectionService.logger.warn("Scheduler did not terminate within one minute.");
                }
            } catch (InterruptedException e) {
                JeroMQInstanceConnectionService.logger.error("Interrupted while shutting down.", e);
                // Remember the interrupt but keep going: the server thread still has to be stopped.
                interrupted = true;
            }

            this.running.set(false);
            this.server.interrupt();

            try {
                this.server.join();
            } catch (InterruptedException e) {
                interrupted = true;
                throw new InternalException(e);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Triggers a refresh and blocks until every host known at that moment has an active connection.
         *
         * <p>The wait happens on the guard's condition while holding the write lock. Each wake-up
         * re-checks the active set, and a wait that elapses without a signal is treated as a timeout.</p>
         *
         * @param j        the maximum time to wait for each signal
         * @param timeUnit the unit of {@code j}
         * @throws InternalException if a wait times out before all known hosts are active
         */
        public void refresh(long j, TimeUnit timeUnit) {
            this.rwGuard.rw(condition -> {
                final Collection<InstanceHostInfo> expectedHosts = refreshAsync();

                try {
                    do {
                        if (this.active.keySet().containsAll(expectedHosts)) {
                            JeroMQInstanceConnectionService.logger.debug("Refreshed successfully.");
                            return;
                        }
                    } while (condition.await(j, timeUnit));
                } catch (InterruptedException e) {
                    JeroMQInstanceConnectionService.logger.warn("Interrupted refreshing.", e);
                    // Stop waiting and hand the interrupt back to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }

                throw new InternalException("Timed out after " + j + " " + timeUnit);
            });
        }

        /**
         * Starts a connection attempt for every known host that is neither active nor pending.
         *
         * <p>Runs under the write lock and does not wait for the attempts to complete.</p>
         *
         * @return the hosts reported by the discovery service at the time of the call
         */
        @SuppressWarnings("unchecked")
        public Collection<InstanceHostInfo> refreshAsync() {
            return (Collection<InstanceHostInfo>) this.rwGuard.computeRW(guard -> {
                final Collection<InstanceHostInfo> knownHosts = JeroMQInstanceConnectionService.this
                        .getInstanceDiscoveryService()
                        .getKnownHosts();

                // Typed copy: with a raw HashSet the forEach below has no valid target for the method reference.
                final Set<InstanceHostInfo> unconnected = new HashSet<>(knownHosts);
                unconnected.removeAll(this.active.keySet());
                unconnected.removeAll(this.pending.keySet());
                unconnected.forEach(this::createNewConnection);

                return knownHosts;
            });
        }

        /**
         * Body of the routing server thread.
         *
         * <p>Creates the routing server on the internal address and, when not blank, the configured bind
         * address. It then completes the startup hand-off with {@code start()} by exchanging {@code null}
         * and runs the server until {@code running} becomes false.</p>
         *
         * <p>A failure before the hand-off is exchanged to {@code start()}. A failure after it is only
         * logged: {@code start()} exchanges exactly once, so a second exchange would block this thread
         * with no partner. The routing server is closed on every exit path.</p>
         */
        private void server() {
            boolean handshakeComplete = false;

            try {
                final List<String> bindAddresses = Stream
                        .of(getInternalBindAddress(), JeroMQInstanceConnectionService.this.getBindAddress())
                        .filter(address -> !address.isBlank())
                        .collect(Collectors.toUnmodifiableList());

                final JeroMQRoutingServer routingServer = new JeroMQRoutingServer(
                        JeroMQInstanceConnectionService.this.getInstanceId(),
                        JeroMQInstanceConnectionService.this.getzContext(),
                        bindAddresses,
                        JeroMQInstanceConnectionService.this.getSecurityChain());

                try {
                    final Exception unexpected = exchangeException(null);
                    handshakeComplete = true;

                    if (!$assertionsDisabled && unexpected != null) {
                        throw new AssertionError();
                    }

                    routingServer.run(this.running::get);
                    JeroMQInstanceConnectionService.logger.info("Got signal to stop running.  Shutting down IO thread.");
                } finally {
                    // Close the server on failure as well as on normal termination.
                    routingServer.close();
                }
            } catch (Exception e) {
                JeroMQInstanceConnectionService.logger.error("Exception running the routing server.", e);

                if (handshakeComplete) {
                    // start() has already returned, so nobody is left to receive the exception.
                    return;
                }

                final Exception unexpected = exchangeException(e);

                if (!$assertionsDisabled && unexpected != null) {
                    throw new AssertionError();
                }
            }
        }

        /**
         * Swaps an exception (or {@code null}) with the thread waiting on the other side of the exchanger.
         *
         * @param exc the value to hand to the other thread; may be {@code null}
         * @return the value handed over by the other thread; may be {@code null}
         * @throws InternalException if the calling thread is interrupted while waiting for the exchange
         */
        private Exception exchangeException(Exception exc) {
            final Exception received;

            try {
                received = this.exceptionExchanger.exchange(exc);
            } catch (InterruptedException interruption) {
                JeroMQInstanceConnectionService.logger.error("Interrupted exchanging exceptions to calling thread.", interruption);
                throw new InternalException(interruption);
            }

            return received;
        }

        /**
         * @return the in-process control address computed when this context was created
         */
        public String getInternalBindAddress() {
            return internalBindAddress;
        }

        /**
         * Takes a snapshot of the active connections under the read lock.
         *
         * @return a new list holding the connections active at the time of the call
         */
        @SuppressWarnings("unchecked")
        public List<InstanceConnectionService.InstanceConnection> getActive() {
            final Object snapshot = this.rwGuard.computeRO(guard ->
                    new ArrayList<InstanceConnectionService.InstanceConnection>(this.active.values()));
            return (List<InstanceConnectionService.InstanceConnection>) snapshot;
        }

        /**
         * Cancels every pending request, publishes a disconnect event for every active connection and
         * clears both collections. Runs under the write lock and signals the guard's condition at the end.
         */
        public void drain() {
            this.rwGuard.rw(condition -> {
                this.pending.values().forEach(request -> request.cancel());
                this.pending.clear();

                // Each active connection is published to the disconnect subscribers. The decompiled
                // lambda called an undefined "r1" where the onDisconnect publisher was intended.
                final Set<JeroMQInstanceConnection> connections = this.active.values();
                connections.forEach(connection -> this.onDisconnect.publishAsync(connection));
                this.active.clear();

                condition.signalAll();
            });
        }

        /**
         * Drops whatever this context holds for the given host.
         *
         * <p>Under the write lock, an active connection is removed and its routes are closed through the
         * local control client, and a pending request is removed and cancelled. If an active connection
         * was removed, a disconnect event is published. Once published, the guard's condition is
         * signalled and the connection's remote invoker is stopped.</p>
         *
         * @param instanceHostInfo the host to disconnect from
         */
        public void disconnect(InstanceHostInfo instanceHostInfo) {
            final JeroMQInstanceConnection removed = (JeroMQInstanceConnection) this.rwGuard.computeRW(guard -> {
                final JeroMQInstanceConnection connection = (JeroMQInstanceConnection) this.active.remove(instanceHostInfo);

                if (connection != null) {
                    this.localControlClient.closeRoutesViaInstance(connection.getInstanceId(), instanceHostInfo.getConnectAddress(), response -> {
                        JeroMQInstanceConnectionService.logger.info("Closed routes via {}", JeroMQInstanceConnectionService.this.instanceId);
                    });
                } else {
                    JeroMQInstanceConnectionService.logger.debug("Connection not active. Removing.");
                }

                final AsyncControlClient.Request inFlight = this.pending.remove(instanceHostInfo);

                if (inFlight != null) {
                    inFlight.cancel();
                } else {
                    JeroMQInstanceConnectionService.logger.debug("No pending request for {}. Skipping.", instanceHostInfo);
                }

                return connection;
            });

            if (removed == null) {
                JeroMQInstanceConnectionService.logger.info("Connection for host {} wasn't removed.", instanceHostInfo.getConnectAddress());
                return;
            }

            JeroMQInstanceConnectionService.logger.info("Disconnected from instance {}", instanceHostInfo.getConnectAddress());
            this.onDisconnect.publishAsync(removed, published -> {
                this.rwGuard.getCondition().signalAll();
                removed.getRemoteInvoker().stop();
            });
        }

        /**
         * Drops the given connection if it is still active.
         *
         * <p>Under the write lock, the connection is removed through the inverse view of the active map
         * and its routes are closed through the local control client. If it was removed, a disconnect
         * event is published. Once published, the guard's condition is signalled and the connection's
         * remote invoker is stopped.</p>
         *
         * @param jeroMQInstanceConnection the connection to disconnect
         */
        public void disconnect(JeroMQInstanceConnection jeroMQInstanceConnection) {
            final boolean wasActive = (Boolean) this.rwGuard.computeRW(guard -> {
                final InstanceHostInfo hostInfo = (InstanceHostInfo) this.rActiveConnections.remove(jeroMQInstanceConnection);

                if (hostInfo == null) {
                    return Boolean.FALSE;
                }

                this.localControlClient.closeRoutesViaInstance(jeroMQInstanceConnection.getInstanceId(), hostInfo.getConnectAddress(), response -> {
                    JeroMQInstanceConnectionService.logger.info("Closed routes via {}", JeroMQInstanceConnectionService.this.instanceId);
                });

                return Boolean.TRUE;
            });

            if (!wasActive) {
                return;
            }

            this.onDisconnect.publishAsync(jeroMQInstanceConnection, published -> {
                this.rwGuard.getCondition().signalAll();
                jeroMQInstanceConnection.getRemoteInvoker().stop();
            });
        }

        /**
         * Registers a consumer with the connect publisher.
         *
         * @param consumer receives each published connection
         * @return the subscription returned by the publisher
         */
        public Subscription subscribeToConnect(Consumer<InstanceConnectionService.InstanceConnection> consumer) {
            final Subscription subscription = onConnect.subscribe(consumer);
            return subscription;
        }

        /**
         * Registers a consumer with the disconnect publisher.
         *
         * @param consumer receives each published connection
         * @return the subscription returned by the publisher
         */
        public Subscription subscribeToDisconnect(Consumer<InstanceConnectionService.InstanceConnection> consumer) {
            final Subscription subscription = onDisconnect.subscribe(consumer);
            return subscription;
        }

        /**
         * Opens a binding for the given node through a short-lived control client connected to the
         * internal bind address. The client is closed before returning, whether or not the call succeeds.
         *
         * @param nodeId the node to bind
         * @return the binding returned by the control client
         */
        public InstanceConnectionService.InstanceBinding openBinding(NodeId nodeId) {
            try (JeroMQControlClient controlClient = new JeroMQControlClient(
                    JeroMQInstanceConnectionService.this.getzContext(),
                    getInternalBindAddress(),
                    JeroMQInstanceConnectionService.this.getSecurityChain())) {
                return controlClient.openBinding(nodeId);
            }
        }

        // Decompiler rendering of the compiler-generated assertion flag: true when assertions are
        // disabled for the enclosing service class. server() consults it before throwing AssertionError.
        static {
            $assertionsDisabled = !JeroMQInstanceConnectionService.class.desiredAssertionStatus();
        }
    }

    /**
     * Starts the service by creating and starting a new connection context while holding the
     * service lock.
     *
     * @throws IllegalStateException if a context already exists
     */
    public void start() {
        try (Monitor monitor = Monitor.enter(this.lock)) {
            if (this.context != null) {
                throw new IllegalStateException("Already started.");
            }

            this.context = new InstanceConnectionContext();
            this.context.start();
        }
    }

    /**
     * Stops the service while holding the service lock. The context reference is cleared before the
     * context itself is stopped.
     *
     * @throws IllegalStateException if no context exists
     */
    public void stop() {
        try (Monitor monitor = Monitor.enter(this.lock)) {
            if (this.context == null) {
                throw new IllegalStateException("Not running.");
            }

            final InstanceConnectionContext stopping = this.context;
            this.context = null;
            stopping.stop();
        }
    }

    /**
     * Performs a blocking refresh on the running context, using the injected refresh interval
     * (in seconds) as the wait timeout.
     *
     * @throws IllegalStateException if the service is not running
     */
    public void refresh() {
        final InstanceConnectionContext running = getContext();
        running.refresh(getRefreshIntervalInSeconds(), TimeUnit.SECONDS);
    }

    /**
     * Opens a binding for the given node through the running context.
     *
     * @param nodeId the node to bind
     * @return the binding produced by the context
     * @throws IllegalStateException if the service is not running
     */
    public InstanceConnectionService.InstanceBinding openBinding(NodeId nodeId) {
        final InstanceConnectionContext running = getContext();
        return running.openBinding(nodeId);
    }

    /**
     * @return a snapshot of the running context's active connections
     * @throws IllegalStateException if the service is not running
     */
    public List<InstanceConnectionService.InstanceConnection> getActiveConnections() {
        final InstanceConnectionContext running = getContext();
        return running.getActive();
    }

    /**
     * Subscribes to connect events of the running context.
     *
     * @param consumer receives each published connection
     * @return the subscription created by the context
     * @throws IllegalStateException if the service is not running
     */
    public Subscription subscribeToConnect(Consumer<InstanceConnectionService.InstanceConnection> consumer) {
        final InstanceConnectionContext running = getContext();
        return running.subscribeToConnect(consumer);
    }

    /**
     * Subscribes to disconnect events of the running context.
     *
     * @param consumer receives each published connection
     * @return the subscription created by the context
     * @throws IllegalStateException if the service is not running
     */
    public Subscription subscribeToDisconnect(Consumer<InstanceConnectionService.InstanceConnection> consumer) {
        final InstanceConnectionContext running = getContext();
        return running.subscribeToDisconnect(consumer);
    }

    /**
     * @return the running context's internal bind address
     * @throws IllegalStateException if the service is not running
     */
    public String getLocalControlAddress() {
        final InstanceConnectionContext running = getContext();
        return running.getInternalBindAddress();
    }

    /**
     * Returns the running context.
     *
     * <p>The volatile field is read exactly once. Checking the field and then reading it again for the
     * return value lets a concurrent {@code stop()} clear it in between, handing {@code null} to the
     * caller.</p>
     *
     * @return the current context, never {@code null}
     * @throws IllegalStateException if the service is not running
     */
    private InstanceConnectionContext getContext() {
        final InstanceConnectionContext current = this.context;

        if (current == null) {
            throw new IllegalStateException("Not running.");
        }

        return current;
    }

    /** @return the injected instance id */
    public InstanceId getInstanceId() {
        return instanceId;
    }

    /** @param instanceId the id of this instance */
    @Inject
    public void setInstanceId(InstanceId instanceId) {
        this.instanceId = instanceId;
    }

    /** @return the injected ZeroMQ context */
    public ZContext getzContext() {
        return zContext;
    }

    /** @param zContext the ZeroMQ context handed to routing servers, control clients and connections */
    @Inject
    public void setzContext(ZContext zContext) {
        this.zContext = zContext;
    }

    /** @return the injected cluster bind address */
    public String getBindAddress() {
        return bindAddress;
    }

    /** @param str the cluster bind address; a blank value is skipped when the routing server is created */
    @Inject
    public void setBindAddress(@Named(JEROMQ_CLUSTER_BIND_ADDRESS) String str) {
        this.bindAddress = str;
    }

    /** @return the injected provider of remote invokers */
    public Provider<RemoteInvoker> getRemoteInvokerProvider() {
        return remoteInvokerProvider;
    }

    /** @param provider supplies one remote invoker per newly activated connection */
    @Inject
    public void setRemoteInvokerProvider(Provider<RemoteInvoker> provider) {
        this.remoteInvokerProvider = provider;
    }

    /** @return the injected refresh interval, in seconds */
    public long getRefreshIntervalInSeconds() {
        return refreshIntervalInSeconds;
    }

    /** @param j the refresh interval in seconds; {@link #refresh()} uses it as its wait timeout */
    @Inject
    public void setRefreshIntervalInSeconds(@Named(JEROMQ_CONNECTION_SERVICE_REFRESH_INTERVAL_SECONDS) long j) {
        this.refreshIntervalInSeconds = j;
    }

    /** @return the injected discovery service */
    public InstanceDiscoveryService getInstanceDiscoveryService() {
        return instanceDiscoveryService;
    }

    /** @param instanceDiscoveryService the source of known hosts and discovery events */
    @Inject
    public void setInstanceDiscoveryService(InstanceDiscoveryService instanceDiscoveryService) {
        this.instanceDiscoveryService = instanceDiscoveryService;
    }

    /** @return the injected factory of asynchronous control clients */
    public AsyncControlClient.Factory getAsyncControlClientFactory() {
        return asyncControlClientFactory;
    }

    /** @param factory opens the asynchronous control clients used by the connection context */
    @Inject
    public void setAsyncControlClientFactory(AsyncControlClient.Factory factory) {
        this.asyncControlClientFactory = factory;
    }

    /** @return the injected security chain */
    public JeroMQSecurity getSecurityChain() {
        return securityChain;
    }

    /** @param jeroMQSecurity the security chain handed to routing servers, control clients and connections */
    @Inject
    public void setSecurityChain(JeroMQSecurity jeroMQSecurity) {
        this.securityChain = jeroMQSecurity;
    }
}
