package io.apicurio.registry.storage.impl.kafkasql.sql;

import io.apicurio.registry.logging.Logged;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlCoordinator;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlMessage;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlMessageKey;
import io.apicurio.registry.storage.impl.sql.SqlRegistryStorage;
import io.apicurio.registry.types.RegistryException;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;

/**
 * Consumes KafkaSQL records, dispatches each record's message to the SQL registry storage and
 * reports the outcome (result or exception) to the {@link KafkaSqlCoordinator}, keyed by the
 * request UUID carried in the record headers.
 */
@ApplicationScoped
@Logged
public class KafkaSqlSink {

    /** Name of the record header that carries the request UUID as text. */
    private static final String REQUEST_HEADER = "req";

    @Inject
    Logger log;

    @Inject
    KafkaSqlCoordinator coordinator;

    @Inject
    SqlRegistryStorage sqlStore;

    /**
     * Processes a single consumed record and notifies the coordinator of the outcome.
     *
     * <p>On success the dispatch result is passed to the coordinator. A {@link RegistryException}
     * is passed through as-is; any other throwable is wrapped in a {@link RegistryException}
     * before being passed on. No exception raised while dispatching escapes this method.
     *
     * @param consumerRecord the record to process
     * @throws IllegalArgumentException if the request header is present but is not a valid UUID
     *         string (raised by {@link UUID#fromString(String)} before processing starts)
     */
    @ActivateRequestContext
    public void processMessage(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> consumerRecord) {
        UUID requestId = extractUuid(consumerRecord);
        log.debug("Processing Kafka message with UUID: {}", requestId);
        try {
            Object result = doProcessMessage(consumerRecord);
            traceProcessed(consumerRecord, result);
            log.debug("Kafka message successfully processed. Notifying listeners of response.");
            coordinator.notifyResponse(requestId, result);
        } catch (RegistryException e) {
            log.debug("Registry exception detected: {}", e.getMessage());
            coordinator.notifyResponse(requestId, e);
        } catch (Throwable t) {
            // The trailing throwable makes SLF4J include the stack trace, which would otherwise
            // only be visible to whoever receives the wrapped exception.
            log.debug("Unexpected exception detected: {}", t.getMessage(), t);
            coordinator.notifyResponse(requestId, new RegistryException(t));
        }
    }

    /**
     * Emits the trace-level summary of a processed record.
     *
     * <p>Guarded by {@code isTraceEnabled()} so the potentially large {@code toString()} calls are
     * skipped when tracing is off, and null-safe so that building the log line can never turn a
     * successfully processed message into an error response.
     */
    private void traceProcessed(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> consumerRecord,
            Object result) {
        if (!log.isTraceEnabled()) {
            return;
        }
        KafkaSqlMessageKey key = consumerRecord.key();
        KafkaSqlMessage value = consumerRecord.value();
        Object messageType = key != null ? key.getMessageType() : null;
        String valueText = value != null ? value.toString() : "";
        String resultText = result != null ? result.toString() : "";
        log.trace("Processed message key: {} value: {} result: {}", messageType, valueText, resultText);
    }

    /**
     * Reads the request UUID from the first {@value #REQUEST_HEADER} header of the record.
     *
     * @return the parsed UUID, or {@code null} when the header is absent or has no value
     * @throws IllegalArgumentException if the header value is not a valid UUID string
     */
    private UUID extractUuid(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> consumerRecord) {
        return Optional.ofNullable(consumerRecord.headers().headers(REQUEST_HEADER))
                .map(Iterable::iterator)
                .filter(it -> it.hasNext())
                .map(it -> it.next())
                .map(Header::value)
                // UUID text is plain ASCII; decode with a fixed charset instead of the
                // platform default so the result does not depend on the JVM's configuration.
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .map(UUID::fromString)
                .orElse(null);
    }

    /**
     * Dispatches the record's message to the SQL store.
     *
     * @return whatever {@code dispatchTo} returns for this message
     */
    private Object doProcessMessage(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> consumerRecord) {
        return consumerRecord.value().dispatchTo(sqlStore);
    }
}
