package org.axonframework.extensions.kafka.eventhandling.cloudevent;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter.class */
public class CloudEventKafkaMessageConverter implements KafkaMessageConverter<String, CloudEvent> {
    private static final Logger logger = LoggerFactory.getLogger(CloudEventKafkaMessageConverter.class);
    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final EventUpcasterChain upcasterChain;
    private final Map<String, String> extensionNameResolver;
    private final Map<String, String> metadataNameResolver;
    private final Function<EventMessage<?>, URI> sourceSupplier;
    private final Function<EventMessage<?>, Optional<String>> subjectSupplier;
    private final Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier;
    private final Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier;
    private final boolean ignoreInvalidExtensionNames;

    /* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter$Builder.class */
    public static class Builder {
        private Serializer serializer;
        private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
        private EventUpcasterChain upcasterChain = new EventUpcasterChain(new EventUpcaster[0]);
        private final Map<String, String> metadataToExtensionMap = tracingMap();
        private Function<EventMessage<?>, URI> sourceSupplier = eventMessage -> {
            return URI.create("https://www.axoniq.io/");
        };
        private Function<EventMessage<?>, Optional<String>> subjectSupplier = MetadataUtils.defaultSubjectSupplier();
        private Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier = MetadataUtils.defaultDataContentTypeSupplier();
        private Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier = MetadataUtils.defaultDataSchemaSupplier();
        private boolean ignoreInvalidExtensionNames = false;

        private Map<String, String> tracingMap() {
            HashMap hashMap = new HashMap();
            hashMap.put("traceId", "traceid");
            hashMap.put("correlationId", "correlationid");
            return hashMap;
        }

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
            BuilderUtils.assertNonNull(sequencingPolicy, "SequencingPolicy may not be null");
            this.sequencingPolicy = sequencingPolicy;
            return this;
        }

        public Builder upcasterChain(EventUpcasterChain eventUpcasterChain) {
            BuilderUtils.assertNonNull(eventUpcasterChain, "UpcasterChain must not be null");
            this.upcasterChain = eventUpcasterChain;
            return this;
        }

        public Builder addMetadataMappers(Map<String, String> map) {
            BuilderUtils.assertThat(map, ExtensionUtils::isValidMetadataToExtensionMap, "The metadataMappers has invalid extension names");
            this.metadataToExtensionMap.putAll(map);
            return this;
        }

        public Builder addMetadataMapper(String str, String str2) {
            BuilderUtils.assertThat(str2, ExtensionUtils::isValidExtensionName, "The extension name is invalid");
            this.metadataToExtensionMap.put(str, str2);
            return this;
        }

        public Builder sourceSupplier(Function<EventMessage<?>, URI> function) {
            BuilderUtils.assertNonNull(function, "sourceSupplier must not be null");
            this.sourceSupplier = function;
            return this;
        }

        public Builder subjectSupplier(Function<EventMessage<?>, Optional<String>> function) {
            BuilderUtils.assertNonNull(function, "dataContentTypeSupplier must not be null");
            this.subjectSupplier = function;
            return this;
        }

        public Builder dataContentTypeSupplier(Function<EventMessage<?>, Optional<String>> function) {
            BuilderUtils.assertNonNull(function, "dataContentTypeSupplier must not be null");
            this.dataContentTypeSupplier = function;
            return this;
        }

        public Builder dataSchemaSupplier(Function<EventMessage<?>, Optional<URI>> function) {
            BuilderUtils.assertNonNull(function, "dataSchemaSupplier must not be null");
            this.dataSchemaSupplier = function;
            return this;
        }

        public Builder ignoreInvalidExtensionNames(boolean z) {
            this.ignoreInvalidExtensionNames = z;
            return this;
        }

        public CloudEventKafkaMessageConverter build() {
            return new CloudEventKafkaMessageConverter(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
        }
    }

