package org.axonframework.kafka.eventhandling.producer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/kafka/eventhandling/producer/KafkaPublisher.class */
public class KafkaPublisher<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
    private final SubscribableMessageSource<EventMessage<?>> messageSource;
    private final ProducerFactory<K, V> producerFactory;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final String topic;
    private final long publisherAckTimeout;
    private Registration eventBusRegistration;

    /* loaded from: input_file:org/axonframework/kafka/eventhandling/producer/KafkaPublisher$Builder.class */
    public static class Builder<K, V> {
        private SubscribableMessageSource<EventMessage<?>> messageSource;
        private ProducerFactory<K, V> producerFactory;
        private KafkaMessageConverter<K, V> messageConverter = DefaultKafkaMessageConverter.builder().serializer(XStreamSerializer.builder().build()).build();
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.instance();
        private String topic = "Axon.Events";
        private long publisherAckTimeout = 1000;

        public Builder<K, V> messageSource(SubscribableMessageSource<EventMessage<?>> subscribableMessageSource) {
            BuilderUtils.assertNonNull(subscribableMessageSource, "SubscribableMessageSource may not be null");
            this.messageSource = subscribableMessageSource;
            return this;
        }

        public Builder<K, V> producerFactory(ProducerFactory<K, V> producerFactory) {
            BuilderUtils.assertNonNull(producerFactory, "ProducerFactory may not be null");
            this.producerFactory = producerFactory;
            return this;
        }

        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            BuilderUtils.assertNonNull(kafkaMessageConverter, "MessageConverter may not be null");
            this.messageConverter = kafkaMessageConverter;
            return this;
        }

        public Builder<K, V> messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        public Builder<K, V> topic(String str) {
            BuilderUtils.assertThat(str, str2 -> {
                return Objects.nonNull(str2) && !"".equals(str2);
            }, "The topic may not be null or empty");
            this.topic = str;
            return this;
        }

        public Builder<K, V> publisherAckTimeout(long j) {
            BuilderUtils.assertThat(Long.valueOf(j), l -> {
                return l.longValue() >= 0;
            }, "The publisherAckTimeout should be a positive number or zero");
            this.publisherAckTimeout = j;
            return this;
        }

        public KafkaPublisher<K, V> build() {
            return new KafkaPublisher<>(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.messageSource, "The SubscribableMessageSource is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.producerFactory, "The ProducerFactory is a hard requirement and should be provided");
        }
    }

    protected KafkaPublisher(Builder<K, V> builder) {
        builder.validate();
        this.messageSource = ((Builder) builder).messageSource;
        this.producerFactory = ((Builder) builder).producerFactory;
        this.messageConverter = ((Builder) builder).messageConverter;
        this.messageMonitor = ((Builder) builder).messageMonitor;
        this.topic = ((Builder) builder).topic;
        this.publisherAckTimeout = ((Builder) builder).publisherAckTimeout;
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    public void start() {
        this.eventBusRegistration = this.messageSource.subscribe(this::send);
    }

    public void shutDown() {
        if (this.eventBusRegistration != null) {
            this.eventBusRegistration.cancel();
            this.eventBusRegistration = null;
        }
        this.producerFactory.shutDown();
    }

    protected void send(List<? extends EventMessage<?>> list) {
        Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> onMessagesIngested = this.messageMonitor.onMessagesIngested(list);
        Producer<?, ?> createProducer = this.producerFactory.createProducer();
        ConfirmationMode confirmationMode = this.producerFactory.confirmationMode();
        try {
            if (confirmationMode.isTransactional()) {
                tryBeginTxn(createProducer);
            }
            Map<Future<RecordMetadata>, ? super EventMessage<?>> publishToKafka = publishToKafka(list, createProducer);
            if (CurrentUnitOfWork.isStarted()) {
                handleActiveUnitOfWork(createProducer, publishToKafka, onMessagesIngested, confirmationMode);
            } else if (confirmationMode.isTransactional()) {
                tryCommit(createProducer, onMessagesIngested);
            } else if (confirmationMode.isWaitForAck()) {
                waitForPublishAck(publishToKafka, onMessagesIngested);
            }
            if (CurrentUnitOfWork.isStarted()) {
                return;
            }
            tryClose(createProducer);
        } catch (Throwable th) {
            if (!CurrentUnitOfWork.isStarted()) {
                tryClose(createProducer);
            }
            throw th;
        }
    }

    private Map<Future<RecordMetadata>, ? super EventMessage<?>> publishToKafka(List<? extends EventMessage<?>> list, Producer<K, V> producer) {
        HashMap hashMap = new HashMap();
        list.forEach(eventMessage -> {
            hashMap.put(producer.send(this.messageConverter.createKafkaMessage(eventMessage, this.topic)), eventMessage);
        });
        return hashMap;
    }

    private void handleActiveUnitOfWork(Producer<K, V> producer, Map<Future<RecordMetadata>, ? super EventMessage<?>> map, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> map2, ConfirmationMode confirmationMode) {
        UnitOfWork unitOfWork = CurrentUnitOfWork.get();
        unitOfWork.afterCommit(unitOfWork2 -> {
            completeKafkaWork(map2, producer, confirmationMode, map);
        });
        unitOfWork.onRollback(unitOfWork3 -> {
            rollbackKafkaWork(producer, confirmationMode);
        });
    }

    private void completeKafkaWork(Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> map, Producer<K, V> producer, ConfirmationMode confirmationMode, Map<Future<RecordMetadata>, ? super EventMessage<?>> map2) {
        if (confirmationMode.isTransactional()) {
            tryCommit(producer, map);
        } else if (confirmationMode.isWaitForAck()) {
            waitForPublishAck(map2, map);
        }
        tryClose(producer);
    }

    private void rollbackKafkaWork(Producer<K, V> producer, ConfirmationMode confirmationMode) {
        if (confirmationMode.isTransactional()) {
            tryRollback(producer);
        }
        tryClose(producer);
    }

    private void waitForPublishAck(Map<Future<RecordMetadata>, ? super EventMessage<?>> map, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> map2) {
        long currentTimeMillis = System.currentTimeMillis() + this.publisherAckTimeout;
        map.forEach((future, obj) -> {
            try {
                future.get(Math.max(0L, currentTimeMillis - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
                if (map2.containsKey(obj)) {
                    ((MessageMonitor.MonitorCallback) map2.get(obj)).reportSuccess();
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                ((MessageMonitor.MonitorCallback) map2.get(obj)).reportFailure(e);
                logger.warn("Encountered error while waiting for event publication", e);
            }
        });
    }

    private void tryBeginTxn(Producer<?, ?> producer) {
        try {
            producer.beginTransaction();
        } catch (ProducerFencedException e) {
            logger.warn("Unable to begin transaction", e);
            throw new EventPublicationFailedException("Event publication failed: Exception occurred while starting kafka transaction", e);
        }
    }

    private void tryCommit(Producer<?, ?> producer, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> map) {
        try {
            producer.commitTransaction();
            map.forEach((obj, monitorCallback) -> {
                monitorCallback.reportSuccess();
            });
        } catch (ProducerFencedException e) {
            logger.warn("Unable to commit transaction", e);
            map.forEach((obj2, monitorCallback2) -> {
                monitorCallback2.reportFailure(e);
            });
            throw new EventPublicationFailedException("Event publication failed: Exception occurred while committing kafka transaction", e);
        }
    }

    private void tryClose(Producer<?, ?> producer) {
        try {
            producer.close();
        } catch (Exception e) {
            logger.debug("Unable to close producer.", e);
        }
    }

    private void tryRollback(Producer<?, ?> producer) {
        try {
            producer.abortTransaction();
        } catch (Exception e) {
            logger.warn("Unable to abort transaction", e);
        }
    }
}
