package io.pythagoras.messagebus.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.pythagoras.messagebus.core.config.MessageBusProperties;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Spring-managed {@link IMessageBus} implementation.
 *
 * <p>Outgoing contracts are serialized to JSON with Jackson, wrapped in a {@link BusMessage} and
 * handed to the configured adapter on a small background thread pool. Incoming bus messages are
 * deserialized back into their contract type and passed to every handler registered for the
 * message's code and version.
 *
 * <p>Whether the bus sends and/or receives is driven by {@link MessageBusProperties}.
 */
@Service
public class MessageBusService implements IMessageBus {
    private static final Logger logger = LoggerFactory.getLogger(MessageBusService.class);

    /** Number of worker threads used for asynchronous sending. */
    private static final int SENDER_THREAD_COUNT = 3;

    private final MessageBusAdapterProvider messageBusAdapterProvider;
    private final MessageContractProvider messageContractProvider;
    private final MessageFactory messageFactory;
    private final MessageHandlerProvider messageHandlerProvider;
    private final ObjectMapper mapper = new ObjectMapper();
    private final MessageBusProperties properties;

    /** Created lazily, at most once; volatile so sender threads observe the created pool. */
    private volatile ExecutorService executorService;

    /**
     * Creates the service with its collaborators.
     *
     * @param messageBusAdapterProvider supplies the adapter that talks to the underlying bus
     * @param messageContractProvider   supplies the known message contracts and their codes
     * @param messageHandlerProvider    supplies the handlers registered per code and version
     * @param messageFactory            creates contract instances used to resolve the target type
     * @param messageBusProperties      enable/disable flags for the bus, sending and receiving
     */
    @Autowired
    public MessageBusService(MessageBusAdapterProvider messageBusAdapterProvider, MessageContractProvider messageContractProvider, MessageHandlerProvider messageHandlerProvider, MessageFactory messageFactory, MessageBusProperties messageBusProperties) {
        this.messageBusAdapterProvider = messageBusAdapterProvider;
        this.messageHandlerProvider = messageHandlerProvider;
        this.messageContractProvider = messageContractProvider;
        this.messageFactory = messageFactory;
        this.properties = messageBusProperties;
    }

    /**
     * Initializes the adapter and, depending on the properties, starts receiving and prepares the
     * sender thread pool. Does nothing when the bus is disabled.
     *
     * <p>This method is also invoked by {@link #enableMessageBus()}, so it may run more than once;
     * the sender pool is reused across invocations instead of being recreated.
     */
    @PostConstruct
    public void init() {
        if (!this.properties.isEnabled()) {
            return;
        }
        this.messageBusAdapterProvider.getAdapter().initialize(this.messageContractProvider.getCodeList(), this.messageHandlerProvider.getHandlerContractCodes(), this.messageContractProvider.getCodeListWithMessageContracts());
        if (this.properties.isReceiveEnabled()) {
            this.messageBusAdapterProvider.getAdapter().registerMessageBusService(this);
            this.messageBusAdapterProvider.getAdapter().start();
        }
        if (this.properties.isSendEnabled()) {
            senderExecutor();
        }
    }

    /**
     * Queues the given contract for asynchronous delivery through the adapter.
     *
     * <p>If the bus or sending is disabled, a warning is logged and the message is dropped.
     * Serialization and delivery happen on a worker thread, so failures there are logged rather
     * than propagated to the caller.
     *
     * @param iMessageContract the message to send
     * @throws MessageSendingException declared by the interface; not thrown by this implementation
     */
    @Override
    public void sendMessage(IMessageContract iMessageContract) throws MessageSendingException {
        if (!this.properties.isEnabled()) {
            logger.warn("MessageBus is disabled.");
            return;
        }
        if (!this.properties.isSendEnabled()) {
            logger.warn("MessageBus is not enabled for sending.");
            return;
        }
        // Sending may have been switched on after init() ran without it, so obtain the pool lazily.
        ExecutorService executor = senderExecutor();
        if (executor.isShutdown()) {
            // The pool is only shut down from preDestroy(); avoid a RejectedExecutionException.
            logger.warn("MessageBus is shutting down; message was not sent.");
            return;
        }
        executor.submit(() -> {
            try {
                BusMessage busMessage = new BusMessage();
                busMessage.setCode(iMessageContract.getCode());
                busMessage.setVersion(iMessageContract.getVersion());
                busMessage.setPayload(this.mapper.writeValueAsString(iMessageContract));
                this.messageBusAdapterProvider.getAdapter().sendMessage(busMessage);
            } catch (JsonProcessingException e) {
                logger.error("Unable to convert message to json.", e);
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so without this the failure
                // would vanish silently.
                logger.error("Unable to send message.", e);
            }
        });
    }

