package fi.evolver.basics.spring.messaging;

import fi.evolver.basics.spring.log.LogUtils;
import fi.evolver.basics.spring.log.MessageLogService;
import fi.evolver.basics.spring.log.entity.MessageLog;
import fi.evolver.basics.spring.messaging.entity.Message;
import fi.evolver.basics.spring.messaging.model.MessageDetails;
import fi.evolver.basics.spring.messaging.model.PendingMessageDetails;
import fi.evolver.basics.spring.messaging.sender.Sender;
import fi.evolver.basics.spring.messaging.util.SendUtils;
import fi.evolver.basics.spring.status.model.ComponentStatus;
import fi.evolver.basics.spring.status.model.Reportable;
import fi.evolver.basics.spring.util.MessageChainUtils;
import fi.evolver.utils.CommunicationException;
import fi.evolver.utils.NullSafetyUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Sends pending messages to their targets.
 *
 * <p>Each message is delivered with the {@link Sender} registered for the scheme of the message's
 * target URI (scheme lookup is case-insensitive). Depending on the target's {@code ForwardPolicy}
 * property, the result of a send may be forwarded as a new {@code <messageType>-Response} message.
 * The number of handled messages is exposed through {@link Reportable#getStatus()}.
 */
@Component
public class MessageSender implements IMessageSender, Reportable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageSender.class);

    private static final String PROPERTY_FORWARD_POLICY = "ForwardPolicy";
    private static final String DISABLED_PREFIX = "DISABLED:";
    private static final String DISABLED_URI = "disabled://uri";

    /** Once this much time has elapsed, one {@link #sendPendingMessages} call stops picking up more messages. */
    private static final long MAX_SEND_DURATION_MS = 9000L;

    /** Total number of handled messages; only accessed through the synchronized static accessors. */
    private static long sentMessagesCount = 0;

    private final MessageLogService messageLogService;
    private final MessageRepository messageRepository;
    private final MessagingService messagingService;
    private final Map<String, Sender> senders;

    /**
     * Decides when the result of a send is forwarded as a response message.
     */
    public enum ForwardResponse {
        /** Forward both successful and failed results. */
        ALWAYS,
        /** Forward only successful results. */
        SUCCESS,
        /** Forward only failed results. */
        ERROR,
        /** Never forward results. */
        NEVER
    }

    /**
     * Creates the sender and indexes the available {@link Sender}s by every protocol they support.
     * If several senders support the same protocol, the one later in the list wins.
     *
     * @param messageLogService service used for logging messages of log-only targets
     * @param messageRepository repository used for fetching messages and updating their state
     * @param messagingService service used for forwarding responses
     * @param availableSenders all available protocol specific senders
     */
    @Autowired
    public MessageSender(MessageLogService messageLogService, MessageRepository messageRepository, MessagingService messagingService, List<Sender> availableSenders) {
        this.messageLogService = messageLogService;
        this.messageRepository = messageRepository;
        this.messagingService = messagingService;

        Map<String, Sender> sendersByProtocol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (Sender sender : availableSenders) {
            for (String protocol : sender.getSupportedProtocols()) {
                sendersByProtocol.put(protocol, sender);
            }
        }
        this.senders = Collections.unmodifiableMap(sendersByProtocol);
    }

    /**
     * Handles pending messages of the given target and group, batch by batch, until there is
     * nothing left, a message can not be handled, or the time budget has been used up.
     *
     * <p>Runtime failures are logged, not propagated. The number of messages handled during the
     * call is always added to the reported total, no matter how the call ends.
     *
     * @param targetId identifier of the target whose messages are handled
     * @param group the message group to handle
     */
    @Override
    @Async
    public void sendPendingMessages(long targetId, String group) {
        long startTime = System.currentTimeMillis();
        int handledCount = 0;
        try {
            while (true) {
                List<PendingMessageDetails> pendingMessages = this.messageRepository.findPendingMessagesByTarget(targetId, group);
                if (pendingMessages.isEmpty()) {
                    return;
                }
                for (PendingMessageDetails pending : pendingMessages) {
                    // Stop at the first message that may not or can not be handled
                    if (!pending.isOkToHandle() || !handleMessage(pending)) {
                        return;
                    }
                    handledCount++;
                    if (System.currentTimeMillis() - startTime >= MAX_SEND_DURATION_MS) {
                        return;
                    }
                }
            }
        } catch (RuntimeException e) {
            LOG.warn("Failed sending pending messages for target {}, group {}", targetId, group, e);
        } finally {
            if (handledCount > 0) {
                updateSentCount(handledCount);
                LOG.info("Sent {} messages for target {}, group {}", handledCount, targetId, group);
            }
        }
    }

    /** Adds the given amount to the total number of handled messages. */
    private static synchronized void updateSentCount(int count) {
        sentMessagesCount += count;
    }

    /** Reads the total under the same lock that guards the updates. */
    private static synchronized long getSentMessagesCount() {
        return sentMessagesCount;
    }

    /**
     * Handles a single pending message according to the state of its target configuration.
     *
     * <p>If the message was successfully fetched for handling, its resulting state is always
     * persisted, also when handling fails with an exception.
     *
     * @param pendingMessageDetails the message to handle
     * @return {@code true} if the message ended up in any state other than {@code FAILED},
     *         {@code false} if it was not handled (not ok to handle, fetched by someone else,
     *         target paused) or handling failed
     */
    private boolean handleMessage(PendingMessageDetails pendingMessageDetails) {
        if (!pendingMessageDetails.isOkToHandle()) {
            return false;
        }

        Optional<Message> fetched = Optional.empty();
        Message.MessageState resultState = Message.MessageState.FAILED;
        try (MessageChainUtils.MessageChain messageChain = MessageChainUtils.startMessageChain(Long.valueOf(pendingMessageDetails.getMessageChainId()))) {
            fetched = this.messageRepository.fetchForHandling(pendingMessageDetails);
            if (!fetched.isPresent()) {
                LOG.debug("Not sending message {}, another sender got to it first", pendingMessageDetails.getId());
                return false;
            }

            Message message = fetched.get();
            try (LogUtils.Specifier specifier = startSpecifier(message)) {
                switch (message.getMessageTargetConfig().getState()) {
                    case DISABLED:
                        resultState = Message.MessageState.DISABLED;
                        break;
                    case LOG_ONLY:
                        logMessage(message);
                        resultState = Message.MessageState.DISABLED;
                        break;
                    case ENABLED:
                        if (pendingMessageDetails.getState() != Message.MessageState.PENDING) {
                            LOG.warn("Failing message {} (chain {}) for not finishing in time", pendingMessageDetails.getId(), pendingMessageDetails.getMessageChainId());
                        } else if (sendMessage(message)) {
                            resultState = Message.MessageState.SENT;
                        }
                        break;
                    case PAUSED:
                        // NOTE(review): the message is persisted with the initial FAILED state here - confirm this is intended
                        return false;
                    default:
                        // Unknown target states leave the message FAILED
                        break;
                }
            }
            return resultState != Message.MessageState.FAILED;
        } catch (RuntimeException e) {
            LOG.warn("Could not send message {}", pendingMessageDetails.getId(), e);
            return false;
        } finally {
            // Runs after the specifier and the message chain have been closed
            if (fetched.isPresent()) {
                this.messageRepository.updateState(fetched.get(), resultState);
            }
        }
    }

    /**
     * Starts a log specifier from the {@code Specifier} metadata value of the message.
     *
     * @return the started specifier, or {@code null} if the message has no such metadata
     */
    private static LogUtils.Specifier startSpecifier(Message message) {
        return Optional.ofNullable(message.getMetadata())
                .map(metadata -> (String) metadata.get("Specifier"))
                .map(LogUtils::startSpecifier)
                .orElse(null);
    }

    /**
     * Sends the message with the sender matching the scheme of its target URI and forwards the
     * response when the target's forward policy says so.
     *
     * @param message the message to send
     * @return {@code true} if the sender reported success, {@code false} if it reported failure
     *         or sending (or response forwarding) threw an exception
     */
    private boolean sendMessage(Message message) {
        try {
            String targetUri = message.getTargetUri();
            // Case-insensitive comparison that does not depend on the default locale
            if (targetUri.regionMatches(true, 0, DISABLED_PREFIX, 0, DISABLED_PREFIX.length())) {
                targetUri = DISABLED_URI;
            }

            URI uri = SendUtils.updateUri(targetUri, message);
            String scheme = uri.getScheme();
            // The case-insensitive sender map can not be queried with a null key
            Sender sender = scheme == null ? null : this.senders.get(scheme);
            if (sender == null) {
                throw new CommunicationException("Unsupported protocol: %s", scheme);
            }

            SendResult result = sender.send(message, uri);
            forwardResponse(message, result);
            return result.isSuccess();
        } catch (IOException | RuntimeException | URISyntaxException e) {
            String tryLimit = Optional.ofNullable(message.getMessageTargetConfig().getRetryLimit())
                    .map(limit -> String.valueOf(limit + 1))
                    .orElse("unlimited");
            LOG.error("SENDING {} message {} to {} (try {} of {}) FAILED",
                    message.getMessageType(),
                    message.getId(),
                    message.getTargetSystem(),
                    message.getFailCount() + 1,
                    tryLimit,
                    e);
            return false;
        }
    }

    /**
     * Forwards the result of a send as a {@code <messageType>-Response} message if the forward
     * policy of the target allows it.
     *
     * <p>The forwarded metadata consists of the message metadata, the response details and the
     * keys {@code Status}, {@code FailCount}, {@code TargetSystem} and {@code RetryLimit}.
     * {@code RetryLimit} is left out when the target has no retry limit.
     *
     * @param message the message that was sent
     * @param sendResult the result of sending the message
     */
    // The element types of the collaborating APIs are not visible from this class, so the metadata map is kept raw
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void forwardResponse(Message message, SendResult sendResult) {
        if (!shouldForwardResponse(message, sendResult.isSuccess())) {
            return;
        }

        String body = NullSafetyUtils.denull(sendResult.getResponseBody(), "");

        Map metadata = new LinkedHashMap();
        metadata.putAll(message.getMetadata());
        metadata.putAll(sendResult.getResponseDetails());
        metadata.put("Status", sendResult.isSuccess() ? "SUCCESS" : "FAILURE");
        metadata.put("FailCount", message.getFailCount().toString());
        metadata.put("TargetSystem", message.getTargetSystem());
        // A missing retry limit means unlimited retries; it must not fail an otherwise finished send
        Optional.ofNullable(message.getMessageTargetConfig().getRetryLimit())
                .ifPresent(limit -> metadata.put("RetryLimit", limit.toString()));

        this.messagingService.send(message.getMessageType() + "-Response", MessageDetails.create(body, metadata));
    }

    /**
     * Tells whether a result with the given outcome should be forwarded for the message.
     */
    private static boolean shouldForwardResponse(Message message, boolean success) {
        switch (inferForwardType(message)) {
            case ALWAYS:
                return true;
            case SUCCESS:
                return success;
            case ERROR:
                return !success;
            default:
                return false;
        }
    }

    /**
     * Reads the forward policy from the {@code ForwardPolicy} property of the message's target
     * configuration. A missing property, an unknown value or a failure to read the property all
     * result in {@link ForwardResponse#NEVER}.
     */
    private static ForwardResponse inferForwardType(Message message) {
        String policyName = null;
        try {
            policyName = message.getMessageTargetConfig().getProperty(PROPERTY_FORWARD_POLICY)
                    .map(value -> value.toUpperCase())
                    .orElse(ForwardResponse.NEVER.name());
            return ForwardResponse.valueOf(policyName);
        } catch (RuntimeException e) {
            LOG.warn("Invalid forwardType parameter value {}: {}", policyName, e.getMessage());
            return ForwardResponse.NEVER;
        }
    }

    /**
     * Writes a "Not sent" entry of the message to the message log instead of sending it.
     *
     * <p>The request size is determined by reading the message data stream through. Counting is
     * best effort: on failure a warning is logged and the size counted so far is used.
     */
    private void logMessage(Message message) {
        int requestSize = 0;
        try (InputStream dataStream = message.getDataStream()) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = dataStream.read(buffer)) != -1) {
                requestSize += read;
            }
        } catch (Exception e) {
            LOG.warn("FAILED counting request size", e);
        }
        this.messageLogService.logZippedMessage(LocalDateTime.now(), message.getMessageType(), "disabled", message.getTargetUri(), this.messageLogService.getApplicationName(), message.getTargetSystem(), MessageLog.Direction.OUTBOUND, requestSize, message.getCompressedData(), (Map<String, ?>) null, 0, (byte[]) null, (Map<String, ?>) null, "DISABLED", "Not sent", SendUtils.mapMetadata(message.getMetadata()));
    }

    /**
     * Reports the total number of messages handled by this class as {@code MessagesSent}.
     */
    @Override
    public List<ComponentStatus> getStatus() {
        return Collections.singletonList(new ComponentStatus("Bean", getClass().getSimpleName(), Collections.singletonMap("MessagesSent", Long.valueOf(getSentMessagesCount()))));
    }
}
