package com.codeheadsystems.queue.impl;

import com.codeheadsystems.metrics.Metrics;
import com.codeheadsystems.queue.QueueConfiguration;
import com.codeheadsystems.queue.State;
import com.codeheadsystems.queue.dao.MessageDao;
import com.codeheadsystems.queue.factory.QueueConfigurationFactory;
import io.dropwizard.lifecycle.Managed;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/codeheadsystems/queue/impl/QueueProcessor.class */
public class QueueProcessor implements Managed {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueProcessor.class);
    private final MessageDao dao;
    private final QueueConfiguration queueConfiguration;
    private final MessageConsumerExecutor messageConsumerExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Metrics metrics;
    private ScheduledFuture<?> scheduler;

    @Inject
    public QueueProcessor(MessageDao messageDao, QueueConfigurationFactory queueConfigurationFactory, MessageConsumerExecutor messageConsumerExecutor, @Named("QueueProcessorScheduler") ScheduledExecutorService scheduledExecutorService, Metrics metrics) {
        this.dao = messageDao;
        this.queueConfiguration = queueConfigurationFactory.queueConfiguration();
        this.messageConsumerExecutor = messageConsumerExecutor;
        this.scheduledExecutorService = scheduledExecutorService;
        this.metrics = metrics;
        LOGGER.info("QueueProcessor({},{},{})", new Object[]{messageDao, this.queueConfiguration, messageConsumerExecutor});
    }

    public void start() {
        LOGGER.info("start()");
        synchronized (this.scheduledExecutorService) {
            if (this.scheduler == null) {
                LOGGER.info("Resetting existing messages to pending state");
                resetState(State.ACTIVATING);
                resetState(State.PROCESSING);
                LOGGER.info("Starting the scheduler");
                this.scheduler = this.scheduledExecutorService.scheduleAtFixedRate(this::processPendingQueue, this.queueConfiguration.queueProcessorInitialDelay(), this.queueConfiguration.queueProcessorInterval(), TimeUnit.SECONDS);
            }
        }
        LOGGER.info("Queue accepting messages");
    }

    private void resetState(State state) {
        this.dao.forState(state).forEach(message -> {
            LOGGER.info("Resetting {} message to PENDING: {}", state, message);
            this.dao.updateState(message, State.PENDING);
        });
    }

    public void processPendingQueue() {
        LOGGER.trace("processPendingQueue()");
        this.metrics.time("QueueProcessor.processPendingQueue", () -> {
            this.dao.forState(State.PENDING).forEach(message -> {
                LOGGER.trace("Processing message {}", message);
                this.dao.updateState(message, State.ACTIVATING);
                this.messageConsumerExecutor.enqueue(message);
            });
            return null;
        }, new String[0]);
    }

    public void stop() throws Exception {
        LOGGER.info("stop()");
        synchronized (this.scheduledExecutorService) {
            if (this.scheduler != null) {
                LOGGER.info("Shutting down the scheduler");
                this.scheduler.cancel(true);
                this.scheduler = null;
                LOGGER.info("Shutting down the scheduler service");
                this.scheduledExecutorService.shutdown();
                if (!this.scheduledExecutorService.awaitTermination(15L, TimeUnit.SECONDS)) {
                    LOGGER.info("Shutting down nicely failed. No longer being nice.");
                    this.scheduledExecutorService.shutdownNow();
                }
            }
        }
        LOGGER.info("Queue no longer accepting messages");
    }
}
