package org.frankframework.extensions.kafka;

import jakarta.annotation.Nonnull;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.frankframework.configuration.ConfigurationException;
import org.frankframework.configuration.ConfigurationWarning;
import org.frankframework.core.ISender;
import org.frankframework.core.PipeLineSession;
import org.frankframework.core.SenderException;
import org.frankframework.core.SenderResult;
import org.frankframework.stream.Message;

@ConfigurationWarning("Experimental and under development. Do not use unless you wish to participate in this development.")
@Deprecated(forRemoval = false)
/* loaded from: input_file:org/frankframework/extensions/kafka/KafkaSender.class */
public class KafkaSender extends KafkaFacade implements ISender {

    @Generated
    private static final Logger log = LogManager.getLogger(KafkaSender.class);
    private Producer<String, byte[]> producer;
    private String topic;

    @Override // org.frankframework.extensions.kafka.KafkaFacade
    public void configure() throws ConfigurationException {
        super.configure();
        if (StringUtils.isEmpty(this.topic)) {
            throw new ConfigurationException("topic must be specified");
        }
        if (this.topic.contains(",")) {
            throw new ConfigurationException("Only one topic is allowed to be used for sender.");
        }
        if (this.topic.contains("*")) {
            throw new ConfigurationException("Wildcards are not allowed to be used for sender.");
        }
    }

    public void open() throws SenderException {
        this.producer = new KafkaProducer(this.properties, new StringSerializer(), new ByteArraySerializer());
        try {
            Thread.sleep(100L);
            if (((Double) ((Metric) this.producer.metrics().values().stream().filter(metric -> {
                return "response-total".equals(metric.metricName().name());
            }).findFirst().orElseThrow(() -> {
                return new SenderException("Failed to get response-total metric.");
            })).metricValue()).intValue() == 0) {
                throw new SenderException("Didn't get a response from Kafka while connecting for Sending.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SenderException(e);
        }
    }

    public void close() {
        this.producer.close();
    }

    @Nonnull
    public SenderResult sendMessage(@Nonnull Message message, @Nonnull PipeLineSession pipeLineSession) throws SenderException {
        try {
            message.preserve();
            try {
                RecordMetadata recordMetadata = (RecordMetadata) this.producer.send(new ProducerRecord(this.topic, message.asByteArray())).get();
                message.getContext().put("kafka.offset", Long.valueOf(recordMetadata.offset()));
                message.getContext().put("kafka.partition", Integer.valueOf(recordMetadata.partition()));
                return new SenderResult(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SenderException(e);
            } catch (Exception e2) {
                throw new SenderException(e2);
            }
        } catch (Exception e3) {
            throw new SenderException("Failed to convert message to message type:", e3);
        }
    }

    public String getPhysicalDestinationName() {
        return "TOPIC(" + this.topic + ") on (" + getBootstrapServers() + ")";
    }

    @Generated
    void setProducer(Producer<String, byte[]> producer) {
        this.producer = producer;
    }

    @Generated
    public void setTopic(String str) {
        this.topic = str;
    }
}
