package io.moquette.broker.scheduler;

import io.moquette.broker.scheduler.Expirable;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/broker/scheduler/ScheduledExpirationService.class */
public class ScheduledExpirationService<T extends Expirable> {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledExpirationService.class);
    static final Duration FIRER_TASK_INTERVAL = Duration.ofSeconds(1);
    private final Clock clock;
    private final Consumer<T> action;
    private final DelayQueue<ExpirableTracker<T>> expiringEntities = new DelayQueue<>();
    private final Map<String, ExpirableTracker<T>> expiringEntitiesCache = new HashMap();
    private final ScheduledExecutorService actionsExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> expiredEntityTask = this.actionsExecutor.scheduleWithFixedDelay(this::checkExpiredEntities, FIRER_TASK_INTERVAL.getSeconds(), FIRER_TASK_INTERVAL.getSeconds(), TimeUnit.SECONDS);

    public ScheduledExpirationService(Clock clock, Consumer<T> consumer) {
        this.clock = clock;
        this.action = consumer;
    }

    private void checkExpiredEntities() {
        ArrayList arrayList = new ArrayList();
        LOG.debug("Retrieved {} expired entity on {}", Integer.valueOf(this.expiringEntities.drainTo(arrayList)), Integer.valueOf(this.expiringEntities.size()));
        arrayList.stream().map((v0) -> {
            return v0.expirable();
        }).forEach(this.action);
    }

    public void track(String str, T t) {
        if (!t.expireAt().isPresent()) {
            throw new RuntimeException("Can't track for expiration an entity without expiry instant, client_id: " + str);
        }
        ExpirableTracker<T> expirableTracker = new ExpirableTracker<>(t, this.clock);
        this.expiringEntities.add((DelayQueue<ExpirableTracker<T>>) expirableTracker);
        this.expiringEntitiesCache.put(str, expirableTracker);
    }

    public boolean untrack(String str) {
        ExpirableTracker<T> expirableTracker = this.expiringEntitiesCache.get(str);
        if (expirableTracker == null) {
            return false;
        }
        return this.expiringEntities.remove(expirableTracker);
    }

    public void shutdown() {
        if (this.expiredEntityTask.cancel(false)) {
            LOG.info("Successfully cancelled expired entities task");
        } else {
            LOG.warn("Can't cancel the execution of expired entities task, was already cancelled? {}, was done? {}", Boolean.valueOf(this.expiredEntityTask.isCancelled()), Boolean.valueOf(this.expiredEntityTask.isDone()));
        }
        this.actionsExecutor.shutdownNow();
    }
}
