package net.smartcosmos.events.kafka;

import net.smartcosmos.events.AbstractSmartCosmosEventTemplate;
import net.smartcosmos.events.SmartCosmosEvent;
import net.smartcosmos.events.SmartCosmosEventException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/smartcosmos/events/kafka/KafkaSmartCosmosEventTemplate.class */
public class KafkaSmartCosmosEventTemplate extends AbstractSmartCosmosEventTemplate {
    private static final Logger log = LoggerFactory.getLogger(KafkaSmartCosmosEventTemplate.class);
    private final KafkaTemplate kafkaTemplate;

    public KafkaSmartCosmosEventTemplate(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void convertAndSend(SmartCosmosEvent<Object> smartCosmosEvent) throws SmartCosmosEventException {
        MessageBuilder header = MessageBuilder.withPayload(smartCosmosEvent).setHeader("kafka_topic", smartCosmosEvent.getEventType());
        if (StringUtils.hasText(smartCosmosEvent.getEventUrn())) {
            header.setHeader("kafka_messageKey", smartCosmosEvent.getEventUrn());
        }
        this.kafkaTemplate.send(header.build()).addCallback(sendResult -> {
            log.info("Event Successfully sent to Kafka topic {}, partition {}", sendResult.getRecordMetadata().topic(), Integer.valueOf(sendResult.getRecordMetadata().partition()));
        }, th -> {
            log.error("Failed to send event to Kafka", th);
        });
    }
}
