package io.datarouter.joblet.execute;

import io.datarouter.bytes.KvString;
import io.datarouter.joblet.JobletCounters;
import io.datarouter.joblet.dto.RunningJoblet;
import io.datarouter.joblet.handler.JobletHandler;
import io.datarouter.joblet.queue.JobletRequestQueueManager;
import io.datarouter.joblet.service.JobletService;
import io.datarouter.joblet.setting.DatarouterJobletSettingRoot;
import io.datarouter.joblet.type.JobletType;
import io.datarouter.util.concurrent.ExecutorServiceTool;
import io.datarouter.util.concurrent.NamedThreadFactory;
import io.datarouter.util.mutable.MutableBoolean;
import io.datarouter.util.number.NumberFormatter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/joblet/execute/JobletProcessor.class */
/**
 * Drives execution of joblets for a single {@link JobletType}.
 *
 * <p>The constructor starts a dedicated driver thread that loops in {@link #run()}. On each pass it reads the
 * effective thread limit for this instance, and, when joblets are enabled and the
 * {@link JobletRequestQueueManager} says queues should be checked, submits a new {@link JobletCallable} to an
 * internal {@link ThreadPoolExecutor}. Submitted callables and their futures are tracked by id until
 * {@link #onCompletion(Long)} is called for that id.
 */
public class JobletProcessor {
    private static final Logger logger = LoggerFactory.getLogger(JobletProcessor.class);

    // How long the driver thread sleeps in each situation before its next pass.
    private static final Duration SLEEP_WHEN_DISABLED = Duration.ofSeconds(10);
    private static final Duration SLEEP_AFTER_RECENT_QUEUE_MISSES = Duration.ofSeconds(1);
    private static final Duration SLEEP_AFTER_REJECTED_EXECUTION = Duration.ofMillis(100);
    private static final Duration SLEEP_AFTER_EXCEPTION = Duration.ofSeconds(5);
    // Passed to ExecutorServiceTool.shutdown when shutting the pool down.
    private static final Duration MAX_WAIT_FOR_SHUTDOWN = Duration.ofSeconds(5);

    /** Ten minutes, in milliseconds. Not used inside this class; kept public for external readers. */
    public static final Long RUNNING_JOBLET_TIMEOUT_MS = 600_000L;

    private final DatarouterJobletSettingRoot jobletSettings;
    private final JobletRequestQueueManager jobletRequestQueueManager;
    private final JobletCallableFactory jobletCallableFactory;
    private final JobletService jobletService;
    private final AtomicLong idGenerator;
    private final JobletType<?> jobletType;
    private final ThreadPoolExecutor exec;
    private final Thread driverThread;
    private final MutableBoolean shutdownRequested = new MutableBoolean(false);
    // Both maps are keyed by the id taken from idGenerator when the callable is created.
    private final Map<Long, JobletCallable> jobletCallableById = new ConcurrentHashMap<>();
    private final Map<Long, Future<Void>> jobletFutureById = new ConcurrentHashMap<>();

    /**
     * Creates the processor and immediately starts its driver thread.
     *
     * @param datarouterJobletSettingRoot settings consulted for the enabled flag and the thread count shown in
     *        {@link #toString()}
     * @param jobletRequestQueueManager decides whether queues should be checked on a given pass
     * @param jobletCallableFactory creates one {@link JobletCallable} per submission
     * @param jobletService supplies the effective thread limit for this instance
     * @param atomicLong generator of callable ids
     * @param jobletType the joblet type this processor handles
     */
    public JobletProcessor(DatarouterJobletSettingRoot datarouterJobletSettingRoot, JobletRequestQueueManager jobletRequestQueueManager, JobletCallableFactory jobletCallableFactory, JobletService jobletService, AtomicLong atomicLong, JobletType<?> jobletType) {
        this.jobletSettings = datarouterJobletSettingRoot;
        this.jobletRequestQueueManager = jobletRequestQueueManager;
        this.jobletCallableFactory = jobletCallableFactory;
        this.jobletService = jobletService;
        this.idGenerator = atomicLong;
        this.jobletType = jobletType;
        // A SynchronousQueue holds no tasks: each submit either hands off to a worker thread or is rejected
        // once the maximum pool size (adjusted on every pass in run()) is reached.
        this.exec = new ThreadPoolExecutor(
                0,
                Integer.MAX_VALUE,
                5L,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new NamedThreadFactory("joblet-" + jobletType.getPersistentString(), true));
        this.driverThread = new Thread(null, this::run, jobletType.getPersistentString() + " JobletProcessor worker thread");
        this.driverThread.start();
    }

    /**
     * Sets the shared shutdown flag, interrupts the driver thread so that {@link #run()} exits, and shuts down
     * the worker pool via {@link ExecutorServiceTool}.
     */
    public void requestShutdown() {
        this.shutdownRequested.set(true);
        this.driverThread.interrupt();
        ExecutorServiceTool.shutdown(this.exec, MAX_WAIT_FOR_SHUTDOWN);
    }

