package cn.sliew.carp.framework.queue.kekio.redis;

import cn.sliew.carp.framework.common.util.KeyUtil;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.AttemptsAttribute;
import cn.sliew.carp.framework.queue.kekio.message.MaxAttemptsAttribute;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.carp.framework.queue.kekio.metrics.QueueEvent;
import cn.sliew.milky.common.util.JacksonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.resps.ScanResult;

/* loaded from: input_file:cn/sliew/carp/framework/queue/kekio/redis/RedisQueue.class */
public abstract class RedisQueue<CLIENT extends JedisCommands> extends AbstractRedisQueue<CLIENT> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedisQueue.class);
    private final String queueKey;
    private final String unackedKey;
    private final String messagesKey;
    private final String locksKey;
    private final String attemptsKey;
    private String readMessageWithLockScriptSha;

    public RedisQueue(ObjectMapper objectMapper, String str, QueueExecutor queueExecutor, List<Queue.DeadMessageCallback> list, EventPublisher eventPublisher, MeterRegistry meterRegistry, Boolean bool, Duration duration, Duration duration2, Boolean bool2, TemporalAmount temporalAmount, Integer num) {
        super(objectMapper, str, queueExecutor, list, eventPublisher, meterRegistry, bool, duration, duration2, bool2, temporalAmount, num);
        this.queueKey = KeyUtil.buildKey("kekio-queue.v1", new Object[]{str, "queue"});
        this.unackedKey = KeyUtil.buildKey("kekio-queue.v1", new Object[]{str, "unacked"});
        this.messagesKey = KeyUtil.buildKey("kekio-queue.v1", new Object[]{str, "messages"});
        this.locksKey = KeyUtil.buildKey("kekio-queue.v1", new Object[]{str, "locks"});
        this.attemptsKey = KeyUtil.buildKey("kekio-queue.v1", new Object[]{str, "attempts"});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    public String getQueueKey() {
        return this.queueKey;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    protected String getUnackedKey() {
        return this.unackedKey;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    protected String getMessagesKey() {
        return this.messagesKey;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    protected String getLocksKey() {
        return this.locksKey;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    protected String getAttemptsKey() {
        return this.attemptsKey;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    protected String getReadMessageWithLockScriptSha() {
        return this.readMessageWithLockScriptSha;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue
    public void setReadMessageWithLockScriptSha(String str) {
        this.readMessageWithLockScriptSha = str;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void poll(Queue.QueueCallback queueCallback) {
        Triple<String, Instant, String> readMessageWithLock = readMessageWithLock();
        if (readMessageWithLock != null) {
            String str = (String) readMessageWithLock.getLeft();
            Instant instant = (Instant) readMessageWithLock.getMiddle();
            String str2 = (String) readMessageWithLock.getRight();
            Runnable runnable = () -> {
                ackMessage(str);
            };
            readMessage(str, str2, message -> {
                AttemptsAttribute attemptsAttribute = (AttemptsAttribute) message.getAttribute(AttemptsAttribute.class);
                int attempts = attemptsAttribute != null ? attemptsAttribute.getAttempts() : 0;
                MaxAttemptsAttribute maxAttemptsAttribute = (MaxAttemptsAttribute) message.getAttribute(MaxAttemptsAttribute.class);
                int maxAttempts = maxAttemptsAttribute != null ? maxAttemptsAttribute.getMaxAttempts() : 0;
                if (maxAttempts <= 0 || attempts <= maxAttempts) {
                    fire(new QueueEvent.MessageProcessing(message, instant, Instant.now()));
                    queueCallback.accept(message, runnable);
                } else {
                    log.warn("Message {} with payload {} exceeded {} retries", new Object[]{str, message, Integer.valueOf(maxAttempts)});
                    handleDeadMessage(message);
                    removeMessage(str);
                    fire(QueueEvent.MessageDead);
                }
            });
        }
        fire(QueueEvent.QueuePolled);
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void poll(int i, Queue.QueueCallback queueCallback) {
        poll(queueCallback);
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void push(Message message, TemporalAmount temporalAmount) {
        withJedis(jedisCommands -> {
            String firstFingerprint = firstFingerprint(this.queueKey, fingerprint(message));
            if (firstFingerprint == null) {
                queueMessage(message, temporalAmount);
                fire(new QueueEvent.MessagePushed(message));
            } else {
                log.info("Re-prioritizing message as an identical one is already on the queue: {}, message: {}", firstFingerprint, message);
                jedisCommands.zadd(this.queueKey, score(temporalAmount), firstFingerprint, ZAddParams.zAddParams().xx());
                fire(new QueueEvent.MessageDuplicate(message));
            }
        });
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    @Scheduled(fixedDelayString = "${queue.retry.frequency.ms:10000}")
    public void retry() {
        withJedis(jedisCommands -> {
            List<String> zrangeByScore = jedisCommands.zrangeByScore(this.unackedKey, 0.0d, score());
            if (CollectionUtils.isNotEmpty(zrangeByScore)) {
                jedisCommands.del((String[]) zrangeByScore.stream().map(str -> {
                    return this.locksKey + ":" + str;
                }).toArray(i -> {
                    return new String[i];
                }));
            }
            for (String str2 : zrangeByScore) {
                int hgetInt = hgetInt(this.attemptsKey, str2);
                readMessageWithoutLock(str2, message -> {
                    MaxAttemptsAttribute maxAttemptsAttribute = (MaxAttemptsAttribute) message.getAttribute(MaxAttemptsAttribute.class);
                    if ((maxAttemptsAttribute != null ? maxAttemptsAttribute.getMaxAttempts() : 0) == 0 && hgetInt >= 4) {
                        log.warn("Message {} with payload {} exceeded max retries", str2, message);
                        handleDeadMessage(message);
                        removeMessage(str2);
                        fire(QueueEvent.MessageDead);
                        return;
                    }
                    if (zismember(jedisCommands, this.queueKey, str2)) {
                        multi(transaction -> {
                            transaction.zrem(this.unackedKey, new String[]{str2});
                            transaction.zadd(this.queueKey, score(), str2);
                            transaction.hincrBy(this.attemptsKey, str2, 1L);
                        });
                        log.info("Not retrying message {} because an identical message is already on the queue", str2);
                        fire(new QueueEvent.MessageDuplicate(message));
                    } else {
                        log.warn("Retrying message {} after {} attempts", str2, Integer.valueOf(hgetInt));
                        jedisCommands.hincrBy(this.attemptsKey, str2, 1L);
                        requeueMessage(str2);
                        fire(QueueEvent.MessageRetried);
                    }
                });
            }
            fire(QueueEvent.RetryPolled);
        });
    }

    @Override // cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue
    public MonitorableQueue.QueueState readState() {
        List<Object> multi = multi(transaction -> {
            transaction.zcard(this.queueKey);
            transaction.zcount(this.queueKey, 0.0d, score());
            transaction.zcard(this.unackedKey);
            transaction.hlen(this.messagesKey);
        });
        int intValue = ((Long) multi.get(0)).intValue();
        int intValue2 = ((Long) multi.get(1)).intValue();
        int intValue3 = ((Long) multi.get(2)).intValue();
        return new MonitorableQueue.QueueState(Integer.valueOf(intValue), Integer.valueOf(intValue2), Integer.valueOf(intValue3), Integer.valueOf(((Long) multi.get(3)).intValue() - (intValue + intValue3)), 0);
    }

    @Override // cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue
    public boolean containsMessage(Predicate<Message> predicate) {
        return ((Boolean) withJedis(jedisCommands -> {
            String str = "0";
            boolean z = false;
            while (!z) {
                ScanResult hscan = jedisCommands.hscan(this.messagesKey, str);
                Iterator it = hscan.getResult().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry entry = (Map.Entry) it.next();
                    try {
                    } catch (Exception e) {
                        log.error("Failed to read message {}", entry.getKey(), e);
                    }
                    if (predicate.test((Message) JacksonUtil.parseJsonString((String) entry.getValue(), Message.class))) {
                        z = true;
                        break;
                    }
                }
                str = hscan.getCursor();
                if (str.equals("0")) {
                    break;
                }
            }
            return Boolean.valueOf(z);
        })).booleanValue();
    }

    protected void queueMessage(Message message, TemporalAmount temporalAmount) {
        TemporalAmount temporalAmount2 = temporalAmount != null ? temporalAmount : Duration.ZERO;
        String latest = fingerprint(message).getLatest();
        if (((AttemptsAttribute) message.getAttribute(AttemptsAttribute.class)) == null) {
            message.setAttribute(new AttemptsAttribute(0));
        }
        try {
            multi(transaction -> {
                transaction.hset(this.messagesKey, latest, SerDerUtil.serializeAsJsonString(this.mapper, message));
                transaction.zadd(this.queueKey, score(temporalAmount2), latest);
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to queue message: " + String.valueOf(message), e);
        }
    }

    protected void requeueMessage(String str) {
        multi(transaction -> {
            transaction.zrem(this.unackedKey, new String[]{str});
            transaction.zadd(this.queueKey, score(), str);
        });
    }

    protected void removeMessage(String str) {
        multi(transaction -> {
            transaction.zrem(this.queueKey, new String[]{str});
            transaction.zrem(this.unackedKey, new String[]{str});
            transaction.hdel(this.messagesKey, new String[]{str});
            transaction.del(this.locksKey + ":" + str);
            transaction.hdel(this.attemptsKey, new String[]{str});
        });
    }

    protected void readMessageWithoutLock(String str, Consumer<Message> consumer) {
        withJedis(jedisCommands -> {
            try {
                String hget = jedisCommands.hget(this.messagesKey, str);
                if (hget != null) {
                    consumer.accept((Message) SerDerUtil.deserializeFromJsonString(this.mapper, hget, Message.class));
                }
            } catch (JsonProcessingException e) {
                log.error("Payload for unacked message {} is missing or corrupt", str, e);
                removeMessage(str);
            } catch (Exception e2) {
                log.error("Failed to read unacked message {}, requeuing...", str, e2);
                jedisCommands.hincrBy(this.attemptsKey, str, 1L);
                requeueMessage(str);
            }
        });
    }

    protected Triple<String, Instant, String> readMessageWithLock() {
        return (Triple) withJedis(jedisCommands -> {
            try {
                Object evalsha = jedisCommands.evalsha(this.readMessageWithLockScriptSha, Arrays.asList(this.queueKey, this.unackedKey, this.locksKey, this.messagesKey), Arrays.asList(String.valueOf(score()), "10", String.valueOf(this.lockTtlSeconds), String.format(Locale.US, "%f", Double.valueOf(score(getAckTimeout()))), String.format(Locale.US, "%f", Double.valueOf(score()))));
                if (evalsha instanceof List) {
                    List list = (List) evalsha;
                    if (list.size() >= 3) {
                        return Triple.of(list.get(0).toString(), Instant.ofEpochMilli(Long.parseLong(list.get(1).toString())), list.get(2) != null ? list.get(2).toString() : null);
                    }
                }
                if (Objects.equals(evalsha, "ReadLockFailed")) {
                    fire(QueueEvent.LockFailed);
                }
                return null;
            } catch (JedisDataException e) {
                if (e.getMessage() == null || !e.getMessage().startsWith("NOSCRIPT")) {
                    throw e;
                }
                cacheScript();
                return readMessageWithLock();
            }
        });
    }

    protected void readMessage(String str, String str2, Consumer<Message> consumer) {
        withJedis(jedisCommands -> {
            if (str2 == null) {
                log.error("Payload for message {} is missing", str);
                removeMessage(str);
                return;
            }
            try {
                Message message = (Message) SerDerUtil.deserializeFromJsonString(this.mapper, str2, Message.class);
                AttemptsAttribute attemptsAttribute = (AttemptsAttribute) message.getAttribute(AttemptsAttribute.class);
                if (attemptsAttribute == null) {
                    attemptsAttribute = new AttemptsAttribute(0);
                }
                message.setAttribute(new AttemptsAttribute(attemptsAttribute.getAttempts() + 1));
                jedisCommands.hset(this.messagesKey, str, this.mapper.writeValueAsString(message));
                consumer.accept(message);
            } catch (IOException e) {
                log.error("Failed to read message {}, requeuing...", str, e);
                jedisCommands.hincrBy(this.attemptsKey, str, 1L);
                requeueMessage(str);
            }
        });
    }

    private void ackMessage(String str) {
        withJedis(jedisCommands -> {
            if (zismember(jedisCommands, this.queueKey, str)) {
                multi(transaction -> {
                    transaction.zrem(this.unackedKey, new String[]{str});
                    transaction.del(this.locksKey + ":" + str);
                });
            } else {
                removeMessage(str);
            }
            fire(QueueEvent.MessageAcknowledged);
        });
    }
}
