package de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.impl;

import de.ikor.sip.foundation.testkit.util.TestKitHelper;
import de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.RouteInvoker;
import de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.exceptions.RouteInvokerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;

/**
 * {@link RouteInvoker} for routes that start from a Kafka endpoint.
 *
 * <p>The invoker resolves the Kafka consumer of the route under test, creates a fresh exchange on
 * it, copies body and headers of the test exchange into it (adding the Kafka specific headers a
 * consumed record would carry) and hands the exchange directly to the consumer's processor.
 *
 * <p>The bean is only registered when {@link KafkaComponent} is on the classpath.
 */
@ConditionalOnClass({KafkaComponent.class})
@Component
public class KafkaRouteInvoker implements RouteInvoker {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaRouteInvoker.class);

    private static final String KAFKA_TOPIC_HEADER = "kafka.TOPIC";
    private static final String KAFKA_TIMESTAMP_HEADER = "kafka.TIMESTAMP";
    private static final String KAFKA_HEADERS_HEADER = "kafka.HEADERS";
    private static final String CAMEL_MESSAGE_TIMESTAMP_HEADER = "CamelMessageTimestamp";

    /** Header keys that are copied as-is instead of being serialized as custom record headers. */
    private static final List<String> KAFKA_SPECIFIC_HEADER_KEYS = List.of(
            "kafka.PARTITION_KEY",
            "kafka.PARTITION",
            "kafka.KEY",
            KAFKA_TOPIC_HEADER,
            "kafka.OVERRIDE_TOPIC",
            "kafka.OFFSET",
            KAFKA_HEADERS_HEADER,
            "kafka.LAST_RECORD_BEFORE_COMMIT",
            "kafka.LAST_POLL_RECORD",
            KAFKA_TIMESTAMP_HEADER,
            "kafka.OVERRIDE_TIMESTAMP");

    private final CamelContext camelContext;

    /**
     * Creates the invoker.
     *
     * @param camelContext Camel context passed to {@link TestKitHelper} when resolving the consumer
     */
    @Generated
    public KafkaRouteInvoker(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Sends the body and headers of the given exchange through the processor of the resolved Kafka
     * consumer.
     *
     * @param exchange test exchange providing body and headers
     * @return always {@link Optional#empty()}
     * @throws RouteInvokerException if the consumer's processor throws; the original failure is
     *     logged and attached as a suppressed exception
     */
    @Override // de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.RouteInvoker
    public Optional<Exchange> invoke(Exchange exchange) {
        // NOTE(review): explicit cast in case the helper is declared with a more general consumer
        // type; it is a no-op if the helper already returns KafkaConsumer.
        KafkaConsumer consumer = (KafkaConsumer) TestKitHelper.resolveConsumer(exchange, this.camelContext);
        Exchange kafkaExchange = consumer.createExchange(false);
        kafkaExchange.getMessage().setBody(exchange.getMessage().getBody());
        kafkaExchange.getMessage().setHeaders(
                createExchangeHeaders(exchange.getMessage().getHeaders(), consumer.getEndpoint().getConfiguration()));
        try {
            consumer.getProcessor().process(kafkaExchange);
        } catch (Exception e) {
            // Keep the root cause visible: RouteInvokerException is only constructed from the
            // invoker class name here, so the failure is logged and attached as suppressed.
            log.error("{} failed to process the exchange", getClass().getName(), e);
            RouteInvokerException failure = new RouteInvokerException(getClass().getName());
            failure.addSuppressed(e);
            throw failure;
        }
        return Optional.empty();
    }

    /**
     * @param endpoint endpoint of the route under test
     * @return {@code true} if the endpoint is a {@link KafkaEndpoint}
     */
    @Override // de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.RouteInvoker
    public boolean isApplicable(Endpoint endpoint) {
        return endpoint instanceof KafkaEndpoint;
    }

    /**
     * @param endpoint endpoint of the route under test (not inspected)
     * @return always {@code true}
     */
    @Override // de.ikor.sip.foundation.testkit.workflow.whenphase.routeinvoker.RouteInvoker
    public boolean shouldSuspend(Endpoint endpoint) {
        return true;
    }

    /**
     * Builds the header map of the exchange that is handed to the consumer's processor: Kafka
     * specific headers (with defaults) first, then the custom headers, which overwrite entries with
     * the same key.
     */
    private Map<String, Object> createExchangeHeaders(Map<String, Object> map, KafkaConfiguration kafkaConfiguration) {
        Map<String, Object> headers = prepareKafkaSpecificHeaders(map, kafkaConfiguration.getTopic());
        headers.putAll(prepareCustomHeaders(map));
        return headers;
    }

    /**
     * Copies the Kafka specific headers and fills in defaults for topic and timestamps where they
     * are missing.
     *
     * @param map headers of the test exchange
     * @param str topic used when no {@code kafka.TOPIC} header is provided
     */
    private Map<String, Object> prepareKafkaSpecificHeaders(Map<String, Object> map, String str) {
        // Explicit HashMap instead of Collectors.toMap: toMap rejects null values and does not
        // guarantee a mutable result, while this map is modified below and by the caller.
        Map<String, Object> kafkaHeaders = new HashMap<>();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (isKafkaSpecificHeader(entry.getKey())) {
                kafkaHeaders.put(entry.getKey(), entry.getValue());
            }
        }
        kafkaHeaders.putIfAbsent(KAFKA_TOPIC_HEADER, str);
        kafkaHeaders.putIfAbsent(KAFKA_TIMESTAMP_HEADER, Long.valueOf(System.currentTimeMillis()));
        kafkaHeaders.putIfAbsent(CAMEL_MESSAGE_TIMESTAMP_HEADER, kafkaHeaders.get(KAFKA_TIMESTAMP_HEADER));
        return kafkaHeaders;
    }

    /** Selects all headers that are not Kafka specific and converts them via {@link #serializeValuesAndAddKafkaSpecificHeader(Map)}. */
    private Map<String, Object> prepareCustomHeaders(Map<String, Object> map) {
        // Plain loop for the same reason as above: header values may be null.
        Map<String, Object> customHeaders = new HashMap<>();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (!isKafkaSpecificHeader(entry.getKey())) {
                customHeaders.put(entry.getKey(), entry.getValue());
            }
        }
        return serializeValuesAndAddKafkaSpecificHeader(customHeaders);
    }

    /** Returns whether the key is one of the known Kafka specific header keys. */
    private boolean isKafkaSpecificHeader(String str) {
        return KAFKA_SPECIFIC_HEADER_KEYS.contains(str);
    }

    /**
     * Serializes custom header values with {@link DefaultKafkaHeaderSerializer} and collects them
     * additionally as {@link RecordHeaders} under the {@code kafka.HEADERS} key.
     *
     * <p>Headers that {@link TestKitHelper#isTestKitHeader} reports as test kit headers are passed
     * through unchanged and are not added to the record headers.
     */
    private Map<String, Object> serializeValuesAndAddKafkaSpecificHeader(Map<String, Object> map) {
        Map<String, Object> result = new HashMap<>();
        RecordHeaders recordHeaders = new RecordHeaders();
        DefaultKafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (TestKitHelper.isTestKitHeader(key)) {
                result.put(key, value);
                continue;
            }
            byte[] serialized = serializer.serialize(key, value);
            recordHeaders.add(key, serialized);
            result.put(key, serialized);
        }
        result.put(KAFKA_HEADERS_HEADER, recordHeaders);
        return result;
    }
}
