package org.birchframework.framework.kafka;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.lang.String;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.birchframework.configuration.BirchProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

@EnableConfigurationProperties({BirchProperties.class})
@ConditionalOnClass({KafkaTemplate.class})
@Component
/* loaded from: input_file:org/birchframework/framework/kafka/KafkaSender.class */
public class KafkaSender<K extends String, V extends Serializable> {
    private final KafkaTemplate<K, V> kafkaTemplate;
    private final long waitTimeMillis;

    public KafkaSender(KafkaTemplate<K, V> kafkaTemplate, BirchProperties birchProperties) {
        this.kafkaTemplate = kafkaTemplate;
        BirchProperties.Kafka.Sender sender = birchProperties.getKafka().getSender();
        this.waitTimeMillis = sender.getWaitTime().toMillis();
        this.kafkaTemplate.setAllowNonTransactional(sender.isAllowNonTransactional());
    }

    public Optional<KafkaSendResult<K, V>> send(String str, K k, V v) throws InterruptedException {
        return send(str, null, k, v);
    }

    public Optional<KafkaSendResult<K, V>> send(String str, V v) throws InterruptedException {
        return send(str, null, null, v);
    }

    @SuppressFBWarnings({"NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"})
    public Optional<KafkaSendResult<K, V>> send(String str, Integer num, K k, V v) throws InterruptedException {
        KafkaSendResult kafkaSendResult = new KafkaSendResult();
        try {
            kafkaSendResult.result = (SendResult) this.kafkaTemplate.send(str, num, k, v).get(this.waitTimeMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            kafkaSendResult.hasError = true;
            kafkaSendResult.exception = e;
            throw e;
        } catch (ExecutionException | TimeoutException e2) {
            kafkaSendResult.hasError = true;
            kafkaSendResult.exception = e2;
        }
        return Optional.ofNullable(kafkaSendResult);
    }

    public Optional<KafkaSendResult<K, V>> sendTransactional(String str, Integer num, K k, V v) {
        KafkaSendResult kafkaSendResult = new KafkaSendResult();
        kafkaSendResult.result = (SendResult) this.kafkaTemplate.executeInTransaction(kafkaOperations -> {
            SendResult sendResult;
            try {
                sendResult = (SendResult) kafkaOperations.send(str, num, k, v).get(this.waitTimeMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                kafkaSendResult.hasError = true;
                kafkaSendResult.exception = e;
                sendResult = new SendResult((ProducerRecord) null, (RecordMetadata) null);
            }
            return sendResult;
        });
        return Optional.of(kafkaSendResult);
    }

    public void sendAsync(String str, V v) {
        sendAsync(str, null, null, v, sendResult -> {
        }, th -> {
        });
    }

    public void sendAsync(String str, V v, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        sendAsync(str, null, null, v, successCallback, failureCallback);
    }

    public void sendAsync(String str, K k, V v) {
        sendAsync(str, k, v, sendResult -> {
        }, th -> {
        });
    }

    public void sendAsync(String str, K k, V v, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        sendAsync(str, null, k, v, successCallback, failureCallback);
    }

    @SuppressFBWarnings({"NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"})
    public void sendAsync(String str, Integer num, K k, V v, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        this.kafkaTemplate.send(str, num, k, v).addCallback(successCallback, failureCallback);
    }

    public void sendAsyncTransactional(String str, Integer num, K k, V v, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        ListenableFuture listenableFuture = (ListenableFuture) this.kafkaTemplate.executeInTransaction(kafkaOperations -> {
            return kafkaOperations.send(str, num, k, v);
        });
        Assert.notNull(listenableFuture, "Future returned by Kafka Template is null");
        listenableFuture.addCallback(successCallback, failureCallback);
    }
}
