package org.apache.pulsar.functions.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-utils-2.10.0-rc-202202072210.jar:org/apache/pulsar/functions/utils/SourceConfigUtils.class */
public class SourceConfigUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SourceConfigUtils.class);

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-utils-2.10.0-rc-202202072210.jar:org/apache/pulsar/functions/utils/SourceConfigUtils$ExtractedSourceDetails.class */
    public static class ExtractedSourceDetails {
        private String sourceClassName;
        private String typeArg;

        public String getSourceClassName() {
            return this.sourceClassName;
        }

        public String getTypeArg() {
            return this.typeArg;
        }

        public void setSourceClassName(String str) {
            this.sourceClassName = str;
        }

        public void setTypeArg(String str) {
            this.typeArg = str;
        }

        public ExtractedSourceDetails(String str, String str2) {
            this.sourceClassName = str;
            this.typeArg = str2;
        }
    }

    public static Function.FunctionDetails convert(SourceConfig sourceConfig, ExtractedSourceDetails extractedSourceDetails) throws IllegalArgumentException {
        Function.FunctionDetails.Builder newBuilder = Function.FunctionDetails.newBuilder();
        boolean z = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN);
        if (sourceConfig.getTenant() != null) {
            newBuilder.setTenant(sourceConfig.getTenant());
        }
        if (sourceConfig.getNamespace() != null) {
            newBuilder.setNamespace(sourceConfig.getNamespace());
        }
        if (sourceConfig.getName() != null) {
            newBuilder.setName(sourceConfig.getName());
        }
        newBuilder.setRuntime(Function.FunctionDetails.Runtime.JAVA);
        if (sourceConfig.getParallelism() != null) {
            newBuilder.setParallelism(sourceConfig.getParallelism().intValue());
        } else {
            newBuilder.setParallelism(1);
        }
        newBuilder.setClassName(IdentityFunction.class.getName());
        newBuilder.setAutoAck(true);
        if (sourceConfig.getProcessingGuarantees() != null) {
            newBuilder.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
        }
        Function.SourceSpec.Builder newBuilder2 = Function.SourceSpec.newBuilder();
        if (extractedSourceDetails.getSourceClassName() != null) {
            newBuilder2.setClassName(extractedSourceDetails.getSourceClassName());
        }
        if (z) {
            newBuilder2.setBuiltin(sourceConfig.getArchive().replaceFirst("^builtin://", ""));
        }
        HashMap hashMap = new HashMap();
        if (sourceConfig.getConfigs() != null) {
            hashMap.putAll(sourceConfig.getConfigs());
        }
        if (sourceConfig.getBatchSourceConfig() != null) {
            hashMap.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(sourceConfig.getBatchSourceConfig()));
            hashMap.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, newBuilder2.getClassName());
            newBuilder2.setClassName("org.apache.pulsar.functions.source.batch.BatchSourceExecutor");
        }
        newBuilder2.setConfigs(new Gson().toJson(hashMap));
        if (sourceConfig.getSecrets() != null && !sourceConfig.getSecrets().isEmpty()) {
            newBuilder.setSecretsMap(new Gson().toJson(sourceConfig.getSecrets()));
        }
        if (extractedSourceDetails.getTypeArg() != null) {
            newBuilder2.setTypeClassName(extractedSourceDetails.getTypeArg());
        }
        newBuilder.setSource(newBuilder2);
        Function.SinkSpec.Builder newBuilder3 = Function.SinkSpec.newBuilder();
        if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
            newBuilder3.setSchemaType(sourceConfig.getSchemaType());
        }
        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
            newBuilder3.setSerDeClassName(sourceConfig.getSerdeClassName());
        }
        newBuilder3.setTopic(sourceConfig.getTopicName());
        if (extractedSourceDetails.getTypeArg() != null) {
            newBuilder3.setTypeClassName(extractedSourceDetails.getTypeArg());
        }
        if (sourceConfig.getProducerConfig() != null) {
            ProducerConfig producerConfig = sourceConfig.getProducerConfig();
            Function.ProducerSpec.Builder newBuilder4 = Function.ProducerSpec.newBuilder();
            if (producerConfig.getMaxPendingMessages() != null) {
                newBuilder4.setMaxPendingMessages(producerConfig.getMaxPendingMessages().intValue());
            }
            if (producerConfig.getMaxPendingMessagesAcrossPartitions() != null) {
                newBuilder4.setMaxPendingMessagesAcrossPartitions(producerConfig.getMaxPendingMessagesAcrossPartitions().intValue());
            }
            if (producerConfig.getUseThreadLocalProducers() != null) {
                newBuilder4.setUseThreadLocalProducers(producerConfig.getUseThreadLocalProducers().booleanValue());
            }
            if (producerConfig.getCryptoConfig() != null) {
                newBuilder4.setCryptoSpec(CryptoUtils.convert(producerConfig.getCryptoConfig()));
            }
            if (producerConfig.getBatchBuilder() != null) {
                newBuilder4.setBatchBuilder(producerConfig.getBatchBuilder());
            }
            newBuilder3.setProducerSpec(newBuilder4.build());
        }
        if (sourceConfig.getBatchBuilder() != null) {
            newBuilder3.setProducerSpec((newBuilder3.getProducerSpec() != null ? newBuilder3.getProducerSpec().toBuilder() : Function.ProducerSpec.newBuilder()).setBatchBuilder(sourceConfig.getBatchBuilder()).build());
        }
        newBuilder3.setForwardSourceMessageProperty(true);
        newBuilder.setSink(newBuilder3);
        Resources mergeWithDefault = Resources.mergeWithDefault(sourceConfig.getResources());
        Function.Resources.Builder newBuilder5 = Function.Resources.newBuilder();
        newBuilder5.setCpu(mergeWithDefault.getCpu().doubleValue());
        newBuilder5.setRam(mergeWithDefault.getRam().longValue());
        newBuilder5.setDisk(mergeWithDefault.getDisk().longValue());
        newBuilder.setResources(newBuilder5);
        if (!StringUtils.isEmpty(sourceConfig.getRuntimeFlags())) {
            newBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
        }
        newBuilder.setComponentType(Function.FunctionDetails.ComponentType.SOURCE);
        if (!StringUtils.isEmpty(sourceConfig.getCustomRuntimeOptions())) {
            newBuilder.setCustomRuntimeOptions(sourceConfig.getCustomRuntimeOptions());
        }
        return newBuilder.build();
    }

    public static SourceConfig convertFromDetails(Function.FunctionDetails functionDetails) {
        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setTenant(functionDetails.getTenant());
        sourceConfig.setNamespace(functionDetails.getNamespace());
        sourceConfig.setName(functionDetails.getName());
        sourceConfig.setParallelism(Integer.valueOf(functionDetails.getParallelism()));
        sourceConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
        Function.SourceSpec source = functionDetails.getSource();
        if (!StringUtils.isEmpty(source.getClassName())) {
            sourceConfig.setClassName(source.getClassName());
        }
        if (!StringUtils.isEmpty(source.getBuiltin())) {
            sourceConfig.setArchive("builtin://" + source.getBuiltin());
        }
        Map<String, Object> extractSourceConfig = extractSourceConfig(source, FunctionCommon.getFullyQualifiedName(functionDetails));
        if (extractSourceConfig != null) {
            BatchSourceConfig extractBatchSourceConfig = extractBatchSourceConfig(extractSourceConfig);
            if (extractBatchSourceConfig != null) {
                sourceConfig.setBatchSourceConfig(extractBatchSourceConfig);
                if (extractSourceConfig.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
                    if (StringUtils.isEmpty((String) extractSourceConfig.get(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY))) {
                        sourceConfig.setClassName(null);
                    } else {
                        sourceConfig.setClassName((String) extractSourceConfig.get(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY));
                    }
                }
            }
            extractSourceConfig.remove(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY);
            extractSourceConfig.remove(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY);
            sourceConfig.setConfigs(extractSourceConfig);
        }
        if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
            sourceConfig.setSecrets((Map) new Gson().fromJson(functionDetails.getSecretsMap(), new TypeToken<Map<String, Object>>() { // from class: org.apache.pulsar.functions.utils.SourceConfigUtils.1
            }.getType()));
        }
        Function.SinkSpec sink = functionDetails.getSink();
        sourceConfig.setTopicName(sink.getTopic());
        if (!StringUtils.isEmpty(sink.getSchemaType())) {
            sourceConfig.setSchemaType(sink.getSchemaType());
        }
        if (!StringUtils.isEmpty(sink.getSerDeClassName())) {
            sourceConfig.setSerdeClassName(sink.getSerDeClassName());
        }
        if (sink.getProducerSpec() != null) {
            Function.ProducerSpec producerSpec = sink.getProducerSpec();
            ProducerConfig producerConfig = new ProducerConfig();
            if (producerSpec.getMaxPendingMessages() != 0) {
                producerConfig.setMaxPendingMessages(Integer.valueOf(producerSpec.getMaxPendingMessages()));
            }
            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
                producerConfig.setMaxPendingMessagesAcrossPartitions(Integer.valueOf(producerSpec.getMaxPendingMessagesAcrossPartitions()));
            }
            if (producerSpec.hasCryptoSpec()) {
                producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(producerSpec.getCryptoSpec()));
            }
            if (producerSpec.getBatchBuilder() != null) {
                producerConfig.setBatchBuilder(producerSpec.getBatchBuilder());
            }
            producerConfig.setUseThreadLocalProducers(Boolean.valueOf(producerSpec.getUseThreadLocalProducers()));
            sourceConfig.setProducerConfig(producerConfig);
        }
        if (functionDetails.hasResources()) {
            Resources resources = new Resources();
            resources.setCpu(Double.valueOf(functionDetails.getResources().getCpu()));
            resources.setRam(Long.valueOf(functionDetails.getResources().getRam()));
            resources.setDisk(Long.valueOf(functionDetails.getResources().getDisk()));
            sourceConfig.setResources(resources);
        }
        if (!StringUtils.isEmpty(functionDetails.getRuntimeFlags())) {
            sourceConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
        }
        if (!StringUtils.isEmpty(functionDetails.getCustomRuntimeOptions())) {
            sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
        }
        return sourceConfig;
    }

    public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sourceConfig, ClassLoader classLoader, boolean z) {
        if (StringUtils.isEmpty(sourceConfig.getTenant())) {
            throw new IllegalArgumentException("Source tenant cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getNamespace())) {
            throw new IllegalArgumentException("Source namespace cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getName())) {
            throw new IllegalArgumentException("Source name cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getTopicName())) {
            throw new IllegalArgumentException("Topic name cannot be null");
        }
        if (!TopicName.isValid(sourceConfig.getTopicName())) {
            throw new IllegalArgumentException("Topic name is invalid");
        }
        if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism().intValue() <= 0) {
            throw new IllegalArgumentException("Source parallelism must be a positive number");
        }
        if (sourceConfig.getResources() != null) {
            ResourceConfigUtils.validate(sourceConfig.getResources());
        }
        String className = sourceConfig.getClassName();
        if (className == null) {
            try {
                className = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
            } catch (IOException e) {
                throw new IllegalArgumentException("Failed to extract source class from archive", e);
            }
        }
        try {
            Class<?> loadClass = classLoader.loadClass(className);
            if (!Source.class.isAssignableFrom(loadClass) && !BatchSource.class.isAssignableFrom(loadClass)) {
                throw new IllegalArgumentException(String.format("Source class %s does not implement the correct interface", loadClass.getName()));
            }
            if (BatchSource.class.isAssignableFrom(loadClass)) {
                if (sourceConfig.getBatchSourceConfig() == null) {
                    throw new IllegalArgumentException(String.format("Source class %s implements %s but batch source source config is not specified", loadClass.getName(), BatchSource.class.getName()));
                }
                validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
            }
            Class<?> sourceType = FunctionCommon.getSourceType(loadClass);
            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
            }
            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
                ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), sourceType, classLoader, false);
            }
            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), sourceType, classLoader, false);
            }
            if (sourceConfig.getProducerConfig() != null && sourceConfig.getProducerConfig().getCryptoConfig() != null) {
                ValidatorUtils.validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(), classLoader, true);
            }
            if (sourceType.equals(TypeResolver.Unknown.class)) {
                throw new IllegalArgumentException(String.format("Failed to resolve type for Source class %s", className));
            }
            if (z && (classLoader instanceof NarClassLoader)) {
                validateSourceConfig(sourceConfig, (NarClassLoader) classLoader);
            }
            return new ExtractedSourceDetails(className, sourceType.getName());
        } catch (ClassNotFoundException e2) {
            throw new IllegalArgumentException(String.format("Source class %s not found in class loader", className), e2);
        }
    }

    public static SourceConfig clone(SourceConfig sourceConfig) {
        return (SourceConfig) ObjectMapperFactory.getThreadLocal().readValue(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(sourceConfig), SourceConfig.class);
    }

    public static SourceConfig validateUpdate(SourceConfig sourceConfig, SourceConfig sourceConfig2) {
        SourceConfig clone = clone(sourceConfig);
        if (!sourceConfig.getTenant().equals(sourceConfig2.getTenant())) {
            throw new IllegalArgumentException("Tenants differ");
        }
        if (!sourceConfig.getNamespace().equals(sourceConfig2.getNamespace())) {
            throw new IllegalArgumentException("Namespaces differ");
        }
        if (!sourceConfig.getName().equals(sourceConfig2.getName())) {
            throw new IllegalArgumentException("Function Names differ");
        }
        if (!StringUtils.isEmpty(sourceConfig2.getClassName())) {
            clone.setClassName(sourceConfig2.getClassName());
        }
        if (!StringUtils.isEmpty(sourceConfig2.getTopicName())) {
            clone.setTopicName(sourceConfig2.getTopicName());
        }
        if (!StringUtils.isEmpty(sourceConfig2.getSerdeClassName())) {
            clone.setSerdeClassName(sourceConfig2.getSerdeClassName());
        }
        if (!StringUtils.isEmpty(sourceConfig2.getSchemaType())) {
            clone.setSchemaType(sourceConfig2.getSchemaType());
        }
        if (sourceConfig2.getConfigs() != null) {
            clone.setConfigs(sourceConfig2.getConfigs());
        }
        if (sourceConfig2.getSecrets() != null) {
            clone.setSecrets(sourceConfig2.getSecrets());
        }
        if (sourceConfig2.getProcessingGuarantees() != null && !sourceConfig2.getProcessingGuarantees().equals(sourceConfig.getProcessingGuarantees())) {
            throw new IllegalArgumentException("Processing Guarantees cannot be altered");
        }
        if (sourceConfig2.getParallelism() != null) {
            clone.setParallelism(sourceConfig2.getParallelism());
        }
        if (sourceConfig2.getResources() != null) {
            clone.setResources(ResourceConfigUtils.merge(sourceConfig.getResources(), sourceConfig2.getResources()));
        }
        if (!StringUtils.isEmpty(sourceConfig2.getArchive())) {
            clone.setArchive(sourceConfig2.getArchive());
        }
        if (!StringUtils.isEmpty(sourceConfig2.getRuntimeFlags())) {
            clone.setRuntimeFlags(sourceConfig2.getRuntimeFlags());
        }
        if (!StringUtils.isEmpty(sourceConfig2.getCustomRuntimeOptions())) {
            clone.setCustomRuntimeOptions(sourceConfig2.getCustomRuntimeOptions());
        }
        if (isBatchSource(sourceConfig) != isBatchSource(sourceConfig2)) {
            throw new IllegalArgumentException("Sources cannot be update between regular sources and batchsource");
        }
        if (sourceConfig2.getBatchSourceConfig() != null) {
            validateBatchSourceConfigUpdate(sourceConfig.getBatchSourceConfig(), sourceConfig2.getBatchSourceConfig());
            clone.setBatchSourceConfig(sourceConfig2.getBatchSourceConfig());
        }
        return clone;
    }

    public static void validateBatchSourceConfig(BatchSourceConfig batchSourceConfig) throws IllegalArgumentException {
        if (StringUtils.isEmpty(batchSourceConfig.getDiscoveryTriggererClassName())) {
            log.error("BatchSourceConfig does not specify Discovery Trigger ClassName");
            throw new IllegalArgumentException("BatchSourceConfig does not specify Discovery Trigger ClassName");
        }
    }

    public static Map<String, Object> extractSourceConfig(Function.SourceSpec sourceSpec, String str) {
        if (StringUtils.isEmpty(sourceSpec.getConfigs())) {
            return null;
        }
        try {
            return (Map) ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(), new TypeReference<HashMap<String, Object>>() { // from class: org.apache.pulsar.functions.utils.SourceConfigUtils.2
            });
        } catch (IOException e) {
            log.error("Failed to read configs for source {}", str, e);
            throw new RuntimeException(e);
        }
    }

    public static BatchSourceConfig extractBatchSourceConfig(Map<String, Object> map) {
        if (!map.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY)) {
            return null;
        }
        return (BatchSourceConfig) new Gson().fromJson((String) map.get(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY), BatchSourceConfig.class);
    }

    public static Map<String, String> computeBatchSourceIntermediateTopicSubscriptions(Function.FunctionDetails functionDetails, String str) {
        Map<String, Object> extractSourceConfig = extractSourceConfig(functionDetails.getSource(), str);
        if (extractSourceConfig == null) {
            return null;
        }
        BatchSourceConfig extractBatchSourceConfig = extractBatchSourceConfig(extractSourceConfig);
        String topicName = computeBatchSourceIntermediateTopicName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()).toString();
        if (extractBatchSourceConfig == null) {
            return null;
        }
        HashMap hashMap = new HashMap();
        hashMap.put(topicName, computeBatchSourceInstanceSubscriptionName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()));
        return hashMap;
    }

    public static String computeBatchSourceInstanceSubscriptionName(String str, String str2, String str3) {
        return "BatchSourceExecutor-" + str + "/" + str2 + "/" + str3;
    }

    public static TopicName computeBatchSourceIntermediateTopicName(String str, String str2, String str3) {
        return TopicName.get(TopicDomain.persistent.name(), str, str2, str3 + "-intermediate");
    }

    public static boolean isBatchSource(SourceConfig sourceConfig) {
        return sourceConfig.getBatchSourceConfig() != null;
    }

    public static void validateBatchSourceConfigUpdate(BatchSourceConfig batchSourceConfig, BatchSourceConfig batchSourceConfig2) {
        if (!batchSourceConfig.getDiscoveryTriggererClassName().equals(batchSourceConfig2.getDiscoveryTriggererClassName())) {
            throw new IllegalArgumentException("DiscoverTriggerer class cannot be updated for batchsources");
        }
    }

    public static void validateSourceConfig(SourceConfig sourceConfig, NarClassLoader narClassLoader) {
        try {
            ConnectorDefinition connectorDefinition = ConnectorUtils.getConnectorDefinition(narClassLoader);
            if (connectorDefinition.getSourceConfigClass() != null) {
                Object convertValue = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), Class.forName(connectorDefinition.getSourceConfigClass(), true, narClassLoader));
                if (convertValue != null) {
                    ConfigValidation.validateConfig(convertValue);
                }
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Error validating source config", e);
        } catch (ClassNotFoundException e2) {
            throw new IllegalArgumentException("Could not find source config class");
        } catch (IllegalArgumentException e3) {
            throw new IllegalArgumentException("Could not validate source config: " + e3.getMessage());
        }
    }
}
