package io.apicurio.registry.storage.impl.kafkasql;

import io.apicurio.common.apps.config.Info;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeyDeserializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeySerializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlPartitioner;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueDeserializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueSerializer;
import io.apicurio.registry.utils.RegistryProperties;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ProducerActions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
 * CDI factory for the KafkaSQL storage: exposes the injected configuration values as a
 * {@link KafkaSqlConfiguration} and produces the named Kafka producers and consumers for the
 * journal, snapshots and events topics.
 *
 * <p>Every client is built from a clone of the injected producer/consumer properties, so the
 * defaults applied here (all via {@code putIfAbsent}) never override user-supplied settings and
 * never leak from one client into another.
 */
@ApplicationScoped
public class KafkaSqlFactory {

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.bootstrap.servers")
    @Info(category = "storage", description = "Kafka sql storage bootstrap servers")
    String bootstrapServers;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.topic", defaultValue = "kafkasql-journal")
    @Info(category = "storage", description = "Kafka sql storage topic name")
    String topic;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.snapshots.topic", defaultValue = "kafkasql-snapshots")
    @Info(category = "storage", description = "Kafka sql storage topic name", registryAvailableSince = "3.0.0")
    String snapshotsTopic;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.snapshot.every.seconds", defaultValue = "86400s")
    @Info(category = "storage", description = "Kafka sql journal topic snapshot every", registryAvailableSince = "3.0.0")
    String snapshotEvery;

    @Inject
    @ConfigProperty(name = "apicurio.storage.snapshot.location", defaultValue = "./")
    @Info(category = "storage", description = "Kafka sql snapshots store location", registryAvailableSince = "3.0.0")
    String snapshotStoreLocation;

    @Inject
    @ConfigProperty(name = "apicurio.events.kafka.topic", defaultValue = "registry-events")
    @Info(category = "storage", description = "Kafka sql storage event topic", registryAvailableSince = "3.0.1")
    String eventsTopic;

    @Inject
    @RegistryProperties({"apicurio.kafkasql.topic"})
    @Info(category = "storage", description = "Kafka sql storage topic properties")
    Properties topicProperties;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.topic.auto-create", defaultValue = "true")
    @Info(category = "storage", description = "Kafka sql storage topic auto create")
    Boolean topicAutoCreate;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.consumer.poll.timeout", defaultValue = "5000")
    @Info(category = "storage", description = "Kafka sql storage consumer poll timeout")
    Integer pollTimeout;

    @Inject
    @ConfigProperty(name = "apicurio.kafkasql.coordinator.response-timeout", defaultValue = "30000")
    @Info(category = "storage", description = "Kafka sql storage coordinator response timeout")
    Integer responseTimeout;

