package com.codeheadsystems.queue.impl;

import com.codeheadsystems.metrics.Metrics;
import com.codeheadsystems.metrics.Tags;
import com.codeheadsystems.queue.Message;
import com.codeheadsystems.queue.MessageConsumer;
import com.codeheadsystems.queue.QueueConfiguration;
import com.codeheadsystems.queue.factory.QueueConfigurationFactory;
import com.codeheadsystems.queue.manager.MessageManager;
import io.dropwizard.lifecycle.Managed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/codeheadsystems/queue/impl/MessageConsumerExecutor.class */
public class MessageConsumerExecutor implements Managed {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumerExecutor.class);
    private final ThreadPoolExecutor executorService;
    private final MessageManager messageManager;
    private final QueueRegister queueRegister;
    private final Metrics metrics;

    @Inject
    public MessageConsumerExecutor(QueueConfigurationFactory queueConfigurationFactory, MessageManager messageManager, QueueRegister queueRegister, Metrics metrics) {
        QueueConfiguration queueConfiguration = queueConfigurationFactory.queueConfiguration();
        this.executorService = new ThreadPoolExecutor(queueConfiguration.queueExecutorMinThreads(), queueConfiguration.queueExecutorMaxThreads(), queueConfiguration.queueExecutorIdleSeconds(), TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.messageManager = messageManager;
        this.queueRegister = queueRegister;
        this.metrics = metrics;
        LOGGER.info("MessageConsumerExecutor({},{},{})", new Object[]{messageManager, this.executorService, queueRegister});
    }

    public int availableThreadCount() {
        return this.executorService.getMaximumPoolSize() - this.executorService.getActiveCount();
    }

    public void enqueue(Message message) {
        LOGGER.trace("enqueue({})", message);
        String messageType = message.messageType();
        this.metrics.time("MessageConsumerExecutor.enqueue", Tags.of(new String[]{"messageType", messageType}), () -> {
            this.queueRegister.getConsumer(messageType).ifPresentOrElse(messageConsumer -> {
                this.executorService.execute(() -> {
                    execute(message, messageConsumer);
                });
            }, () -> {
                LOGGER.error("No message for type {}", message.messageType());
                this.messageManager.clear(message);
            });
            return null;
        });
    }

    private void execute(Message message, MessageConsumer messageConsumer) {
        LOGGER.trace("execute({},{})", message, messageConsumer);
        try {
            try {
                this.metrics.time("MessageConsumerExecutor.execute", Tags.of(new String[]{"messageType", message.messageType()}), () -> {
                    this.messageManager.setProcessing(message);
                    messageConsumer.accept(message);
                    return null;
                });
                this.messageManager.clear(message);
            } catch (Throwable th) {
                LOGGER.error("Error processing message: {}", message, th);
                this.messageManager.clear(message);
            }
        } catch (Throwable th2) {
            this.messageManager.clear(message);
            throw th2;
        }
    }

    public void start() throws Exception {
        LOGGER.info("Executor service enabled to start executing messages");
    }

    public void stop() throws Exception {
        LOGGER.info("stop()");
        LOGGER.info("Shutting down the executor service");
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(15L, TimeUnit.SECONDS)) {
            LOGGER.info("Shutting down nicely failed. No longer being nice.");
            this.executorService.shutdownNow().forEach(runnable -> {
                LOGGER.warn("Unable to shutdown {}", runnable);
            });
        }
        LOGGER.info("Executor service no longer executing messages");
    }
}
