package org.bithon.server.collector.sink.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.OptBoolean;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.bithon.server.common.utils.collection.IteratorableCollection;
import org.bithon.server.tracing.sink.ITraceMessageSink;
import org.bithon.server.tracing.sink.TraceSpan;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@JsonTypeName("kafka")
/* loaded from: input_file:org/bithon/server/collector/sink/kafka/KafkaTraceSink.class */
public class KafkaTraceSink implements ITraceMessageSink {
    private final KafkaTemplate<String, String> producer;
    private final ObjectMapper objectMapper;

    @JsonCreator
    public KafkaTraceSink(@JsonProperty("props") Map<String, Object> map, @JacksonInject(useInput = OptBoolean.FALSE) ObjectMapper objectMapper) {
        this.producer = new KafkaTemplate<>(new DefaultKafkaProducerFactory(map, new StringSerializer(), new StringSerializer()), ImmutableMap.of("client.id", "trace"));
        this.objectMapper = objectMapper;
    }

    public void process(String str, IteratorableCollection<TraceSpan> iteratorableCollection) {
        if (iteratorableCollection.hasNext()) {
            String str2 = null;
            StringBuilder sb = new StringBuilder();
            while (iteratorableCollection.hasNext()) {
                TraceSpan traceSpan = (TraceSpan) iteratorableCollection.next();
                str2 = traceSpan.getTraceId();
                try {
                    sb.append(this.objectMapper.writeValueAsString(traceSpan));
                } catch (JsonProcessingException e) {
                }
                sb.append('\n');
            }
            ProducerRecord producerRecord = new ProducerRecord("bithon-trace", str2, sb.toString());
            producerRecord.headers().add("type", str.getBytes(StandardCharsets.UTF_8));
            this.producer.send(producerRecord);
        }
    }

    public void close() throws Exception {
        this.producer.destroy();
    }
}
