package org.voltdb.stream.api.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.voltdb.stream.api.extension.VoltStreamSinkConfigurator;
import org.voltdb.stream.api.pipeline.ExceptionHandler;
import org.voltdb.stream.execution.Configuration;
import org.voltdb.stream.execution.Property;

/* loaded from: input_file:org/voltdb/stream/api/kafka/KafkaStreamSinkConfigurator.class */
public class KafkaStreamSinkConfigurator<T> implements VoltStreamSinkConfigurator<T> {
    private static final String DEFAULT_CONFIG_PATH = "sink.kafka";
    private String topicName;
    private String bootstrapServers;
    private String valueSerializer;
    private String keySerializer;
    private ExceptionHandler exceptionHandler;
    private Function<Object, Object> keyExtractorFunction = obj -> {
        return null;
    };
    private Function<Object, Object> valueExtractorFunction = Function.identity();
    private Function<Object, Iterable<Header>> headerExtractorFunction = obj -> {
        return List.of();
    };
    private final Properties properties = new Properties();

    public static <P> KafkaStreamSinkConfigurator<P> accepting(Class<P> cls) {
        return new KafkaStreamSinkConfigurator<>();
    }

    private KafkaStreamSinkConfigurator() {
        autoConfigureBuilder();
    }

    public KafkaStreamSinkConfigurator<T> withTopicName(String str) {
        this.topicName = Property.extractSafe(str);
        return this;
    }

    public KafkaStreamSinkConfigurator<T> withBootstrapServers(String str) {
        this.bootstrapServers = Property.extractSafe(str);
        return this;
    }

    public KafkaStreamSinkConfigurator<T> withSSL(KafkaStreamSslConfiguration kafkaStreamSslConfiguration) {
        this.properties.putAll(kafkaStreamSslConfiguration.getProperties());
        return this;
    }

    public KafkaStreamSinkConfigurator<T> withProperty(String str, String str2) {
        this.properties.setProperty(str, str2);
        return this;
    }

    public KafkaStreamSinkConfigurator<T> withProperty(String str, int i) {
        return withProperty(str, String.valueOf(i));
    }

    public KafkaStreamSinkConfigurator<T> withProperty(String str, long j) {
        return withProperty(str, String.valueOf(j));
    }

    public KafkaStreamSinkConfigurator<T> withProperty(String str, boolean z) {
        return withProperty(str, String.valueOf(z));
    }

    public KafkaStreamSinkConfigurator<T> withValueSerializer(Class<? extends Serializer<?>> cls) {
        return withValueSerializer(cls.getName());
    }

    public KafkaStreamSinkConfigurator<T> withValueSerializer(String str) {
        this.valueSerializer = str;
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public KafkaStreamSinkConfigurator<T> withKeyExtractor(Function<T, Long> function) {
        return withKeyExtractor(function, LongSerializer.class);
    }

    public <K> KafkaStreamSinkConfigurator<T> withKeyExtractor(Function<T, K> function, Class<? extends Serializer<K>> cls) {
        this.keyExtractorFunction = obj -> {
            return function.apply(obj);
        };
        this.keySerializer = cls.getName();
        return this;
    }

    public <K> KafkaStreamSinkConfigurator<T> withValueExtractor(Function<T, K> function, Class<? extends Serializer<?>> cls) {
        this.valueExtractorFunction = obj -> {
            return function.apply(obj);
        };
        return withValueSerializer(cls);
    }

    public <K> KafkaStreamSinkConfigurator<T> withHeaders(Function<T, Iterable<Header>> function) {
        this.headerExtractorFunction = obj -> {
            return (Iterable) function.apply(obj);
        };
        return this;
    }

    public KafkaStreamSinkConfigurator<T> withSchemaRegistry(String str) {
        return withProperty("schema.registry.url", Property.extractSafe(str));
    }

    public VoltStreamSinkConfigurator<T> withExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    String getTopicName() {
        return this.topicName;
    }

    String getBootstrapServers() {
        return this.bootstrapServers;
    }

    String getValueSerializer() {
        return this.valueSerializer;
    }

    String getKeySerializer() {
        return this.keySerializer;
    }

    Object extractKey(Object obj) {
        return this.keyExtractorFunction.apply(obj);
    }

    Object extractValue(Object obj) {
        return this.valueExtractorFunction.apply(obj);
    }

    Iterable<Header> extractHeader(Object obj) {
        return this.headerExtractorFunction.apply(obj);
    }

    Properties getProperties() {
        return this.properties;
    }

    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandler;
    }

    public KafkaStreamSinkConfigurator<T> configure(String str) {
        if (!getConfiguration().findByPath(str, new String[0]).hasValue()) {
            throw new IllegalArgumentException("No configuration found for path: " + str + ", check your helm configuration");
        }
        configureBuilder(str);
        return this;
    }

    private void autoConfigureBuilder() {
        if (getConfiguration().findByPath(DEFAULT_CONFIG_PATH, new String[0]).hasValue()) {
            configureBuilder(DEFAULT_CONFIG_PATH);
        }
    }

    private void configureBuilder(String str) {
        Configuration configuration = getConfiguration();
        this.topicName = (String) configuration.findByPath(str, new String[]{"topicName"}).orElse((Object) null);
        this.bootstrapServers = (String) configuration.findByPath(str, new String[]{"bootstrapServers"}).orElse((Object) null);
        Configuration.ConfigurationPart findByPath = configuration.findByPath(str, new String[]{"schemaRegistry"});
        if (findByPath.hasValue()) {
            withSchemaRegistry(findByPath.asString());
        }
        this.properties.putAll((Map) configuration.findByPath(str, new String[]{"properties"}).orElse(Map.of()));
        withSSL(KafkaStreamSslConfiguration.create(configuration, str));
    }
}
