package cn.sliew.carp.framework.log.realtime.poll.redis;

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:cn/sliew/carp/framework/log/realtime/poll/redis/RedisStreamFetcher.class */
/**
 * Pull-style reader that hands out the entries of a single Redis stream one at a time.
 *
 * <p>Reading starts at the beginning of the stream. Records returned by Redis are decoded to
 * {@code T}, staged in a small bounded buffer, and the stream offset is advanced only past
 * records that were actually buffered, so entries that did not fit are fetched again by the
 * next read.
 *
 * <p>NOTE(review): the offset and buffer handling in {@link #next()} is not synchronized, so
 * {@code next()} looks intended for a single consumer thread; {@link #close()} may be invoked
 * from another thread to stop a pending {@code next()} call.
 *
 * @param <T> type each stream record is converted to
 */
public class RedisStreamFetcher<T> {
    /** Maximum number of decoded values staged locally between two Redis reads. */
    private static final int BUFFER_CAPACITY = 100;

    private final StringRedisTemplate redisTemplate;
    private final Class<T> targetClass;
    private final String streamKey;
    private final ArrayBlockingQueue<T> buffer = new ArrayBlockingQueue<>(BUFFER_CAPACITY);
    /** Position after the last record that was successfully buffered. */
    private StreamOffset<String> streamOffset;
    /** Volatile so that a close() issued by another thread is observed by the polling loop. */
    private volatile boolean closed;

    /**
     * Creates a fetcher positioned at the start of the given stream.
     *
     * @param stringRedisTemplate template used for all Redis access
     * @param cls                 class the stream records are converted to
     * @param str                 key of the Redis stream to read
     * @throws NullPointerException if any argument is {@code null}
     */
    public RedisStreamFetcher(@Nonnull StringRedisTemplate stringRedisTemplate, @Nonnull Class<T> cls, @Nonnull String str) {
        this.redisTemplate = Objects.requireNonNull(stringRedisTemplate, "redisTemplate");
        this.targetClass = Objects.requireNonNull(cls, "targetClass");
        this.streamKey = Objects.requireNonNull(str, "streamKey");
        this.streamOffset = StreamOffset.fromStart(this.streamKey);
    }

    /**
     * Returns the next value of the stream.
     *
     * <p>Buffered values are served first. When the buffer is empty the stream is read again;
     * this repeats for as long as the stream key exists and the fetcher is open.
     * NOTE(review): the read is issued without read options, so it presumably does not block
     * on the server side and this loop polls Redis back-to-back while the stream is idle —
     * confirm whether a blocking read or a back-off is wanted here.
     *
     * @return the next value, or {@code null} once the fetcher is closed or the stream key no
     *         longer exists and nothing is left to deliver
     * @throws IOException declared for API compatibility; not thrown by this implementation
     */
    public T next() throws IOException {
        while (!this.closed) {
            T buffered = this.buffer.poll();
            if (buffered != null) {
                return buffered;
            }
            // Only consult the key's existence when the read produced nothing: records that
            // were just buffered must still be delivered even if the key vanished meanwhile.
            if (!refill() && !isStreamExists()) {
                return null;
            }
        }
        return null;
    }

    /**
     * Marks the fetcher as closed. Subsequent {@link #next()} calls return {@code null} and a
     * {@code next()} call that is currently polling stops at its next loop iteration.
     * Calling this method more than once has no further effect.
     */
    public void close() {
        this.closed = true;
    }

    /**
     * Reads records after the current offset and stages as many as fit into the buffer.
     *
     * @return {@code true} if at least one record was buffered
     */
    @SuppressWarnings("unchecked") // generic varargs array of StreamOffset<String> is only read by the callee
    private boolean refill() {
        List<ObjectRecord<String, T>> records =
                this.redisTemplate.opsForStream().read(this.targetClass, this.streamOffset);
        if (CollectionUtils.isEmpty(records)) {
            return false;
        }
        boolean bufferedAny = false;
        for (ObjectRecord<String, T> record : records) {
            if (!this.buffer.offer(record.getValue())) {
                // Buffer is full: leave the offset untouched so this record is read again later.
                break;
            }
            this.streamOffset = StreamOffset.from(record);
            bufferedAny = true;
        }
        return bufferedAny;
    }

    /**
     * Tells whether the stream key is currently present in Redis.
     * A {@code null} answer from the template is treated as "does not exist".
     */
    private boolean isStreamExists() {
        return Boolean.TRUE.equals(this.redisTemplate.hasKey(this.streamKey));
    }
}