    @Inject
    @RegistryProperties(value = {"apicurio.kafka.common", "apicurio.kafkasql.producer"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties producerProperties;

    @Inject
    @RegistryProperties(value = {"apicurio.kafka.common", "apicurio.kafkasql.consumer"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties consumerProperties;

    // NOTE(review): this and the remaining @ConfigProperty fields carry no @Inject, unlike the
    // fields above — presumably the container auto-injects qualified fields; confirm.
    @ConfigProperty(name = "apicurio.kafkasql.consumer.group-prefix", defaultValue = "apicurio-")
    @Info(category = "storage", description = "Kafka sql storage prefix for consumer group name")
    String groupPrefix;

    @Inject
    @RegistryProperties(value = {"apicurio.kafka.common", "apicurio.kafkasql.admin"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties adminProperties;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.enabled", defaultValue = "false")
    @Info(category = "storage", description = "Kafka sql storage sasl enabled")
    boolean saslEnabled;

    @ConfigProperty(name = "apicurio.kafkasql.security.protocol", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage security protocol")
    Optional<String> protocol;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.mechanism", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage sasl mechanism")
    String saslMechanism;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.client-id", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage sasl client identifier")
    String clientId;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.client-secret", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage sasl client secret")
    String clientSecret;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.token.endpoint", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage sasl token endpoint")
    String tokenEndpoint;

    @ConfigProperty(name = "apicurio.kafkasql.security.sasl.login.callback.handler.class", defaultValue = "")
    @Info(category = "storage", description = "Kafka sql storage sasl login callback handler")
    String loginCallbackHandler;

    @ConfigProperty(name = "apicurio.kafkasql.security.ssl.truststore.location")
    @Info(category = "storage", description = "Kafka sql storage ssl truststore location")
    Optional<String> trustStoreLocation;

    @ConfigProperty(name = "apicurio.kafkasql.security.ssl.truststore.type")
    @Info(category = "storage", description = "Kafka sql storage ssl truststore type")
    Optional<String> trustStoreType;

    // NOTE(review): from here on the property names use the "apicurio.kafkasql.ssl." prefix
    // while the two truststore properties above use "apicurio.kafkasql.security.ssl." —
    // left as-is because renaming a key would break existing deployments; confirm intent.
    @ConfigProperty(name = "apicurio.kafkasql.ssl.truststore.password")
    @Info(category = "storage", description = "Kafka sql storage ssl truststore password")
    Optional<String> trustStorePassword;

    @ConfigProperty(name = "apicurio.kafkasql.ssl.keystore.location")
    @Info(category = "storage", description = "Kafka sql storage ssl keystore location")
    Optional<String> keyStoreLocation;

    @ConfigProperty(name = "apicurio.kafkasql.ssl.keystore.type")
    @Info(category = "storage", description = "Kafka sql storage ssl keystore type")
    Optional<String> keyStoreType;

    @ConfigProperty(name = "apicurio.kafkasql.ssl.keystore.password")
    @Info(category = "storage", description = "Kafka sql storage ssl keystore password")
    Optional<String> keyStorePassword;

    @ConfigProperty(name = "apicurio.kafkasql.ssl.key.password")
    @Info(category = "storage", description = "Kafka sql storage ssl key password")
    Optional<String> keyPassword;

    /**
     * Produces a {@link KafkaSqlConfiguration} view over this factory's injected fields.
     *
     * <p>The returned object reads the fields on every call and hands out the injected
     * {@link Properties} instances themselves, not copies.
     *
     * @return a configuration backed by this factory
     */
    @ApplicationScoped
    @Produces
    public KafkaSqlConfiguration createConfiguration() {
        return new KafkaSqlConfiguration() {
            @Override
            public String bootstrapServers() {
                return bootstrapServers;
            }

            @Override
            public String topic() {
                return topic;
            }

            @Override
            public String snapshotsTopic() {
                return snapshotsTopic;
            }

            @Override
            public String eventsTopic() {
                return eventsTopic;
            }

            @Override
            public String snapshotEvery() {
                return snapshotEvery;
            }

            @Override
            public String snapshotLocation() {
                return snapshotStoreLocation;
            }

            @Override
            public Properties topicProperties() {
                return topicProperties;
            }

            @Override
            public boolean isTopicAutoCreate() {
                return topicAutoCreate;
            }

            @Override
            public Integer pollTimeout() {
                return pollTimeout;
            }

            @Override
            public Integer responseTimeout() {
                return responseTimeout;
            }

            @Override
            public Properties producerProperties() {
                return producerProperties;
            }

            @Override
            public Properties consumerProperties() {
                return consumerProperties;
            }

            @Override
            public Properties adminProperties() {
                // Unlike the producer/consumer accessors, this one applies the security
                // settings to the shared instance in place. Every write is putIfAbsent, so
                // repeated calls leave the properties unchanged after the first.
                tryToConfigureSecurity(adminProperties);
                return adminProperties;
            }
        };
    }

    /**
     * Produces the producer for the KafkaSQL journal topic.
     *
     * @return an {@link AsyncProducer} using the KafkaSQL key/value serializers
     */
    @ApplicationScoped
    @Named("KafkaSqlJournalProducer")
    @Produces
    public ProducerActions<KafkaSqlMessageKey, KafkaSqlMessage> createKafkaJournalProducer() {
        return new AsyncProducer<>(newProducerProperties(), new KafkaSqlKeySerializer(),
                new KafkaSqlValueSerializer());
    }

    /**
     * Produces the consumer for the KafkaSQL journal topic.
     *
     * @return a {@link KafkaConsumer} using the KafkaSQL key/value deserializers
     */
    @ApplicationScoped
    @Named("KafkaSqlJournalConsumer")
    @Produces
    public KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> createKafkaJournalConsumer() {
        return new KafkaConsumer<>(newConsumerProperties(), new KafkaSqlKeyDeserializer(),
                new KafkaSqlValueDeserializer());
    }

    /**
     * Produces the producer for the snapshots topic.
     *
     * @return an {@link AsyncProducer} with string keys and values
     */
    @ApplicationScoped
    @Named("KafkaSqlSnapshotsProducer")
    @Produces
    public ProducerActions<String, String> createKafkaSnapshotsProducer() {
        return new AsyncProducer<>(newProducerProperties(), new StringSerializer(), new StringSerializer());
    }

    /**
     * Produces the consumer for the snapshots topic.
     *
     * @return a {@link KafkaConsumer} with string keys and values
     */
    @ApplicationScoped
    @Named("KafkaSqlSnapshotsConsumer")
    @Produces
    public KafkaConsumer<String, String> createKafkaSnapshotsConsumer() {
        return new KafkaConsumer<>(newConsumerProperties(), new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Produces the producer for the events topic.
     *
     * @return an {@link AsyncProducer} with string keys and values
     */
    @ApplicationScoped
    @Named("KafkaSqlEventsProducer")
    @Produces
    public ProducerActions<String, String> createKafkaSqlEventsProducer() {
        return new AsyncProducer<>(newProducerProperties(), new StringSerializer(), new StringSerializer());
    }

    /**
     * Builds the properties shared by every producer created by this factory: a clone of the
     * injected producer properties with defaults and security settings filled in.
     */
    private Properties newProducerProperties() {
        Properties props = (Properties) producerProperties.clone();
        props.putIfAbsent("bootstrap.servers", bootstrapServers);
        // A fresh random client id per producer, unless one was configured explicitly.
        props.putIfAbsent("client.id", "Producer-" + UUID.randomUUID());
        props.putIfAbsent("acks", "all");
        props.putIfAbsent("linger.ms", 10);
        props.putIfAbsent("partitioner.class", KafkaSqlPartitioner.class);
        tryToConfigureSecurity(props);
        return props;
    }

    /**
     * Builds the properties shared by every consumer created by this factory: a clone of the
     * injected consumer properties with defaults and security settings filled in.
     */
    private Properties newConsumerProperties() {
        Properties props = (Properties) consumerProperties.clone();
        props.putIfAbsent("bootstrap.servers", bootstrapServers);
        // A fresh random group id per consumer, unless one was configured explicitly.
        props.putIfAbsent("group.id", groupPrefix + UUID.randomUUID());
        props.putIfAbsent("enable.auto.commit", "true");
        props.putIfAbsent("auto.commit.interval.ms", "1000");
        props.putIfAbsent("auto.offset.reset", "earliest");
        tryToConfigureSecurity(props);
        return props;
    }

    /**
     * Adds the configured security settings to {@code properties} without overriding any key
     * that is already present.
     *
     * <p>SASL settings are added only when SASL is enabled. Truststore settings are added only
     * when location, password and type are all configured; likewise for the keystore, whose
     * key password is additionally optional.
     *
     * @param properties the client properties to augment in place
     */
    private void tryToConfigureSecurity(Properties properties) {
        protocol.ifPresent(value -> properties.putIfAbsent("security.protocol", value));

        if (saslEnabled) {
            properties.putIfAbsent("sasl.jaas.config", String.format("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required   oauth.client.id=\"%s\"   oauth.client.secret=\"%s\"   oauth.token.endpoint.uri=\"%s\" ;", clientId, clientSecret, tokenEndpoint));
            properties.putIfAbsent("sasl.mechanism", saslMechanism);
            properties.putIfAbsent("sasl.login.callback.handler.class", loginCallbackHandler);
        }

        if (trustStoreLocation.isPresent() && trustStorePassword.isPresent() && trustStoreType.isPresent()) {
            properties.putIfAbsent("ssl.truststore.type", trustStoreType.get());
            properties.putIfAbsent("ssl.truststore.location", trustStoreLocation.get());
            properties.putIfAbsent("ssl.truststore.password", trustStorePassword.get());
        }

        if (keyStoreLocation.isPresent() && keyStorePassword.isPresent() && keyStoreType.isPresent()) {
            properties.putIfAbsent("ssl.keystore.type", keyStoreType.get());
            properties.putIfAbsent("ssl.keystore.location", keyStoreLocation.get());
            properties.putIfAbsent("ssl.keystore.password", keyStorePassword.get());
            keyPassword.ifPresent(value -> properties.putIfAbsent("ssl.key.password", value));
        }
    }
}
