package com.codeheadsystems.queue.impl;

import com.codeheadsystems.metrics.Metrics;
import com.codeheadsystems.queue.QueueConfiguration;
import com.codeheadsystems.queue.factory.QueueConfigurationFactory;
import com.codeheadsystems.queue.manager.MessageManager;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/codeheadsystems/queue/impl/QueueProcessor.class */
public class QueueProcessor implements Managed {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueProcessor.class);
    private final MessageManager messageManager;
    private final QueueConfiguration queueConfiguration;
    private final MessageConsumerExecutor messageConsumerExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Metrics metrics;
    private ScheduledFuture<?> scheduler;

    @Inject
    public QueueProcessor(MessageManager messageManager, QueueConfigurationFactory queueConfigurationFactory, MessageConsumerExecutor messageConsumerExecutor, Metrics metrics) {
        this(messageManager, queueConfigurationFactory, messageConsumerExecutor, Executors.newScheduledThreadPool(1), metrics);
    }

    @VisibleForTesting
    QueueProcessor(MessageManager messageManager, QueueConfigurationFactory queueConfigurationFactory, MessageConsumerExecutor messageConsumerExecutor, ScheduledExecutorService scheduledExecutorService, Metrics metrics) {
        this.messageManager = messageManager;
        this.queueConfiguration = queueConfigurationFactory.queueConfiguration();
        this.messageConsumerExecutor = messageConsumerExecutor;
        this.scheduledExecutorService = scheduledExecutorService;
        this.metrics = metrics;
        LOGGER.info("QueueProcessor({},{},{})", new Object[]{messageManager, this.queueConfiguration, messageConsumerExecutor});
    }

    public void start() {
        LOGGER.info("start()");
        synchronized (this.scheduledExecutorService) {
            if (this.scheduler == null) {
                LOGGER.info("Resetting existing messages to pending state");
                this.messageManager.setAllToPending();
                LOGGER.info("Starting the scheduler");
                this.scheduler = this.scheduledExecutorService.scheduleAtFixedRate(this::processPendingQueue, this.queueConfiguration.queueProcessorInitialDelay(), this.queueConfiguration.queueProcessorInterval(), TimeUnit.SECONDS);
            }
        }
        LOGGER.info("Queue accepting messages");
    }

    public void processPendingQueue() {
        LOGGER.trace("processPendingQueue()");
        int availableThreadCount = this.messageConsumerExecutor.availableThreadCount();
        this.metrics.increment("QueueProcessor.processPendingQueue.availableThreads", availableThreadCount, new String[0]);
        if (availableThreadCount < 1) {
            LOGGER.trace("No threads available to process messages: {}", Integer.valueOf(availableThreadCount));
        } else {
            this.metrics.time("QueueProcessor.processPendingQueue", () -> {
                this.messageManager.getPendingMessages(availableThreadCount).forEach(message -> {
                    LOGGER.trace("Processing message {}", message);
                    this.messageManager.setActivating(message);
                    this.messageConsumerExecutor.enqueue(message);
                });
                return null;
            }, new String[0]);
        }
    }

    public void stop() throws Exception {
        LOGGER.info("stop()");
        synchronized (this.scheduledExecutorService) {
            if (this.scheduler != null) {
                LOGGER.info("Shutting down the scheduler");
                this.scheduler.cancel(true);
                this.scheduler = null;
                LOGGER.info("Shutting down the scheduler service");
                this.scheduledExecutorService.shutdown();
                if (!this.scheduledExecutorService.awaitTermination(15L, TimeUnit.SECONDS)) {
                    LOGGER.info("Shutting down nicely failed. No longer being nice.");
                    this.scheduledExecutorService.shutdownNow();
                }
            }
        }
        LOGGER.info("Queue no longer accepting messages");
    }
}
