package io.streamthoughts.jikkou.kafka.internals.producer;

import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/producer/KafkaRecordSender.class */
public final class KafkaRecordSender<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaRecordSender.class);
    private final Producer<K, V> producer;

    public KafkaRecordSender(@NotNull Producer<K, V> producer) {
        this.producer = (Producer) Objects.requireNonNull(producer, "producer must not be null");
    }

    public List<CompletableFuture<ProducerRequestResult<K, V>>> send(@NotNull Collection<KafkaRecord<K, V>> collection) {
        return doSend(this.producer, collection);
    }

    public CompletableFuture<ProducerRequestResult<K, V>> send(@NotNull KafkaRecord<K, V> kafkaRecord) {
        return doSend(this.producer, kafkaRecord);
    }

    private static <K, V> List<CompletableFuture<ProducerRequestResult<K, V>>> doSend(Producer<K, V> producer, Collection<KafkaRecord<K, V>> collection) {
        return collection.stream().map(kafkaRecord -> {
            return doSend(producer, kafkaRecord);
        }).toList();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, V> CompletableFuture<ProducerRequestResult<K, V>> doSend(Producer<K, V> producer, KafkaRecord<K, V> kafkaRecord) {
        LOG.debug("Sending record to topic {}", kafkaRecord.topic());
        try {
            CompletableFuture<ProducerRequestResult<K, V>> completableFuture = new CompletableFuture<>();
            producer.send(kafkaRecord.toProducerRecord(), buildCallback(completableFuture, kafkaRecord));
            return completableFuture;
        } catch (KafkaException e) {
            return CompletableFuture.completedFuture(new ProducerRequestResult(kafkaRecord, e));
        }
    }

    private static <K, V> Callback buildCallback(final CompletableFuture<ProducerRequestResult<K, V>> completableFuture, final KafkaRecord<K, V> kafkaRecord) {
        return new Callback() { // from class: io.streamthoughts.jikkou.kafka.internals.producer.KafkaRecordSender.1
            @Override // org.apache.kafka.clients.producer.Callback
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                ProducerRequestResult producerRequestResult;
                if (exc == null) {
                    KafkaRecordSender.LOG.debug("Record was successfully sent to kafka topic {}-{} ", recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()));
                    producerRequestResult = new ProducerRequestResult(KafkaRecord.this, Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset()), Long.valueOf(recordMetadata.timestamp()));
                } else {
                    KafkaRecordSender.LOG.warn("Failed to send record into kafka topic {}", recordMetadata.topic(), exc);
                    producerRequestResult = new ProducerRequestResult(KafkaRecord.this, exc);
                }
                completableFuture.complete(producerRequestResult);
            }
        };
    }
}
