package com.codeheadsystems.queue.impl;

import com.codeheadsystems.metrics.Metrics;
import com.codeheadsystems.metrics.Tags;
import com.codeheadsystems.queue.Message;
import com.codeheadsystems.queue.Queue;
import com.codeheadsystems.queue.QueueConfiguration;
import com.codeheadsystems.queue.State;
import com.codeheadsystems.queue.dao.MessageDao;
import com.codeheadsystems.queue.factory.MessageFactory;
import com.codeheadsystems.queue.factory.QueueConfigurationFactory;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Queue} implementation that delegates persistence to a {@link MessageDao}.
 *
 * <p>New messages are built by the injected {@link MessageFactory} and stored in the
 * {@link State#PENDING} state. How a failed store is reported to the caller is governed by
 * {@link QueueConfiguration#exceptionOnEnqueueFail()}.
 */
@Singleton
public class QueueImpl implements Queue {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueImpl.class);
    private final MessageDao messageDao;
    private final MessageFactory messageFactory;
    private final QueueConfiguration queueConfiguration;
    private final Metrics metrics;

    /**
     * Creates the queue.
     *
     * @param messageDao                used to store, look up and delete messages.
     * @param messageFactory            used to build a {@link Message} for each enqueue call.
     * @param queueConfigurationFactory queried once, here, for the {@link QueueConfiguration}
     *                                  this instance keeps for its lifetime.
     * @param metrics                   used to time {@link #enqueue(String, String)}.
     */
    @Inject
    public QueueImpl(MessageDao messageDao, MessageFactory messageFactory, QueueConfigurationFactory queueConfigurationFactory, Metrics metrics) {
        this.messageDao = messageDao;
        this.messageFactory = messageFactory;
        this.queueConfiguration = queueConfigurationFactory.queueConfiguration();
        this.metrics = metrics;
        LOGGER.info("QueueImpl({}, {},{})", this.queueConfiguration, messageDao, messageFactory);
    }

    /**
     * Builds a message from the two arguments and stores it as {@link State#PENDING}.
     *
     * <p>The whole operation is timed under the metric {@code QueueImpl.enqueue}, tagged with
     * {@code messageType} set to the first argument.
     *
     * @param str  the message type; also used as the {@code messageType} metric tag value.
     * @param str2 second argument handed to {@link MessageFactory#createMessage}
     *             (presumably the message payload; confirm against the factory).
     * @return the stored message; the previously stored message when the store was rejected
     *         as an integrity-constraint violation; or empty when the store failed for another
     *         reason and {@link QueueConfiguration#exceptionOnEnqueueFail()} is {@code false}.
     * @throws UnableToExecuteStatementException if the store failed for a reason other than an
     *         integrity-constraint violation and
     *         {@link QueueConfiguration#exceptionOnEnqueueFail()} is {@code true}.
     * @throws IllegalStateException if the store was rejected as an integrity-constraint
     *         violation but no message with the same hash could be read back.
     */
    @Override
    public Optional<Message> enqueue(String str, String str2) {
        LOGGER.trace("enqueue({},{})", str, str2);
        return this.metrics.time("QueueImpl.enqueue", Tags.of("messageType", str), () -> {
            final Message message = this.messageFactory.createMessage(str, str2);
            try {
                this.messageDao.store(message, State.PENDING);
                return Optional.of(message);
            } catch (UnableToExecuteStatementException e) {
                return handleStoreFailure(message, e);
            }
        });
    }

    /**
     * Decides the outcome of {@link #enqueue(String, String)} after the store statement failed.
     *
     * @param message the message that could not be stored.
     * @param failure the exception raised by the store call.
     * @return the already-stored message for an integrity-constraint violation, otherwise empty
     *         when failures are configured to be swallowed.
     */
    private Optional<Message> handleStoreFailure(Message message, UnableToExecuteStatementException failure) {
        // An integrity-constraint violation is interpreted as "this message is already stored",
        // so the stored copy is returned instead of reporting a failure.
        // NOTE(review): only a direct cause of this exact JDBC exception type is recognised;
        // confirm the target database driver reports duplicates this way.
        if (failure.getCause() instanceof SQLIntegrityConstraintViolationException) {
            LOGGER.warn("Message already exists: {}", message);
            return Optional.of(this.messageDao.readByHash(message.hash())
                .orElseThrow(() -> new IllegalStateException("Message should exist: " + message)));
        }
        if (this.queueConfiguration.exceptionOnEnqueueFail()) {
            LOGGER.error("Unable to store message: {}", message, failure);
            throw failure;
        }
        // Configured to swallow the failure: log it and report "nothing enqueued".
        LOGGER.warn("Unable to store message: {}", message, failure);
        return Optional.empty();
    }

    /**
     * Looks up the state of a message through the DAO.
     *
     * @param message the message to look up.
     * @return whatever {@link MessageDao#stateOf} returns for the message.
     */
    @Override
    public Optional<State> getState(Message message) {
        LOGGER.trace("getState({})", message);
        return this.messageDao.stateOf(message);
    }

    /**
     * Asks the DAO to delete all messages.
     */
    @Override
    public void clearAll() {
        LOGGER.trace("clearAll()");
        this.messageDao.deleteAll();
    }

    /**
     * Asks the DAO to delete the given message.
     *
     * @param message the message to delete.
     */
    @Override
    public void clear(Message message) {
        LOGGER.trace("clear({})", message);
        this.messageDao.delete(message);
    }
}
