package org.frankframework.management.gateway;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import lombok.Generated;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.frankframework.management.bus.BusMessageUtils;
import org.frankframework.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.context.SecurityContextHolderStrategy;

/* loaded from: input_file:org/frankframework/management/gateway/HazelcastInboundGateway.class */
public class HazelcastInboundGateway extends MessagingGatewaySupport {

    @Generated
    private static final Logger log = LogManager.getLogger(HazelcastInboundGateway.class);
    private HazelcastInstance hzInstance;
    private ITopic<Message<?>> requestTopic;

    @Value("${instance.name:}")
    private String instanceName;
    private final SecurityContextHolderStrategy securityContextHolderStrategy = SecurityContextHolder.getContextHolderStrategy();
    private final String requestTopicName = HazelcastConfig.REQUEST_TOPIC_NAME;

    protected void onInit() {
        this.hzInstance = HazelcastConfig.newHazelcastInstance("worker", Map.of(HazelcastConfig.ATTRIBUTE_APPLICATION_KEY, this.instanceName));
        SpringUtils.registerSingleton(getApplicationContext(), "hazelcastInboundInstance", this.hzInstance);
        this.requestTopic = this.hzInstance.getTopic(HazelcastConfig.REQUEST_TOPIC_NAME);
        setRequestChannel(getRequestChannel(getApplicationContext()));
        setErrorChannel(null);
        super.onInit();
        log.debug("created message listener [{}] on topic [{}]", this.requestTopic.addMessageListener(this::handleIncomingMessage), HazelcastConfig.REQUEST_TOPIC_NAME);
    }

    private MessageChannel getRequestChannel(ApplicationContext applicationContext) {
        return (MessageChannel) applicationContext.getBean("frank-management-bus", MessageChannel.class);
    }

    private <E extends Message<?>> void handleIncomingMessage(com.hazelcast.topic.Message<E> message) {
        Message<?> message2 = (Message) message.getMessageObject();
        UUID uuid = this.hzInstance.getLocalEndpoint().getUuid();
        UUID uuid2 = (UUID) message2.getHeaders().get(BusMessageUtils.HEADER_TARGET_KEY, UUID.class);
        UUID id = message2.getHeaders().getId();
        if (uuid2 != null && !uuid2.equals(uuid)) {
            log.trace("skipping message with id [{}] from member [{}]", new Supplier[]{() -> {
                return id;
            }, () -> {
                return message.getPublishingMember().getUuid();
            }});
            return;
        }
        log.trace("received message with id [{}] from member [{}]", new Supplier[]{() -> {
            return id;
        }, () -> {
            return message.getPublishingMember().getUuid();
        }});
        CloseableThreadContext.Instance put = CloseableThreadContext.put("messageId", id.toString());
        try {
            String str = (String) message2.getHeaders().getReplyChannel();
            log.debug("received message [{}] {} reply-channel", message2, str == null ? "without" : "with");
            processMessage(message2, str);
            if (put != null) {
                put.close();
            }
        } catch (Throwable th) {
            if (put != null) {
                try {
                    put.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void processMessage(@Nonnull Message<?> message, @Nullable String str) {
        MessageHeaders headers = message.getHeaders();
        propagateAuthenticationContext(headers);
        try {
            if (str == null) {
                Logger logger = log;
                Objects.requireNonNull(headers);
                logger.trace("processing message id [{}] asynchronous", new Supplier[]{headers::getId});
                super.send(message);
            } else {
                Logger logger2 = log;
                Objects.requireNonNull(headers);
                logger2.trace("processing message id [{}] synchronous", new Supplier[]{headers::getId});
                Message<?> sendAndReceiveMessage = super.sendAndReceiveMessage(message);
                if (sendAndReceiveMessage == null) {
                    log.trace("synchronous message did not return a response");
                    return;
                }
                handleResponse(sendAndReceiveMessage, str);
            }
        } catch (Exception e) {
            log.error("error processing message id [{}]", headers.getId(), e);
        } catch (MessageHandlingException e2) {
            log.warn("error processing message id [{}]", headers.getId(), e2.getCause());
        }
    }

    private void handleResponse(Message<?> message, @Nonnull String str) {
        MessageHeaders headers = message.getHeaders();
        if (message instanceof ErrorMessage) {
            throw ((Throwable) ((ErrorMessage) message).getPayload());
        }
        Object payload = message.getPayload();
        if (payload instanceof InputStream) {
            message = MessageBuilder.withPayload(new SerializableInputStream((InputStream) payload)).copyHeaders(headers).build();
        }
        Logger logger = log;
        Objects.requireNonNull(headers);
        logger.trace("sending response message id [{}] to reply-channel [{}]", new Supplier[]{headers::getId, () -> {
            return str;
        }});
        if (this.hzInstance.getQueue(str).offer(message)) {
            return;
        }
        log.error("unable to send response [{}] to reply-channel [{}]", message, str);
    }

    private void propagateAuthenticationContext(@Nonnull MessageHeaders messageHeaders) {
        Authentication authentication = (Authentication) messageHeaders.get(HazelcastConfig.AUTHENTICATION_HEADER_KEY, Authentication.class);
        SecurityContext createEmptyContext = this.securityContextHolderStrategy.createEmptyContext();
        createEmptyContext.setAuthentication(authentication);
        this.securityContextHolderStrategy.setContext(createEmptyContext);
    }
}
