package org.axonframework.extensions.kafka.eventhandling;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.class */
/**
 * {@link KafkaMessageConverter} that maps Axon {@link EventMessage}s to Kafka records with a {@code String} key and
 * a {@code byte[]} value, and back again.
 * <p>
 * On the producing side the payload is serialized to {@code byte[]} with the configured {@link Serializer}, the
 * record key is taken from the configured {@link SequencingPolicy}, and the record headers are produced by
 * {@link HeaderUtils#toHeaders}. On the consuming side a record is only converted when its headers contain both the
 * message identifier and the message type keys; the resulting event data is passed through the configured
 * {@link EventUpcasterChain} before a message is built from it.
 * <p>
 * Instances are created through {@link #builder()}.
 */
public class DefaultKafkaMessageConverter implements KafkaMessageConverter<String, byte[]> {

    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaMessageConverter.class);

    // Header keys read by this converter. The values are the exact strings looked up on the Kafka record headers.
    private static final String MESSAGE_ID_HEADER = "axon-message-id";
    private static final String MESSAGE_TYPE_HEADER = "axon-message-type";
    private static final String MESSAGE_REVISION_HEADER = "axon-message-revision";
    private static final String MESSAGE_TIMESTAMP_HEADER = "axon-message-timestamp";
    private static final String AGGREGATE_TYPE_HEADER = "axon-message-aggregate-type";
    private static final String AGGREGATE_ID_HEADER = "axon-message-aggregate-id";
    private static final String AGGREGATE_SEQ_HEADER = "axon-message-aggregate-seq";

    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final BiFunction<String, Object, RecordHeader> headerValueMapper;
    private final EventUpcasterChain upcasterChain;

    /**
     * Builder for a {@link DefaultKafkaMessageConverter}.
     * <p>
     * The {@link Serializer} has no default and must be provided. The {@link SequencingPolicy} defaults to
     * {@link SequentialPerAggregatePolicy#instance()}, the header value mapper defaults to
     * {@link HeaderUtils#byteMapper()} and the {@link EventUpcasterChain} defaults to a chain without upcasters.
     */
    public static class Builder {

        private Serializer serializer;
        private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
        private BiFunction<String, Object, RecordHeader> headerValueMapper = HeaderUtils.byteMapper();
        private EventUpcasterChain upcasterChain = new EventUpcasterChain(new EventUpcaster[0]);

        /**
         * Sets the {@link Serializer} used for event payloads and meta data.
         *
         * @param serializer the serializer to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Sets the {@link SequencingPolicy} whose sequence identifier becomes the Kafka record key.
         *
         * @param sequencingPolicy the policy to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
            BuilderUtils.assertNonNull(sequencingPolicy, "SequencingPolicy may not be null");
            this.sequencingPolicy = sequencingPolicy;
            return this;
        }

        /**
         * Sets the function that turns a header key and value into a {@link RecordHeader}.
         *
         * @param headerValueMapper the mapper to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> headerValueMapper) {
            // The message previously was the literal "{} may not be null", leaving the placeholder unresolved.
            BuilderUtils.assertNonNull(headerValueMapper, "HeaderValueMapper may not be null");
            this.headerValueMapper = headerValueMapper;
            return this;
        }

        /**
         * Sets the {@link EventUpcasterChain} applied to event data read from Kafka.
         *
         * @param upcasterChain the chain to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder upcasterChain(EventUpcasterChain upcasterChain) {
            BuilderUtils.assertNonNull(upcasterChain, "UpcasterChain must not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        /**
         * Creates a {@link DefaultKafkaMessageConverter} from this builder's current settings.
         *
         * @return the configured converter
         */
        public DefaultKafkaMessageConverter build() {
            return new DefaultKafkaMessageConverter(this);
        }

        /**
         * Verifies that all required settings are present; currently this is only the {@link Serializer}.
         *
         * @throws AxonConfigurationException declared for a failing non-null assertion on the serializer
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(serializer, "The Serializer is a hard requirement and should be provided");
        }
    }

    /**
     * Instantiates the converter from the given {@code builder}, after calling {@link Builder#validate()} on it.
     *
     * @param builder the builder holding the configuration
     */
    protected DefaultKafkaMessageConverter(Builder builder) {
        builder.validate();
        this.serializer = builder.serializer;
        this.sequencingPolicy = builder.sequencingPolicy;
        this.headerValueMapper = builder.headerValueMapper;
        this.upcasterChain = builder.upcasterChain;
    }

    /**
     * Returns a fresh {@link Builder} for a {@link DefaultKafkaMessageConverter}.
     *
     * @return a new builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds a {@link ProducerRecord} for the given event.
     * <p>
     * The record value is the payload serialized to {@code byte[]}, the key is derived from the sequencing policy
     * (possibly {@code null}), and the partition and timestamp arguments are passed as {@code null}.
     *
     * @param eventMessage the event to publish
     * @param topic        the first constructor argument of the {@link ProducerRecord}, i.e. the target topic
     * @return the record representing the event
     */
    @Override
    public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject<byte[]> serializedPayload = eventMessage.serializePayload(serializer, byte[].class);
        return new ProducerRecord<>(
                topic,
                null,
                null,
                recordKey(eventMessage),
                serializedPayload.getData(),
                HeaderUtils.toHeaders(eventMessage, serializedPayload, headerValueMapper)
        );
    }

    /**
     * Returns the string form of the sequence identifier the policy assigns to the event, or {@code null} when the
     * policy yields no identifier.
     */
    private String recordKey(EventMessage<?> eventMessage) {
        Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
    }

    /**
     * Attempts to turn a Kafka record into an {@link EventMessage}.
     * <p>
     * An empty {@link Optional} is returned when the headers lack the message identifier or message type key, when
     * the upcaster chain produces no element, or when any exception is raised during conversion. In the latter case
     * the exception is logged at trace level and not propagated.
     *
     * @param consumerRecord the record to convert
     * @return the converted event, or an empty {@link Optional} if the record could not be converted
     */
    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Headers headers = consumerRecord.headers();
            if (isAxonMessage(headers)) {
                EventData<?> eventData = createEventData(headers, consumerRecord.value());
                return upcasterChain
                        .upcast(Stream.of(new InitialEventRepresentation(eventData, serializer)))
                        .findFirst()
                        .map(upcastedEvent -> new SerializedMessage<>(
                                upcastedEvent.getMessageIdentifier(),
                                new LazyDeserializingObject<>(upcastedEvent.getData(), serializer),
                                upcastedEvent.getMetaData()
                        ))
                        .flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
            }
        } catch (Exception e) {
            // Best-effort conversion: a record that cannot be read is reported as absent rather than failing.
            logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
        }
        return Optional.empty();
    }

    /**
     * Assembles the event data handed to the upcaster chain from the record headers and body. A missing aggregate
     * sequence header falls back to {@code 0} and a missing revision header to {@code null}.
     */
    private EventData<?> createEventData(Headers headers, byte[] messageBody) {
        return new GenericDomainEventEntry<>(
                HeaderUtils.valueAsString(headers, AGGREGATE_TYPE_HEADER),
                HeaderUtils.valueAsString(headers, AGGREGATE_ID_HEADER),
                HeaderUtils.valueAsLong(headers, AGGREGATE_SEQ_HEADER, 0L),
                HeaderUtils.valueAsString(headers, MESSAGE_ID_HEADER),
                HeaderUtils.valueAsLong(headers, MESSAGE_TIMESTAMP_HEADER),
                HeaderUtils.valueAsString(headers, MESSAGE_TYPE_HEADER),
                HeaderUtils.valueAsString(headers, MESSAGE_REVISION_HEADER, null),
                messageBody,
                extractMetadataAsBytes(headers)
        );
    }

    /**
     * Serializes the meta data extracted from the headers to {@code byte[]} using the configured serializer.
     */
    private byte[] extractMetadataAsBytes(Headers headers) {
        MetaData metaData = MetaData.from(HeaderUtils.extractAxonMetadata(headers));
        return serializer.serialize(metaData, byte[].class).getData();
    }

    /**
     * A record is treated as an Axon message when its header keys include both the message id and message type.
     */
    private static boolean isAxonMessage(Headers headers) {
        return HeaderUtils.keys(headers).containsAll(Arrays.asList(MESSAGE_ID_HEADER, MESSAGE_TYPE_HEADER));
    }

    /**
     * A record is treated as a domain event only when the aggregate type, identifier and sequence headers are all
     * present.
     */
    private static boolean isDomainEvent(Headers headers) {
        return headers.lastHeader(AGGREGATE_TYPE_HEADER) != null
                && headers.lastHeader(AGGREGATE_ID_HEADER) != null
                && headers.lastHeader(AGGREGATE_SEQ_HEADER) != null;
    }

    /**
     * Wraps the serialized message in a domain event message when all aggregate headers are present, and in a plain
     * event message otherwise. The timestamp header is read as epoch milliseconds.
     * <p>
     * NOTE(review): the decompiler reported widening this method from {@code private}; the visibility is left as
     * found so the class' visible interface does not change.
     *
     * @param headers           the headers of the Kafka record
     * @param serializedMessage the message built from the upcasted event data
     * @return an {@link Optional} holding the resulting event message
     */
    public static Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> serializedMessage) {
        long timestamp = HeaderUtils.valueAsLong(headers, MESSAGE_TIMESTAMP_HEADER);
        if (isDomainEvent(headers)) {
            return buildDomainEventMessage(headers, serializedMessage, timestamp);
        }
        return buildEventMessage(serializedMessage, timestamp);
    }

    /**
     * Builds a domain event message using the aggregate type, identifier and sequence number from the headers.
     */
    private static Optional<EventMessage<?>> buildDomainEventMessage(Headers headers,
                                                                     SerializedMessage<?> serializedMessage,
                                                                     long timestamp) {
        return Optional.of(new GenericDomainEventMessage<>(
                HeaderUtils.valueAsString(headers, AGGREGATE_TYPE_HEADER),
                HeaderUtils.valueAsString(headers, AGGREGATE_ID_HEADER),
                HeaderUtils.valueAsLong(headers, AGGREGATE_SEQ_HEADER),
                serializedMessage,
                () -> Instant.ofEpochMilli(timestamp)
        ));
    }

    /**
     * Builds a plain event message whose timestamp supplier yields the given epoch-millisecond timestamp.
     */
    private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?> serializedMessage,
                                                               long timestamp) {
        return Optional.of(new GenericEventMessage<>(serializedMessage, () -> Instant.ofEpochMilli(timestamp)));
    }
}