    /**
     * Requests cancellation (with interruption) of the callable registered under the given id.
     *
     * @param j id of the callable, as assigned when it was submitted
     * @return {@code true} if a future was registered under that id, whether or not the cancellation itself
     *         succeeded; {@code false} if no such future is tracked by this processor
     */
    public boolean killThread(long j) {
        Future<Void> future = this.jobletFutureById.get(j);
        if (future == null) {
            return false;
        }
        // The result of cancel is intentionally not returned: callers only learn whether the id was found here.
        future.cancel(true);
        return true;
    }

    /**
     * @param threadCount effective thread limit for this instance
     * @return whether joblets are enabled and at least one thread is allowed; a null setting value counts as
     *         disabled
     */
    private boolean shouldRun(int threadCount) {
        return Boolean.TRUE.equals(this.jobletSettings.runJoblets.get()) && threadCount > 0;
    }

    /**
     * Driver loop. Runs until the current thread is found to be interrupted.
     *
     * <p>Each pass either sleeps (joblets disabled, or the queue manager declined to check queues) or resizes
     * the pool to the current thread limit and tries to submit one callable. Anything thrown during a pass is
     * logged and followed by a pause, so a single failure does not end the loop.
     */
    public void run() {
        while (true) {
            try {
                // sleepABit restores the interrupt flag, so an interrupt during a sleep is observed here too.
                if (Thread.interrupted()) {
                    logger.warn("joblet thread shutting down for type={}", this.jobletType);
                    return;
                }
                int threadCount = getThreadCount();
                if (!shouldRun(threadCount)) {
                    sleepABit("disabled", SLEEP_WHEN_DISABLED);
                } else if (this.jobletRequestQueueManager.shouldCheckAnyQueues(this.jobletType)) {
                    this.exec.setMaximumPoolSize(threadCount);
                    tryEnqueueJobletCallable();
                } else {
                    sleepABit("recentQueueMisses", SLEEP_AFTER_RECENT_QUEUE_MISSES);
                }
            } catch (Throwable th) {
                // Catch everything: an uncaught throwable here would end the driver thread for this type.
                logger.error("", th);
                try {
                    sleepABit("exception", SLEEP_AFTER_EXCEPTION);
                } catch (Exception e) {
                    logger.error("uh oh, problem sleeping", e);
                }
            }
        }
    }

    /**
     * Creates a callable with a fresh id and submits it to the pool. If the pool rejects it, the rejection is
     * counted and the driver thread pauses briefly.
     */
    private void tryEnqueueJobletCallable() {
        long id = this.idGenerator.incrementAndGet();
        JobletCallable callable = this.jobletCallableFactory.create(this.shutdownRequested, this, this.jobletType, id);
        // Register before submitting: otherwise a callable that finishes quickly could trigger onCompletion
        // before the entry exists, and the late put would leave a stale entry behind.
        this.jobletCallableById.put(id, callable);
        Future<Void> future;
        try {
            future = this.exec.submit(callable);
        } catch (RejectedExecutionException e) {
            this.jobletCallableById.remove(id);
            JobletCounters.rejectedCallable(this.jobletType);
            sleepABit("rejectedExecution", SLEEP_AFTER_REJECTED_EXECUTION);
            return;
        } catch (RuntimeException | Error e) {
            // Never submitted, so onCompletion will not clean up for this id.
            this.jobletCallableById.remove(id);
            throw e;
        }
        this.jobletFutureById.put(id, future);
        if (!this.jobletCallableById.containsKey(id)) {
            // onCompletion already ran for this id; undo the put above so the future is not retained.
            this.jobletFutureById.remove(id);
        }
    }

    /**
     * Logs the reason at debug level and sleeps for the given duration. If interrupted, the interrupt flag is
     * restored and the method returns early.
     *
     * @param str short reason included in the debug log line
     * @param duration how long to sleep
     */
    private void sleepABit(String str, Duration duration) {
        long millis = duration.toMillis();
        logger.debug("sleeping {}", new KvString()
                .add(JobletHandler.PARAM_type, this.jobletType.getPersistentString())
                .add("millis", Long.valueOf(millis), NumberFormatter::addCommas)
                .add("reason", str));
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops tracking the callable and future registered under the given id.
     *
     * @param l id of the callable that has finished
     */
    public void onCompletion(Long l) {
        this.jobletCallableById.remove(l);
        this.jobletFutureById.remove(l);
    }

    /**
     * @return the {@link RunningJoblet} of every tracked callable for which {@code hasPayload()} is true, as an
     *         unmodifiable list
     */
    public List<RunningJoblet> getRunningJoblets() {
        return this.jobletCallableById.values().stream()
                .map(JobletCallable::getRunningJoblet)
                .filter(RunningJoblet::hasPayload)
                .toList();
    }

    /**
     * @return the number of callables currently tracked, whether or not they hold a payload
     */
    public int getNumRunningJoblets() {
        return this.jobletCallableById.size();
    }

    /**
     * @return the effective thread limit reported by {@link JobletService} for this joblet type on this instance
     */
    public int getThreadCount() {
        return this.jobletService.getThreadCountInfoForThisInstance(this.jobletType).effectiveLimit;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "[" + this.jobletType + ", numThreads:" + this.jobletSettings.getThreadCountForJobletType(this.jobletType) + "]";
    }

    /**
     * @return the joblet type handled by this processor
     */
    public JobletType<?> getJobletType() {
        return this.jobletType;
    }
}
