package cloud.orbit.actors.cluster.impl;

import cloud.orbit.actors.cluster.RedisClusterConfig;
import cloud.orbit.exception.UncheckedException;
import cloud.orbit.redis.shaded.jodd.io.NetUtil;
import cloud.orbit.redis.shaded.jodd.util.StringPool;
import cloud.orbit.redis.shaded.netty.channel.EventLoopGroup;
import cloud.orbit.redis.shaded.netty.channel.nio.NioEventLoopGroup;
import cloud.orbit.redis.shaded.redisson.Redisson;
import cloud.orbit.redis.shaded.redisson.api.listener.MessageListener;
import cloud.orbit.redis.shaded.redisson.codec.JsonJacksonCodec;
import cloud.orbit.redis.shaded.redisson.codec.SerializationCodec;
import cloud.orbit.redis.shaded.redisson.config.Config;
import com.github.ssedano.hash.JumpConsistentHash;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/orbit/actors/cluster/impl/RedisConnectionManager.class */
public class RedisConnectionManager {
    private RedisClusterConfig redisClusterConfig;
    private List<RedisOrbitClient> nodeDirectoryClients = new ArrayList();
    private List<RedisOrbitClient> actorDirectoryClients = new ArrayList();
    private List<RedisOrbitClient> messagingClients = new ArrayList();
    private EventLoopGroup eventLoopGroup;
    private static Logger logger = LoggerFactory.getLogger((Class<?>) RedisConnectionManager.class);

    public RedisConnectionManager(RedisClusterConfig redisClusterConfig) {
        this.redisClusterConfig = null;
        this.eventLoopGroup = null;
        this.redisClusterConfig = redisClusterConfig;
        if (redisClusterConfig.getShareEventLoop().booleanValue()) {
            this.eventLoopGroup = new NioEventLoopGroup();
        }
        for (String str : redisClusterConfig.getNodeDirectoryUris()) {
            logger.info("Connecting to Redis Node Directory node at '{}'...", str);
            this.nodeDirectoryClients.add(createClient(str, true));
        }
        for (String str2 : redisClusterConfig.getActorDirectoryUris()) {
            logger.info("Connecting to Redis Actor Directory node at '{}'...", str2);
            this.actorDirectoryClients.add(createClient(str2, true));
        }
        for (String str3 : redisClusterConfig.getMessagingUris()) {
            logger.info("Connecting to Redis messaging node at '{}'...", str3);
            this.messagingClients.add(createClient(str3, false));
        }
    }

    public List<RedisOrbitClient> getNodeDirectoryClients() {
        return Collections.unmodifiableList(this.nodeDirectoryClients);
    }

    public List<RedisOrbitClient> getActorDirectoryClients() {
        return Collections.unmodifiableList(this.actorDirectoryClients);
    }

    public List<RedisOrbitClient> getMessagingClients() {
        return Collections.unmodifiableList(this.messagingClients);
    }

    public RedisOrbitClient getShardedNodeDirectoryClient(String str) {
        return this.nodeDirectoryClients.get(JumpConsistentHash.jumpConsistentHash(str, this.nodeDirectoryClients.size()));
    }

    public RedisOrbitClient getShardedActorDirectoryClient(String str) {
        return this.actorDirectoryClients.get(JumpConsistentHash.jumpConsistentHash(str, this.actorDirectoryClients.size()));
    }

    public void subscribeToChannel(String str, MessageListener<RedisMsg> messageListener) {
        Iterator<RedisOrbitClient> it = this.messagingClients.iterator();
        while (it.hasNext()) {
            it.next().subscribe(str, messageListener);
        }
    }

    public void sendMessageToChannel(String str, Object obj) {
        sendMessageToChannel(str, obj, (List) this.messagingClients.stream().filter(redisOrbitClient -> {
            return redisOrbitClient.isConnectied();
        }).collect(Collectors.toCollection(ArrayList::new)), 1);
    }

    private void sendMessageToChannel(String str, Object obj, List<RedisOrbitClient> list, int i) {
        int size = list.size();
        if (size == 0) {
            logger.error("Failed to send message to channel '{}', no redis messaging instances were available after {} attempts.", str, Integer.valueOf(i));
        } else {
            list.remove(ThreadLocalRandom.current().nextInt(size)).getRedissonClient().getTopic(str).publishAsync(obj).whenComplete((l, th) -> {
                if (th != null) {
                    logger.error("Failed to send message to channel '{}'", str, th);
                    return;
                }
                if (l.longValue() == 0) {
                    if (i >= this.redisClusterConfig.getMessageSendAttempts().intValue()) {
                        logger.error("Failed to send message to channel '{}' after {} attempts.", str, Integer.valueOf(i));
                    } else {
                        logger.warn("Failed to send message to channel '{}' on attempt {}. Retrying...", str, Integer.valueOf(i));
                        sendMessageToChannel(str, obj, list, i + 1);
                    }
                }
            });
        }
    }

    public void shutdownConnections() {
        this.nodeDirectoryClients.forEach((v0) -> {
            v0.shutdown();
        });
        this.actorDirectoryClients.forEach((v0) -> {
            v0.shutdown();
        });
        this.messagingClients.forEach((v0) -> {
            v0.shutdown();
        });
    }

    private RedisOrbitClient createClient(String str, Boolean bool) {
        URI create = URI.create(str);
        if (!create.getScheme().equalsIgnoreCase("redis")) {
            throw new UncheckedException("Invalid Redis URI.");
        }
        String host = create.getHost();
        if (host == null) {
            host = NetUtil.LOCAL_HOST;
        }
        Integer valueOf = Integer.valueOf(create.getPort());
        if (valueOf.intValue() == -1) {
            valueOf = 6379;
        }
        String str2 = "redis://" + host + StringPool.COLON + valueOf;
        Config config = new Config();
        config.setCodec(new RedisPipelineCodec(this.redisClusterConfig.getPipelineSteps(), bool.booleanValue() ? new SerializationCodec() : new JsonJacksonCodec()));
        if (this.eventLoopGroup != null) {
            config.setEventLoopGroup(this.eventLoopGroup);
        }
        config.setThreads(this.redisClusterConfig.getRedissonThreads().intValue());
        config.setNettyThreads(this.redisClusterConfig.getNettyThreads().intValue());
        if (this.redisClusterConfig.getRedissonExecutorService() != null) {
            config.setExecutor(this.redisClusterConfig.getRedissonExecutorService());
        }
        config.useSingleServer().setDnsMonitoringInterval(this.redisClusterConfig.getDnsMonitoringInverval().intValue()).setAddress(str2).setConnectionMinimumIdleSize(this.redisClusterConfig.getMinRedisConnections().intValue()).setConnectionPoolSize(this.redisClusterConfig.getMaxRedisConnections().intValue()).setConnectTimeout(this.redisClusterConfig.getConnectionTimeout().intValue()).setTimeout(this.redisClusterConfig.getGeneralTimeout().intValue()).setIdleConnectionTimeout(this.redisClusterConfig.getIdleTimeout().intValue()).setReconnectionTimeout(this.redisClusterConfig.getReconnectionTimeout().intValue()).setPingTimeout(this.redisClusterConfig.getPingTimeout().intValue()).setFailedAttempts(this.redisClusterConfig.getFailedAttempts().intValue()).setRetryAttempts(this.redisClusterConfig.getRetryAttempts().intValue()).setRetryInterval(this.redisClusterConfig.getRetryInterval().intValue());
        return new RedisOrbitClient(Redisson.create(config), this.redisClusterConfig.getMessagingHealthcheckInterval());
    }
}
