package org.smartboot.mqtt.data.persistence.impl;

import cn.hutool.core.text.StrPool;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
import org.smartboot.mqtt.broker.plugin.PluginException;
import org.smartboot.mqtt.data.persistence.DataPersistPlugin;
import org.smartboot.mqtt.data.persistence.config.imp.RedisPluginConfig;
import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo;
import org.smartboot.mqtt.data.persistence.utils.StrUtils;

/* loaded from: input_file:org/smartboot/mqtt/data/persistence/impl/RedisPlugin.class */
public class RedisPlugin extends DataPersistPlugin<RedisPluginConfig> {
    private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]";
    private static final String MESSAGE_PREFIX = "smart-mqtt-message:";
    private static StatefulRedisConnection<String, String> CONNECTION;
    private static RedisClient CLIENT;
    private static RedisAsyncCommands<String, String> ASYNC_COMMAND;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RedisPlugin.class);
    private static StrUtils<MessageNodeInfo> StrUtil = new StrUtils<>();
    private static AtomicInteger atomicInteger = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.smartboot.mqtt.data.persistence.DataPersistPlugin
    public RedisPluginConfig connect(BrokerContext brokerContext) {
        RedisPluginConfig redisPluginConfig = (RedisPluginConfig) brokerContext.parseConfig(CONFIG_JSON_PATH, RedisPluginConfig.class);
        if (redisPluginConfig == null) {
            LOGGER.error("config maybe error, parse fail!");
            throw new PluginException("start DataPersistRedisPlugin exception");
        }
        setConfig(redisPluginConfig);
        LOGGER.info("redisPoll create success");
        CLIENT = RedisClient.create(RedisURI.builder().withHost(redisPluginConfig.getHost().split(StrPool.COLON)[0]).withPort(Integer.parseInt(redisPluginConfig.getHost().split(StrPool.COLON)[1])).withPassword(redisPluginConfig.getPassword()).withTimeout(Duration.of(redisPluginConfig.getTimeout(), ChronoUnit.SECONDS)).build());
        CONNECTION = CLIENT.connect();
        ASYNC_COMMAND = CONNECTION.async();
        return redisPluginConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.smartboot.mqtt.data.persistence.DataPersistPlugin
    public void listenAndPushMessage(BrokerContext brokerContext, RedisPluginConfig redisPluginConfig) {
        MessageBus messageBus = brokerContext.getMessageBus();
        System.currentTimeMillis();
        messageBus.consumer((mqttSession, message) -> {
            MessageNodeInfo messageNodeInfo = new MessageNodeInfo(message);
            String str = "smart-mqtt-message::" + message.getTopic();
            String messageNodeInfo2 = messageNodeInfo.toString();
            if (redisPluginConfig.isBase64()) {
                messageNodeInfo2 = StrUtil.base64(messageNodeInfo);
            }
            if (!redisPluginConfig.isSimple()) {
                StrUtils<MessageNodeInfo> strUtils = StrUtil;
                messageNodeInfo2 = StrUtils.addId(messageNodeInfo2);
            }
            ASYNC_COMMAND.zadd((RedisAsyncCommands<String, String>) str, messageNodeInfo.getCreateTime(), (double) messageNodeInfo2).thenAccept(l -> {
            });
        });
    }

    @Override // org.smartboot.mqtt.broker.plugin.Plugin
    protected void destroyPlugin() {
        CONNECTION.close();
        CLIENT.shutdown();
    }
}
