package com.codeheadsystems.queue.manager;

import com.codeheadsystems.metrics.Metrics;
import com.codeheadsystems.metrics.Tags;
import com.codeheadsystems.queue.Message;
import com.codeheadsystems.queue.State;
import com.codeheadsystems.queue.dao.MessageDao;
import com.codeheadsystems.queue.factory.MessageFactory;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages queue messages by delegating persistence and state changes to the
 * {@link MessageDao}. New messages are built through the {@link MessageFactory},
 * and the save path is timed through {@link Metrics}.
 */
@Singleton
public class MessageManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageManager.class);
    private final MessageDao dao;
    private final MessageFactory messageFactory;
    private final Metrics metrics;

    /**
     * Creates the manager with its collaborators.
     *
     * @param messageDao     DAO used for every read, write and delete of messages.
     * @param messageFactory factory used to build new {@link Message} instances.
     * @param metrics        metrics facade used to time {@link #saveMessage(String, String)}.
     */
    @Inject
    public MessageManager(MessageDao messageDao, MessageFactory messageFactory, Metrics metrics) {
        this.dao = messageDao;
        this.messageFactory = messageFactory;
        this.metrics = metrics;
    }

    /**
     * Builds a message from the two arguments and stores it in the
     * {@link State#PENDING} state. The whole operation is timed under the metric
     * name {@code MessageManager.saveMessage}, tagged with {@code messageType}.
     *
     * <p>If the store is rejected with an integrity-constraint violation, the
     * message is treated as already stored and the persisted copy (looked up by
     * hash) is returned instead.
     *
     * @param str  the message type; also used as the {@code messageType} metric tag.
     * @param str2 second argument handed to the factory (presumably the payload —
     *             confirm against {@link MessageFactory}).
     * @return an {@link Optional} holding the newly stored message or the already
     *         persisted one; every non-throwing path visible here returns a value.
     * @throws IllegalStateException             if the store reports a constraint violation
     *                                           but no message with that hash can be read.
     * @throws UnableToExecuteStatementException if the store fails for any other reason.
     */
    public Optional<Message> saveMessage(String str, String str2) {
        LOGGER.trace("saveMessage({},{})", str, str2);
        return this.metrics.time("MessageManager.saveMessage", Tags.of("messageType", str),
            () -> storeAsPending(this.messageFactory.createMessage(str, str2)));
    }

    /**
     * Stores the message as {@link State#PENDING}, falling back to the already
     * persisted row when the store fails on an integrity constraint.
     */
    private Optional<Message> storeAsPending(Message message) {
        try {
            this.dao.store(message, State.PENDING);
            return Optional.of(message);
        } catch (UnableToExecuteStatementException e) {
            if (e.getCause() instanceof SQLIntegrityConstraintViolationException) {
                // A constraint violation is taken to mean the same message was stored before,
                // so hand back the persisted copy rather than failing the caller.
                LOGGER.warn("Message already exists: {}", message);
                Message existing = this.dao.readByHash(message.hash())
                    .orElseThrow(() -> new IllegalStateException("Message should exist: " + message));
                return Optional.of(existing);
            }
            LOGGER.error("Unable to store message: {}", message, e);
            throw e;
        }
    }

    /**
     * Moves the given message to {@link State#PROCESSING}.
     *
     * @param message the message whose state is updated.
     */
    public void setProcessing(Message message) {
        LOGGER.trace("setProcessing({})", message);
        this.dao.updateState(message, State.PROCESSING);
    }

    /**
     * Moves the given message to {@link State#ACTIVATING}.
     *
     * @param message the message whose state is updated.
     */
    public void setActivating(Message message) {
        LOGGER.trace("setActivating({})", message);
        this.dao.updateState(message, State.ACTIVATING);
    }

    /**
     * Asks the DAO to set every message to {@link State#PENDING}.
     */
    public void setAllToPending() {
        LOGGER.trace("setAllToPending()");
        this.dao.updateAllToState(State.PENDING);
    }

    /**
     * Reads messages currently in {@link State#PENDING}.
     *
     * @param i value passed to the DAO as its second argument (presumably the
     *          maximum number of messages to return — confirm against {@link MessageDao}).
     * @return the messages returned by the DAO for the pending state.
     */
    public List<Message> getPendingMessages(int i) {
        LOGGER.trace("getPendingMessages({})", i);
        return this.dao.forState(State.PENDING, i);
    }

    /**
     * Collects the DAO's per-state counts into a map keyed by state.
     *
     * @return a map from state to its count.
     * @throws IllegalStateException if the DAO reports the same state more than once
     *                               (duplicate keys are rejected by {@code Collectors.toMap}).
     */
    public Map<State, Long> counts() {
        LOGGER.trace("counts()");
        return this.dao.counts().stream()
            .collect(Collectors.toMap(entry -> entry.state(), entry -> entry.count()));
    }

    /**
     * Looks up the state the DAO reports for the given message.
     *
     * @param message the message to look up.
     * @return the state reported by the DAO, or empty if it reports none.
     */
    public Optional<State> getState(Message message) {
        LOGGER.trace("getState({})", message);
        return this.dao.stateOf(message);
    }

    /**
     * Deletes every message through the DAO.
     */
    public void clearAll() {
        LOGGER.trace("clearAll()");
        this.dao.deleteAll();
    }

    /**
     * Deletes the given message through the DAO.
     *
     * @param message the message to delete.
     */
    public void clear(Message message) {
        LOGGER.trace("clear({})", message);
        this.dao.delete(message);
    }
}
