package cn.sliew.carp.framework.queue.kekio.memory;

import cn.hutool.core.thread.ThreadUtil;
import cn.sliew.carp.framework.common.util.UUIDUtil;
import cn.sliew.carp.framework.queue.kekio.AbstractQueue;
import cn.sliew.carp.framework.queue.kekio.MessageHandler;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.carp.framework.queue.kekio.metrics.QueueEvent;
import io.micrometer.core.instrument.MeterRegistry;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:cn/sliew/carp/framework/queue/kekio/memory/InMemoryQueue.class */
public class InMemoryQueue extends AbstractQueue implements MonitorableQueue, InitializingBean, DisposableBean {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(InMemoryQueue.class);
    private ScheduledThreadPoolExecutor scheduledExecutor;
    private final DelayQueue<Envelope> queue;
    private final DelayQueue<Envelope> unacked;

    /* loaded from: input_file:cn/sliew/carp/framework/queue/kekio/memory/InMemoryQueue$Envelope.class */
    public static class Envelope implements Delayed {
        private final String id;
        private final Message payload;
        private final Instant scheduledTime;
        private final int count;

        public Envelope(Message message, Instant instant) {
            this(UUIDUtil.randomUUId(), message, instant, 1);
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return Instant.now().until(this.scheduledTime, timeUnit.toChronoUnit());
        }

        @Generated
        public String getId() {
            return this.id;
        }

        @Generated
        public Message getPayload() {
            return this.payload;
        }

        @Generated
        public Instant getScheduledTime() {
            return this.scheduledTime;
        }

        @Generated
        public int getCount() {
            return this.count;
        }

        @Generated
        @ConstructorProperties({"id", "payload", "scheduledTime", "count"})
        public Envelope(String str, Message message, Instant instant, int i) {
            this.id = str;
            this.payload = message;
            this.scheduledTime = instant;
            this.count = i;
        }
    }

    public InMemoryQueue(String str, QueueExecutor queueExecutor, Collection<MessageHandler> collection, List<Queue.DeadMessageCallback> list, EventPublisher eventPublisher, MeterRegistry meterRegistry, Boolean bool, Duration duration, Duration duration2, Boolean bool2, TemporalAmount temporalAmount) {
        super(str, queueExecutor, collection, list, eventPublisher, meterRegistry, bool, duration, duration2, bool2, temporalAmount);
        this.queue = new DelayQueue<>();
        this.unacked = new DelayQueue<>();
    }

    public void afterPropertiesSet() throws Exception {
        this.scheduledExecutor = ThreadUtil.createScheduledExecutor(1);
        ThreadUtil.schedule(this.scheduledExecutor, () -> {
            retry();
        }, 0L, 10000L, false);
        log.debug("Start process queue retry: {}", this.queue.getClass().getSimpleName());
    }

    public void destroy() throws Exception {
        if (Objects.nonNull(this.scheduledExecutor)) {
            this.scheduledExecutor.shutdown();
            log.info("Stop process queue poll: {}", this.queue.getClass().getSimpleName());
        }
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void poll(Queue.QueueCallback queueCallback) {
        fire(QueueEvent.QueuePolled);
        Envelope poll = this.queue.poll();
        if (poll != null) {
            TemporalAmount ofMillis = poll.getPayload().getAckTimeoutMs() == null ? this.ackTimeout : Duration.ofMillis(poll.getPayload().getAckTimeoutMs().longValue());
            if (this.unacked.stream().anyMatch(envelope -> {
                return Objects.equals(envelope.getPayload(), poll.getPayload());
            })) {
                this.queue.put((DelayQueue<Envelope>) poll);
                return;
            }
            this.unacked.put((DelayQueue<Envelope>) new Envelope(poll.getId(), poll.getPayload(), Instant.now().plus(ofMillis), poll.getCount()));
            fire(new QueueEvent.MessageProcessing(poll.getPayload(), poll.getScheduledTime(), Instant.now()));
            queueCallback.accept(poll.getPayload(), () -> {
                ack(poll.getId());
                fire(QueueEvent.MessageAcknowledged);
            });
        }
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void poll(int i, Queue.QueueCallback queueCallback) {
        poll(queueCallback);
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void push(Message message, TemporalAmount temporalAmount) {
        boolean removeIf = this.queue.removeIf(envelope -> {
            return Objects.equals(envelope.getPayload(), message);
        });
        this.queue.put((DelayQueue<Envelope>) new Envelope(message, Instant.now().plus(temporalAmount)));
        if (removeIf) {
            fire(new QueueEvent.MessageDuplicate(message));
        } else {
            fire(new QueueEvent.MessagePushed(message));
        }
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void retry() {
        Instant now = Instant.now();
        fire(QueueEvent.RetryPolled);
        while (true) {
            Envelope poll = this.unacked.poll();
            if (poll == null) {
                return;
            }
            if (poll.getCount() >= 5) {
                if (CollectionUtils.isNotEmpty(this.deadMessageHandlers)) {
                    Iterator<Queue.DeadMessageCallback> it = this.deadMessageHandlers.iterator();
                    while (it.hasNext()) {
                        it.next().accept(this, poll.getPayload());
                    }
                }
                fire(QueueEvent.MessageDead);
            } else {
                boolean removeIf = this.queue.removeIf(envelope -> {
                    return Objects.equals(envelope.getPayload(), poll.getPayload());
                });
                log.warn("Redelivering unacked message {}", poll.getPayload());
                this.queue.put((DelayQueue<Envelope>) new Envelope(poll.getId(), poll.getPayload(), now, poll.getCount() + 1));
                if (removeIf) {
                    fire(new QueueEvent.MessageDuplicate(poll.getPayload()));
                } else {
                    fire(QueueEvent.MessageRetried);
                }
            }
        }
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public void clear() {
        this.queue.removeIf(envelope -> {
            return true;
        });
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public TemporalAmount getAckTimeout() {
        return this.ackTimeout;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public List<Queue.DeadMessageCallback> getDeadMessageHandlers() {
        return this.deadMessageHandlers;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.Queue
    public Boolean canPollMany() {
        return this.canPollMany;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue
    public EventPublisher getPublisher() {
        return this.publisher;
    }

    @Override // cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue
    public MonitorableQueue.QueueState readState() {
        return new MonitorableQueue.QueueState(this.queue.size(), (int) this.queue.stream().filter(envelope -> {
            return envelope.getDelay(TimeUnit.NANOSECONDS) <= 0;
        }).count(), this.unacked.size());
    }

    @Override // cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue
    public boolean containsMessage(Predicate<Message> predicate) {
        return this.queue.stream().map((v0) -> {
            return v0.getPayload();
        }).anyMatch(predicate);
    }

    private void ack(String str) {
        this.unacked.removeIf(envelope -> {
            return envelope.getId().equals(str);
        });
    }
}
