package net.oschina.j2cache.redis;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import net.oschina.j2cache.ClusterPolicy;
import net.oschina.j2cache.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

/* loaded from: input_file:net/oschina/j2cache/redis/RedisPubSubClusterPolicy.class */
/**
 * Cluster notification policy that exchanges cache commands over a Redis
 * publish/subscribe channel.
 *
 * <p>Outgoing commands (join, evict, clear, quit) are serialized with
 * {@code Command.jsonBytes()} and published on a single channel. Incoming
 * messages on that channel are parsed in {@link #onMessage(byte[], byte[])} and
 * applied through the inherited {@code evict}/{@code clear} operations.
 */
public class RedisPubSubClusterPolicy extends BinaryJedisPubSub implements ClusterPolicy {
    private static final Logger log = LoggerFactory.getLogger(RedisPubSubClusterPolicy.class);

    /** Pause between attempts to re-subscribe after a Redis connection failure. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private final RedisClient redis;
    private final String channel;
    private final byte[] channelBytes;

    /** Set by {@link #disconnect()} so the subscriber thread stops re-subscribing. */
    private volatile boolean stopped;

    /**
     * Creates a policy bound to one Redis channel.
     *
     * @param channel     name of the pub/sub channel; encoded as UTF-8 so that every
     *                    node derives the same channel bytes regardless of its
     *                    platform default charset
     * @param redisClient client used for publishing and subscribing
     */
    public RedisPubSubClusterPolicy(String channel, RedisClient redisClient) {
        this.redis = redisClient;
        this.channel = channel;
        this.channelBytes = channel.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Publishes a join command and starts a background thread named
     * {@code RedisSubscribeThread} that subscribes this instance to the channel.
     *
     * @param properties not used by this implementation
     */
    @Override
    public void connect(Properties properties) {
        long start = System.currentTimeMillis();
        this.stopped = false;
        this.redis.publish(this.channelBytes, Command.join().jsonBytes());
        new Thread(this::subscribeUntilStopped, "RedisSubscribeThread").start();
        log.info("Connected to redis channel:{}, time {} ms.", this.channel, System.currentTimeMillis() - start);
    }

    /**
     * Subscribes to the channel and re-subscribes after connection failures until
     * the subscription ends normally, the policy is stopped, or the thread is
     * interrupted.
     */
    private void subscribeUntilStopped() {
        while (!this.stopped) {
            try {
                // NOTE(review): subscribe is expected to block while the subscription
                // is active (Jedis pub/sub semantics); returning normally means the
                // subscription was ended, so the loop terminates.
                this.redis.subscribe(this, new byte[][] {this.channelBytes});
                log.info("Disconnect to redis channel:{}", this.channel);
                return;
            } catch (JedisConnectionException e) {
                log.error("Failed connect to redis, reconnect it.", e);
                try {
                    Thread.sleep(RECONNECT_DELAY_MILLIS);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Announces this node's departure and ends the channel subscription.
     *
     * <p>A failure to publish the quit command is logged rather than propagated so
     * that the subscription is still torn down.
     */
    @Override
    public void disconnect() {
        // Stop the subscriber thread from re-subscribing if it is in its retry loop.
        this.stopped = true;
        try {
            this.redis.publish(this.channelBytes, Command.quit().jsonBytes());
        } catch (Exception e) {
            log.error("Failed to publish quit command to redis channel:{}", this.channel, e);
        }
        // Only unsubscribe when a subscription is actually active.
        if (isSubscribed()) {
            unsubscribe();
        }
    }

    /**
     * Publishes an evict command for the given keys. Failures are logged, not thrown.
     *
     * @param region cache region the keys belong to
     * @param keys   keys to evict
     */
    @Override
    public void sendEvictCmd(String region, String... keys) {
        try {
            // Operator 2 is handled as "evict" in onMessage.
            this.redis.publish(this.channelBytes, new Command((byte) 2, region, keys).jsonBytes());
        } catch (Exception e) {
            log.error("Failed to delete cache,region={},key={}", region, String.join(",", keys), e);
        }
    }

    /**
     * Publishes a clear command for the given region. Failures are logged, not thrown.
     *
     * @param region cache region to clear
     */
    @Override
    public void sendClearCmd(String region) {
        try {
            this.redis.publish(this.channelBytes, new Command((byte) Command.OPT_CLEAR_KEY, region, "").jsonBytes());
        } catch (Exception e) {
            log.error("Failed to clear cache,region={}", region, e);
        }
    }

    /**
     * Handles a message received on the subscribed channel.
     *
     * <p>Messages that cannot be parsed into a command, or that are reported as
     * local by {@code Command.isLocal()}, are ignored. Any exception raised while
     * handling a message is logged and not rethrown.
     *
     * @param sourceChannel channel the message arrived on (unused)
     * @param message       serialized command payload
     */
    @Override
    public void onMessage(byte[] sourceChannel, byte[] message) {
        try {
            Command cmd = Command.parse(message);
            if (cmd == null || cmd.isLocal()) {
                return;
            }
            switch (cmd.getOperator()) {
                case 1: // join
                    log.info("Node-{} joined to {}", cmd.getSrc(), this.channel);
                    break;
                case 2: // evict
                    evict(cmd.getRegion(), cmd.getKeys());
                    if (log.isDebugEnabled()) {
                        log.debug("Received cache evict message, region={},key={}",
                                cmd.getRegion(), String.join(",", cmd.getKeys()));
                    }
                    break;
                case Command.OPT_CLEAR_KEY /* 3 */:
                    clear(cmd.getRegion());
                    log.debug("Received cache clear message, region={}", cmd.getRegion());
                    break;
                case Command.OPT_QUIT /* 4 */:
                    log.info("Node-{} quit to {}", cmd.getSrc(), this.channel);
                    break;
                default:
                    log.warn("Unknown message type = {}", cmd.getOperator());
                    break;
            }
        } catch (Exception e) {
            log.error("Failed to handle received msg", e);
        }
    }
}
