package cn.sliew.carp.framework.log.realtime.poll;

import cn.hutool.core.thread.ThreadUtil;
import cn.sliew.carp.framework.log.realtime.configuration.RealtimeLogPollProperties;
import cn.sliew.milky.common.concurrent.RunnableWrapper;
import com.google.common.util.concurrent.RateLimiter;
import jakarta.annotation.Nonnull;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.util.CloseableIterator;

/* loaded from: input_file:cn/sliew/carp/framework/log/realtime/poll/StreamPollerImpl.class */
/**
 * {@link StreamPoller} that drains a {@link CloseableIterator} on a background task and hands the
 * items to callers in batches.
 *
 * <p>The background task is submitted lazily on the first {@link #poll(int, Duration)} call. It
 * copies items from the iterator into an internal {@code BatchBlockingQueue}, throttled by a
 * Guava {@link RateLimiter}. {@link #close()} asks the task to stop and closes the iterator.
 */
public class StreamPollerImpl implements StreamPoller {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamPollerImpl.class);

    /** Becomes {@code true} once the background task has been submitted; it is never reset. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final CloseableIterator<?> iterator;
    private final BatchBlockingQueue buffer;
    private final PollTask pollTask;
    private final AsyncTaskExecutor taskExecutor;

    /**
     * Future of the submitted background task; {@code null} until the first poll. Volatile
     * because it is written by the polling caller and read by the executor thread.
     */
    private volatile CompletableFuture<?> pollFuture;

    /**
     * Background loop that moves items from the iterator into the buffer.
     *
     * <p>NOTE(review): presumably {@code RunnableWrapper.run()} invokes {@link #doRun()} and
     * routes any exception it throws to {@link #onFailure(Exception)} — confirm against the
     * milky-common sources.
     */
    private class PollTask implements RunnableWrapper {
        /** Loop flag; cleared by {@link #close()} so the loop exits on its next iteration. */
        private volatile boolean started = true;
        private final RateLimiter rateLimiter;

        /** Failure reported through {@link #onFailure(Exception)}, or {@code null} if none. */
        @Nullable
        private volatile Exception exception;

        /**
         * @param d permits per second handed to {@link RateLimiter#create(double)}
         */
        public PollTask(double d) {
            this.rateLimiter = RateLimiter.create(d);
        }

        /**
         * Copies one item per acquired rate-limiter permit from the iterator into the buffer and
         * sleeps 100 ms whenever the iterator currently has nothing to offer.
         *
         * @throws Exception whatever the iterator or the buffer throws
         */
        public void doRun() throws Exception {
            while (this.started) {
                this.rateLimiter.acquire();
                if (StreamPollerImpl.this.iterator.hasNext()) {
                    // NOTE(review): if put() blocks on a full buffer, close() cannot wake this
                    // thread (it only clears the flag) — confirm BatchBlockingQueue semantics.
                    StreamPollerImpl.this.buffer.put(StreamPollerImpl.this.iterator.next());
                } else {
                    ThreadUtil.sleep(100L);
                }
            }
        }

        /**
         * Records the failure and, when the future is already known, completes it exceptionally.
         *
         * @param exc the failure to report to pollers
         */
        public void onFailure(Exception exc) {
            this.exception = exc;
            // The task may fail before poll() has stored the future; the recorded exception is
            // then picked up directly by poll().
            CompletableFuture<?> future = StreamPollerImpl.this.pollFuture;
            if (future != null) {
                future.completeExceptionally(exc);
            }
        }

        /** Requests the loop in {@link #doRun()} to stop. */
        public void close() {
            this.started = false;
        }
    }

    /**
     * Creates a poller; no background work is started until the first {@link #poll(int, Duration)}.
     *
     * @param closeableIterator source of items; closed by {@link #close()}
     * @param asyncTaskExecutor executor that runs the background poll task
     * @param realtimeLogPollProperties supplies the buffer capacity and the poll rate per second
     * @throws NullPointerException if any argument is {@code null}
     */
    public StreamPollerImpl(@Nonnull CloseableIterator<?> closeableIterator, @Nonnull AsyncTaskExecutor asyncTaskExecutor, @Nonnull RealtimeLogPollProperties realtimeLogPollProperties) {
        this.iterator = Objects.requireNonNull(closeableIterator, "iterator");
        Objects.requireNonNull(realtimeLogPollProperties, "properties");
        this.buffer = new BatchBlockingQueue(realtimeLogPollProperties.getPollQueueCapacity().intValue());
        this.pollTask = new PollTask(realtimeLogPollProperties.getPollRatePerSecond().doubleValue());
        this.taskExecutor = Objects.requireNonNull(asyncTaskExecutor, "taskExecutor");
    }

    /**
     * Starts the background task if necessary and returns the next batch from the buffer.
     *
     * @param i forwarded to {@code BatchBlockingQueue.pollBatch} as its first argument
     * @param duration forwarded to {@code BatchBlockingQueue.pollBatch} as its second argument
     * @return whatever {@code pollBatch} returns for these arguments
     * @throws RuntimeException if the background task has failed or was cancelled, or if the
     *     calling thread is interrupted while waiting (the interrupt flag is restored)
     */
    @Override
    public <T> List<T> poll(int i, Duration duration) {
        ensureStarted();
        Exception failure = this.pollTask.exception;
        if (failure != null || this.pollFuture.isCompletedExceptionally()) {
            throw new RuntimeException("Failed to poll result", failure);
        }
        try {
            return this.buffer.pollBatch(i, duration);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Poll thread was interrupted.", e);
        }
    }

    /**
     * Stops the background loop, cancels its future if still pending, and closes the iterator.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        // Clear the loop flag first so the task stops touching the iterator as early as possible.
        this.pollTask.close();
        CompletableFuture<?> future = this.pollFuture;
        if (future != null && !future.isDone()) {
            future.cancel(true);
        }
        this.iterator.close();
    }

    /**
     * Submits the background task exactly once, even when several threads poll concurrently.
     * The future is stored before {@code running} flips, so any thread that sees
     * {@code running == true} also sees a non-null {@code pollFuture}.
     */
    private void ensureStarted() {
        if (isRunning()) {
            return;
        }
        synchronized (this.running) {
            if (!isRunning()) {
                this.pollFuture = this.taskExecutor.submitCompletable(this.pollTask);
                this.running.set(true);
            }
        }
    }

    private boolean isRunning() {
        return this.running.get();
    }
}
