package cn.sliew.flinkful.kubernetes.controller.core.inmemory;

import cn.hutool.core.thread.ThreadUtil;
import cn.sliew.carp.framework.spring.lifecycel.AbstractLifecycle;
import cn.sliew.flinkful.kubernetes.controller.core.Controller;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/sliew/flinkful/kubernetes/controller/core/inmemory/SchedulerWorker.class */
public class SchedulerWorker extends AbstractLifecycle {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SchedulerWorker.class);
    private final SchedulerInternalQueue internalQueue;
    private final ControllerInvoker invoker;
    private ScheduledThreadPoolExecutor scheduledExecutor;
    private ScheduledFuture<?> scheduledFuture;
    private Timer wheelTimer;
    private final Map<String, Timeout> controllerMap = new ConcurrentHashMap(8);

    protected void doStart() throws Exception {
        this.wheelTimer = new HashedWheelTimer(new DefaultThreadFactory("scheduler-worker", true), 1L, TimeUnit.SECONDS, 512);
        this.scheduledExecutor = ThreadUtil.createScheduledExecutor(1);
        this.scheduledFuture = this.scheduledExecutor.scheduleWithFixedDelay(this::run, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    protected void doStop() throws Exception {
        this.scheduledFuture.cancel(true);
        this.scheduledExecutor.shutdown();
        this.controllerMap.forEach((str, timeout) -> {
            timeout.cancel();
        });
        this.controllerMap.clear();
        this.wheelTimer.stop();
    }

    private void run() {
        try {
            Thread.currentThread().setName("scheduler-server");
            Controller poll = this.internalQueue.poll();
            if (poll != null) {
                dispatch(poll);
            }
        } catch (Exception e) {
            log.error("Scheduler Worker error", e);
        }
    }

    public void dispatch(Controller controller) {
        if (!isRunning()) {
            log.warn("Scheduler Worker is not started, can't add controller.");
            return;
        }
        this.controllerMap.put(controller.getId(), this.wheelTimer.newTimeout(new WheelTimerTask(controller, this.invoker), controller.getInterval().toSeconds(), TimeUnit.SECONDS));
    }

    @Generated
    public SchedulerWorker(SchedulerInternalQueue schedulerInternalQueue, ControllerInvoker controllerInvoker) {
        this.internalQueue = schedulerInternalQueue;
        this.invoker = controllerInvoker;
    }
}
