package org.birchframework.bridge;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnectionFactory;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.LambdaRouteBuilder;
import org.apache.camel.builder.ProcessClause;
import org.apache.camel.model.FilterDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.commons.lang3.StringUtils;
import org.birchframework.bridge.dataformat.PayloadDataFormat;
import org.birchframework.configuration.BirchProperties;
import org.birchframework.configuration.ConfigurationException;
import org.birchframework.dto.BirchErrorCode;
import org.birchframework.dto.payload.DestinationType;
import org.birchframework.framework.beans.Beans;
import org.birchframework.framework.metric.RateGauge;

/* loaded from: input_file:org/birchframework/bridge/JMSToKafkaBridgeFactory.class */
public class JMSToKafkaBridgeFactory extends AbstractBridgeFactory {
    private final SpringCamelContext context;
    private final MeterRegistry meterRegistry;

    public JMSToKafkaBridgeFactory(SpringCamelContext springCamelContext, MeterRegistry meterRegistry) {
        super(BirchProperties.BridgeProperties.BridgeSource.JMS);
        this.context = springCamelContext;
        this.meterRegistry = meterRegistry;
    }

    @Override // org.birchframework.bridge.AbstractBridgeFactory
    public LambdaRouteBuilder createBridge(String str, BirchProperties.BridgeProperties bridgeProperties, BirchProperties.BridgesGlobalConfigs bridgesGlobalConfigs) throws Exception {
        if (StringUtils.isNotBlank(bridgeProperties.getJms().getQueue()) && bridgeProperties.getJms().getQueue().equals(bridgeProperties.getJms().getDeadLetterQueue())) {
            throw new ConfigurationException(BirchErrorCode.B31031);
        }
        String str2 = (String) Arrays.stream(this.context.getApplicationContext().getBeanNamesForType(QueueConnectionFactory.class)).findFirst().orElse(null);
        boolean z = bridgeProperties.getJms().destination().getType() == DestinationType.TOPIC;
        String str3 = z ? (String) Arrays.stream(this.context.getApplicationContext().getBeanNamesForType(TopicConnectionFactory.class)).findFirst().orElse(null) : str2;
        RateGauge registerGauge = registerGauge(bridgeProperties, str, String.format("%s.rate", AbstractBridgeFactory.METRIC_PREFIX), "Rate of incoming messages received per second, since last sampling", this.meterRegistry, Tag.of("state", "received"));
        RateGauge registerGauge2 = registerGauge(bridgeProperties, str, String.format("%s.rate", AbstractBridgeFactory.METRIC_PREFIX), "Rate of outgoing messages sent per second, since last sampling", this.meterRegistry, Tag.of("state", "sent"));
        RateGauge registerGauge3 = registerGauge(bridgeProperties, str, String.format("%s.rate", AbstractBridgeFactory.METRIC_PREFIX), "Rate of errors per second, since last sampling", this.meterRegistry, Tag.of("state", "error"));
        Predicate predicate = (Predicate) Beans.findBeanOrCreateInstance(bridgeProperties.getFilterPredicate());
        Consumer consumer = (Consumer) Beans.findBeanOrCreateInstance(bridgeProperties.getAfterReceiveConsumer());
        Consumer consumer2 = (Consumer) Beans.findBeanOrCreateInstance(bridgeProperties.getBeforeSendConsumer());
        Consumer<Exchange> consumer3 = (Consumer) Beans.findBeanOrCreateInstance(bridgeProperties.getErrorConsumer());
        String policyBeanName = z ? TransactedPolicyType.TOPIC.getPolicyBeanName() : TransactedPolicyType.QUEUE.getPolicyBeanName();
        String deadLetterQueue = bridgeProperties.getJms().getDeadLetterQueue();
        ErrorHandlerBuilder errorHandlerBuilder = errorHandlerBuilder(bridgesGlobalConfigs, bridgeProperties.isTransacted() ? (SpringTransactionPolicy) this.context.getApplicationContext().getBean(policyBeanName, SpringTransactionPolicy.class) : null, registerGauge3, consumer3, StringUtils.isBlank(deadLetterQueue) ? null : () -> {
            return String.format("jms:queue:%s?connectionFactory=%s", deadLetterQueue, str2);
        });
        String propertiesFilterPattern = propertiesFilterPattern(bridgeProperties.getFilterProperties());
        SourceProcessor createSourceProcessor = createSourceProcessor(str, bridgeProperties);
        Supplier supplier = () -> {
            String format = String.format("jms:%s:%s?connectionFactory=%s&acknowledgementModeName=CLIENT_ACKNOWLEDGE&disableReplyTo=true&maxConcurrentConsumers=%d", bridgeProperties.getJms().destination().getDestinationType(), bridgeProperties.getJms().destination().getName(), str3, Integer.valueOf(bridgeProperties.getConcurrentConsumers()));
            if (StringUtils.isNotBlank(bridgeProperties.getJms().getSelector())) {
                format = String.format("%s&selector=%s", format, bridgeProperties.getJms().getSelector());
            }
            return format;
        };
        return routeBuilder -> {
            FilterDefinition filter;
            RouteDefinition errorHandler = routeBuilder.from((String) supplier.get()).routeId(str).autoStartup(bridgesGlobalConfigs.isAutoStart()).errorHandler(errorHandlerBuilder);
            Objects.requireNonNull(errorHandler);
            FilterDefinition filterDefinition = (ProcessorDefinition) Beans.invokeIfNotNull(policyBeanName, errorHandler::transacted, errorHandler);
            if (predicate == null) {
                filter = filterDefinition;
            } else {
                Objects.requireNonNull(predicate);
                filter = filterDefinition.filter((v1) -> {
                    return r1.test(v1);
                });
            }
            ProcessClause process = ((ProcessorDefinition) filter.process().body(obj -> {
                registerGauge.increment();
            })).process();
            Objects.requireNonNull(createSourceProcessor);
            ProcessorDefinition log = ((ProcessorDefinition) process.message(createSourceProcessor::processCorrelationID)).log(LoggingLevel.INFO, "Incoming message: Headers: ${headers}; Body: ${bodyOneLine}");
            Objects.requireNonNull(log);
            ProcessorDefinition processorDefinition = (ProcessorDefinition) Beans.invokeIfNotNull(propertiesFilterPattern, log::removeHeaders, log);
            ProcessClause process2 = processorDefinition.process();
            Objects.requireNonNull(process2);
            ProcessorDefinition custom = ((ProcessorDefinition) Beans.invokeIfNotNull(consumer, process2::exchange, processorDefinition)).process(createSourceProcessor).marshal().custom(PayloadDataFormat.BEAN_NAME);
            ProcessClause process3 = custom.process();
            Objects.requireNonNull(process3);
            ((ProcessorDefinition) ((ProcessorDefinition) Beans.invokeIfNotNull(consumer2, process3::exchange, custom)).log(LoggingLevel.INFO, "Outgoing message: Body: ${bodyOneLine}").toF("kafka:%s", new Object[]{bridgeProperties.getKafka().getTopic()}).process().body(obj2 -> {
                registerGauge2.increment();
            })).stop();
        };
    }

    @Override // org.birchframework.bridge.AbstractBridgeFactory
    protected SourceProcessor createSourceProcessor(String str, BirchProperties.BridgeProperties bridgeProperties) {
        return new JMSSourceProcessor(bridgeProperties);
    }
}
