package io.debezium.server.kafka;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("kafka")
@Dependent
/* loaded from: input_file:io/debezium/server/kafka/KafkaChangeConsumer.class */
public class KafkaChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.kafka.";
    private static final String PROP_PREFIX_PRODUCER = "debezium.sink.kafka.producer.";

    @ConfigProperty(name = "debezium.sink.kafka.wait.message.delivery.timeout.ms", defaultValue = "30000")
    Integer waitMessageDeliveryTimeout;
    private KafkaProducer<Object, Object> producer;

    @Inject
    @CustomConsumerBuilder
    Instance<KafkaProducer<Object, Object>> customKafkaProducer;

    @PostConstruct
    void start() {
        if (this.customKafkaProducer.isResolvable()) {
            this.producer = (KafkaProducer) this.customKafkaProducer.get();
            LOGGER.info("Obtained custom configured KafkaProducer '{}'", this.producer);
        } else {
            this.producer = new KafkaProducer<>(getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX_PRODUCER));
            LOGGER.info("consumer started...");
        }
    }

    @PreDestroy
    void stop() {
        LOGGER.info("consumer destroyed...");
        if (this.producer != null) {
            try {
                this.producer.close(Duration.ofSeconds(5L));
            } catch (Throwable th) {
                LOGGER.warn("Could not close producer", th);
            }
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            try {
                LOGGER.trace("Received event '{}'", changeEvent);
                Headers convertKafkaHeaders = convertKafkaHeaders(changeEvent);
                String map = this.streamNameMapper.map(changeEvent.destination());
                Future send = this.producer.send(new ProducerRecord(map, (Integer) null, (Long) null, changeEvent.key(), changeEvent.value(), convertKafkaHeaders), (recordMetadata, exc) -> {
                    if (exc != null) {
                        LOGGER.error("Failed to send record to {}:", map, exc);
                        throw new DebeziumException(exc);
                    }
                    LOGGER.trace("Sent message with offset: {}", Long.valueOf(recordMetadata.offset()));
                });
                if (this.waitMessageDeliveryTimeout.intValue() == 0) {
                    send.get();
                } else {
                    send.get(this.waitMessageDeliveryTimeout.intValue(), TimeUnit.MILLISECONDS);
                }
                recordCommitter.markProcessed(changeEvent);
            } catch (Exception e) {
                throw new DebeziumException(e);
            }
        }
        recordCommitter.markBatchFinished();
    }

    private Headers convertKafkaHeaders(ChangeEvent<Object, Object> changeEvent) {
        List<Header> headers = changeEvent.headers();
        RecordHeaders recordHeaders = new RecordHeaders();
        for (Header header : headers) {
            recordHeaders.add(header.getKey(), getBytes(header.getValue()));
        }
        return recordHeaders;
    }
}