    protected CloudEventKafkaMessageConverter(Builder builder) {
        builder.validate();
        this.serializer = builder.serializer;
        this.sequencingPolicy = builder.sequencingPolicy;
        this.upcasterChain = builder.upcasterChain;
        this.extensionNameResolver = builder.metadataToExtensionMap;
        this.metadataNameResolver = (Map) builder.metadataToExtensionMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getValue();
        }, (v0) -> {
            return v0.getKey();
        }));
        this.sourceSupplier = builder.sourceSupplier;
        this.subjectSupplier = builder.subjectSupplier;
        this.dataContentTypeSupplier = builder.dataContentTypeSupplier;
        this.dataSchemaSupplier = builder.dataSchemaSupplier;
        this.ignoreInvalidExtensionNames = builder.ignoreInvalidExtensionNames;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter
    public ProducerRecord<String, CloudEvent> createKafkaMessage(EventMessage<?> eventMessage, String str) {
        return new ProducerRecord<>(str, (Integer) null, (Long) null, recordKey(eventMessage), toCloudEvent(eventMessage, eventMessage.serializePayload(this.serializer, byte[].class)), (Iterable) null);
    }

    private CloudEvent toCloudEvent(EventMessage<?> eventMessage, SerializedObject<byte[]> serializedObject) {
        CloudEventBuilder cloudEventBuilder = new CloudEventBuilder();
        cloudEventBuilder.withId(eventMessage.getIdentifier());
        cloudEventBuilder.withData((byte[]) serializedObject.getData());
        Optional<String> apply = this.subjectSupplier.apply(eventMessage);
        cloudEventBuilder.getClass();
        apply.ifPresent(cloudEventBuilder::withSubject);
        Optional<String> apply2 = this.dataContentTypeSupplier.apply(eventMessage);
        cloudEventBuilder.getClass();
        apply2.ifPresent(cloudEventBuilder::withDataContentType);
        Optional<URI> apply3 = this.dataSchemaSupplier.apply(eventMessage);
        cloudEventBuilder.getClass();
        apply3.ifPresent(cloudEventBuilder::withDataSchema);
        cloudEventBuilder.withSource(this.sourceSupplier.apply(eventMessage));
        cloudEventBuilder.withType(serializedObject.getType().getName());
        cloudEventBuilder.withTime(eventMessage.getTimestamp().atOffset(ZoneOffset.UTC));
        ExtensionUtils.setExtensions(cloudEventBuilder, eventMessage, serializedObject, this.extensionNameResolver, this.ignoreInvalidExtensionNames);
        return cloudEventBuilder.build();
    }

    private String recordKey(EventMessage<?> eventMessage) {
        Object sequenceIdentifierFor = this.sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        if (sequenceIdentifierFor != null) {
            return sequenceIdentifierFor.toString();
        }
        return null;
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, CloudEvent> consumerRecord) {
        try {
            CloudEvent cloudEvent = (CloudEvent) consumerRecord.value();
            return this.upcasterChain.upcast(Stream.of(new InitialEventRepresentation(createEventData(cloudEvent, consumerRecord.timestamp()), this.serializer))).findFirst().map(intermediateEventRepresentation -> {
                return new SerializedMessage(intermediateEventRepresentation.getMessageIdentifier(), new LazyDeserializingObject(intermediateEventRepresentation.getData(), this.serializer), intermediateEventRepresentation.getMetaData());
            }).flatMap(serializedMessage -> {
                return buildMessage(cloudEvent, serializedMessage, consumerRecord.timestamp());
            });
        } catch (Exception e) {
            logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
            return Optional.empty();
        }
    }

    private EventData<?> createEventData(CloudEvent cloudEvent, long j) {
        return new GenericDomainEventEntry(ExtensionUtils.asNullableString(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_TYPE)), ExtensionUtils.asNullableString(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_ID)), ExtensionUtils.asLong(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_SEQ)).longValue(), cloudEvent.getId(), ExtensionUtils.asOffsetDateTime(cloudEvent.getTime(), j), cloudEvent.getType(), ExtensionUtils.asNullableString(cloudEvent.getExtension(ExtensionUtils.MESSAGE_REVISION)), ExtensionUtils.asBytes(cloudEvent.getData()), extractMetadataAsBytes(cloudEvent));
    }

    private byte[] extractMetadataAsBytes(CloudEvent cloudEvent) {
        return (byte[]) this.serializer.serialize(ExtensionUtils.getExtensionsAsMetadata(cloudEvent, this.metadataNameResolver).mergedWith(MetadataUtils.getAdditionalEntries(cloudEvent)), byte[].class).getData();
    }

    private static boolean isDomainEvent(CloudEvent cloudEvent) {
        return (cloudEvent.getExtension(ExtensionUtils.AGGREGATE_TYPE) == null || cloudEvent.getExtension(ExtensionUtils.AGGREGATE_ID) == null || cloudEvent.getExtension(ExtensionUtils.AGGREGATE_SEQ) == null) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Optional<EventMessage<?>> buildMessage(CloudEvent cloudEvent, SerializedMessage<?> serializedMessage, long j) {
        Instant from = Instant.from(ExtensionUtils.asOffsetDateTime(cloudEvent.getTime(), j));
        return isDomainEvent(cloudEvent) ? buildDomainEventMessage(cloudEvent, serializedMessage, from) : buildEventMessage(serializedMessage, from);
    }

    private static Optional<EventMessage<?>> buildDomainEventMessage(CloudEvent cloudEvent, SerializedMessage<?> serializedMessage, Instant instant) {
        return Optional.of(new GenericDomainEventMessage(ExtensionUtils.asNullableString(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_TYPE)), ExtensionUtils.asNullableString(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_ID)), ExtensionUtils.asLong(cloudEvent.getExtension(ExtensionUtils.AGGREGATE_SEQ)).longValue(), serializedMessage, () -> {
            return instant;
        }));
    }

    private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?> serializedMessage, Instant instant) {
        return Optional.of(new GenericEventMessage(serializedMessage, () -> {
            return instant;
        }));
    }
}
