package io.datarouter.joblet.job;

import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.job.BaseJob;
import io.datarouter.joblet.enums.JobletPriority;
import io.datarouter.joblet.enums.JobletStatus;
import io.datarouter.joblet.queue.JobletRequestQueueManager;
import io.datarouter.joblet.storage.jobletrequest.DatarouterJobletRequestDao;
import io.datarouter.joblet.storage.jobletrequest.JobletRequest;
import io.datarouter.joblet.storage.jobletrequest.JobletRequestKey;
import io.datarouter.joblet.storage.jobletrequestqueue.DatarouterJobletQueueDao;
import io.datarouter.joblet.storage.jobletrequestqueue.JobletRequestQueueKey;
import io.datarouter.joblet.type.ActiveJobletTypeFactory;
import io.datarouter.util.tuple.Range;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/joblet/job/JobletRequeueJob.class */
/**
 * Job that re-enqueues joblet requests which are still in {@link JobletStatus#CREATED} after more than
 * {@code OLD_AGE}.
 *
 * <p>For every (active joblet type, priority) key prefix, the job first checks whether any CREATED request exists
 * whose created timestamp falls between {@code OLD_AGE} ago and {@code MIDDLE_AGE} ago. Prefixes with such a request
 * are skipped. For the remaining prefixes, up to {@code MAX_REQUEUES_PER_QUEUE} old CREATED requests are rewritten
 * with a fresh created timestamp, put onto their queue again, and the row under the previous key is deleted.
 *
 * <p>NOTE(review): skipping prefixes that have medium-aged requests presumably distinguishes a queue that is merely
 * backed up from one whose messages were lost - confirm with the joblet queue owners.
 */
public class JobletRequeueJob extends BaseJob {
    private static final Logger logger = LoggerFactory.getLogger(JobletRequeueJob.class);

    /** Lower age bound of the "medium age" window checked before requeueing. */
    private static final Duration MIDDLE_AGE = Duration.ofMinutes(30);

    /** Requests older than this are candidates for requeueing; also the upper age bound of the medium window. */
    private static final Duration OLD_AGE = Duration.ofHours(1);

    /** Maximum number of requests requeued per key prefix in a single run. */
    private static final int MAX_REQUEUES_PER_QUEUE = 10;

    @Inject
    private ActiveJobletTypeFactory activeJobletTypeFactory;

    @Inject
    private DatarouterJobletRequestDao jobletRequestDao;

    @Inject
    private JobletRequestQueueManager jobletRequestQueueManager;

    @Inject
    private DatarouterJobletQueueDao jobletQueueDao;

    /**
     * Walks every key prefix built from the active joblet types and all priorities, and requeues old requests for
     * each prefix that has no medium-aged CREATED request. Stops advancing to further prefixes once the tracker
     * reports that the job should stop.
     *
     * @param taskTracker tracker consulted between prefixes for a stop signal
     */
    public void run(TaskTracker taskTracker) {
        JobletRequestKey.prefixesForTypesAndPriorities(
                        activeJobletTypeFactory.getAllActiveTypes(),
                        EnumSet.allOf(JobletPriority.class))
                .advanceUntil(prefix -> taskTracker.shouldStop())
                .exclude(this::anyExistWithMediumAge)
                .forEach(this::requeueOld);
    }

    /**
     * Tells whether at least one CREATED request under the given prefix has a created timestamp between
     * {@code OLD_AGE} ago and {@code MIDDLE_AGE} ago.
     *
     * @param jobletRequestKey key prefix; copied before the created field is set, so the argument is not modified
     * @return true if such a request exists
     */
    private boolean anyExistWithMediumAge(JobletRequestKey jobletRequestKey) {
        // Read the clock once so both ends of the window are derived from the same instant.
        Instant now = Instant.now();
        long oldestCreatedMs = now.minus(OLD_AGE).toEpochMilli();
        long newestCreatedMs = now.minus(MIDDLE_AGE).toEpochMilli();
        JobletRequestKey windowStart = jobletRequestKey.copy().withCreated(oldestCreatedMs);
        JobletRequestKey windowEnd = jobletRequestKey.copy().withCreated(newestCreatedMs);
        return jobletRequestDao.scan(new Range<>(windowStart, windowEnd))
                .include(request -> request.getStatus() == JobletStatus.CREATED)
                .hasAny();
    }

    /**
     * Requeues at most {@code MAX_REQUEUES_PER_QUEUE} CREATED requests under the given prefix, taking requests only
     * while they are older than {@code OLD_AGE}.
     *
     * @param jobletRequestKey key prefix to scan
     */
    private void requeueOld(JobletRequestKey jobletRequestKey) {
        // NOTE(review): advanceWhile presumably ends the scan at the first CREATED request that is not old, which
        // relies on the prefix scan returning oldest requests first - confirm against the key's field order.
        jobletRequestDao.scanWithPrefix(jobletRequestKey)
                .include(request -> request.getStatus() == JobletStatus.CREATED)
                .advanceWhile(JobletRequeueJob::isOld)
                .limit(MAX_REQUEUES_PER_QUEUE)
                .forEach(this::requeue);
    }

    /**
     * Re-creates the request under a key with the current time as its created value, enqueues it, then removes the
     * row stored under the previous key.
     *
     * @param jobletRequest request to requeue; its key is mutated in place
     */
    private void requeue(JobletRequest jobletRequest) {
        // The queue key is resolved before the request's key is modified below.
        JobletRequestQueueKey queueKey = jobletRequestQueueManager.getQueueKey(jobletRequest);
        // Keep a copy of the old key: it is needed for the delete and for the log line.
        JobletRequestKey oldKey = jobletRequest.getKey().copy();
        jobletRequest.getKey().setCreated(System.currentTimeMillis());
        // Write the new row and enqueue it before deleting the old row.
        jobletRequestDao.put(jobletRequest);
        jobletQueueDao.put(queueKey, jobletRequest);
        jobletRequestDao.delete(oldKey);
        logger.warn("requeued one oldKey={} newKey={}", oldKey, jobletRequest.getKey());
    }

    /**
     * @param jobletRequest request whose key age is inspected
     * @return true if the key's age is strictly greater than {@code OLD_AGE}
     */
    private static boolean isOld(JobletRequest jobletRequest) {
        return jobletRequest.getKey().getAge().compareTo(OLD_AGE) > 0;
    }
}