    /**
     * Converts an incoming bus message to its contract and dispatches it to every handler
     * registered for the contract's code and version.
     *
     * @param iBusMessage the raw message received from the bus
     * @throws HandleMessageFailureException if the bus or receiving is disabled, the message
     *                                       cannot be converted, or a handler fails
     */
    @Override
    public void receiveMessage(IBusMessage iBusMessage) throws HandleMessageFailureException {
        if (!this.properties.isEnabled()) {
            throw new HandleMessageFailureException("MessageBus is disabled.");
        }
        if (!this.properties.isReceiveEnabled()) {
            throw new HandleMessageFailureException("MessageBus receiving is not enabled.");
        }
        try {
            IMessageContract contract = convertFromBusMessage(iBusMessage);
            for (IMessageHandler handler : this.messageHandlerProvider.getMessageHandlersForCodeAndVersion(contract.getCode(), contract.getVersion())) {
                dispatchToHandler(handler, contract, iBusMessage);
            }
        } catch (HandleMessageFailureException e) {
            // Already logged and classified where it was raised; do not re-wrap or re-log it.
            throw e;
        } catch (Exception e) {
            // Throwable goes last so SLF4J records the stack trace.
            logger.error("Message Unwrapping Error: {} (message: {})", e.getMessage(), iBusMessage, e);
            throw new HandleMessageFailureException("Message Unwrapping Error: " + e.getMessage(), e);
        }
    }

    /**
     * Invokes a single handler with the converted contract, translating any failure into a
     * {@link HandleMessageFailureException}.
     */
    private void dispatchToHandler(IMessageHandler handler, IMessageContract contract, IBusMessage busMessage) throws HandleMessageFailureException {
        try {
            handler.handleMessage(contract);
            if (handler instanceof IMessageHandler2) {
                // NOTE(review): an IMessageHandler2 receives all three extra callbacks in addition
                // to handleMessage(contract) above - presumably these are optional hooks; confirm
                // against the IMessageHandler2 contract.
                IMessageHandler2 extendedHandler = (IMessageHandler2) handler;
                extendedHandler.handleMessage(contract, busMessage.getPayload(), busMessage.getSentTime());
                extendedHandler.handleMessage(contract, busMessage.getSentTime());
                extendedHandler.handleRawMessage(busMessage.getPayload(), busMessage.getSentTime());
            }
        } catch (Exception e) {
            logger.error("Message Handling Error: {}", e.getMessage(), e);
            throw new HandleMessageFailureException("Message Handling Error: " + e.getMessage(), e);
        }
    }

    /**
     * Deserializes the payload of a bus message into the contract type registered for the
     * message's code and version.
     *
     * @throws HandleMessageFailureException if the payload cannot be read as that type
     */
    private IMessageContract convertFromBusMessage(IBusMessage iBusMessage) throws HandleMessageFailureException {
        // The factory creates an instance of the contract only so its concrete class can be used
        // as the deserialization target.
        Class<?> contractType = this.messageFactory.make(this.messageContractProvider.get(iBusMessage.getCode(), iBusMessage.getVersion())).getClass();
        try {
            return (IMessageContract) this.mapper.readValue(iBusMessage.getPayload(), contractType);
        } catch (IOException e) {
            logger.error("Unable to convert json to message. Payload: {}", iBusMessage.getPayload(), e);
            throw new HandleMessageFailureException("Message Unwrapping Error: Unable to convert json to message.", e);
        }
    }

    /**
     * Returns the sender thread pool, creating it on first use. Synchronized so that concurrent
     * callers never create more than one pool.
     */
    private synchronized ExecutorService senderExecutor() {
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(SENDER_THREAD_COUNT);
        }
        return this.executorService;
    }

    /**
     * Disables the bus: turns off sending, stops the adapter, then turns off receiving and the bus
     * itself. Does nothing if the bus is already disabled.
     *
     * @throws MessageBusStateException declared by the interface
     */
    @Override
    public void disableMessageBus() throws MessageBusStateException {
        if (!this.properties.isEnabled()) {
            return;
        }
        this.properties.setSendEnabled(false);
        this.messageBusAdapterProvider.getAdapter().stop();
        this.properties.setReceiveEnabled(false);
        this.properties.setEnabled(false);
    }

    /**
     * Enables the bus for both sending and receiving and re-runs {@link #init()}.
     *
     * @throws MessageBusStateException if initialization fails
     */
    @Override
    public void enableMessageBus() throws MessageBusStateException {
        this.properties.setEnabled(true);
        this.properties.setReceiveEnabled(true);
        this.properties.setSendEnabled(true);
        try {
            init();
        } catch (MessageBusInitializationException e) {
            throw new MessageBusStateException("Unable to enable message bus.", e);
        }
    }

    /**
     * Releases resources when the bean is destroyed: stops accepting new send tasks (already
     * queued tasks are still allowed to run) and asks the adapter to clean up.
     */
    @PreDestroy
    public void preDestroy() {
        ExecutorService executor = this.executorService;
        if (executor != null) {
            // Without this the pool's non-daemon worker threads outlive the bean.
            executor.shutdown();
        }
        this.messageBusAdapterProvider.getAdapter().cleanResources();
    }
}
