package ai.superstream.core;

import ai.superstream.model.MetadataMessage;
import ai.superstream.util.SuperstreamLogger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/* loaded from: input_file:ai/superstream/core/SuperstreamManager.class */
public class SuperstreamManager {
    private static final String TOPICS_ENV_VAR = "SUPERSTREAM_TOPICS_LIST";
    private static final String DISABLED_ENV_VAR = "SUPERSTREAM_DISABLED";
    private static volatile SuperstreamManager instance;
    private final MetadataConsumer metadataConsumer = new MetadataConsumer();
    private final ClientReporter clientReporter = new ClientReporter();
    private final ConfigurationOptimizer configurationOptimizer = new ConfigurationOptimizer();
    private final Map<String, MetadataMessage> metadataCache = new ConcurrentHashMap();
    private final boolean disabled = Boolean.parseBoolean(System.getenv(DISABLED_ENV_VAR));
    private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(SuperstreamManager.class);
    private static final ThreadLocal<Boolean> OPTIMIZATION_IN_PROGRESS = new ThreadLocal<>();

    private SuperstreamManager() {
        if (this.disabled) {
            logger.info("Superstream optimization is disabled via environment variable");
        }
    }

    public static boolean isOptimizationInProgress() {
        return Boolean.TRUE.equals(OPTIMIZATION_IN_PROGRESS.get());
    }

    public static void setOptimizationInProgress(boolean z) {
        if (z) {
            OPTIMIZATION_IN_PROGRESS.set(Boolean.TRUE);
        } else {
            OPTIMIZATION_IN_PROGRESS.remove();
        }
    }

    public static SuperstreamManager getInstance() {
        if (instance == null) {
            synchronized (SuperstreamManager.class) {
                if (instance == null) {
                    instance = new SuperstreamManager();
                }
            }
        }
        return instance;
    }

    public static Map<String, Object> convertPropertiesToMap(Properties properties) {
        HashMap hashMap = new HashMap();
        for (String str : properties.stringPropertyNames()) {
            hashMap.put(str, properties.getProperty(str));
        }
        return hashMap;
    }

    public boolean optimizeProducer(String str, String str2, Properties properties) {
        if (this.disabled) {
            return false;
        }
        try {
            if (isOptimizationInProgress()) {
                logger.debug("Skipping optimization for producer {} as optimization is already in progress", str2);
                return false;
            }
            try {
                setOptimizationInProgress(true);
                MetadataMessage orFetchMetadataMessage = getOrFetchMetadataMessage(str, properties);
                if (orFetchMetadataMessage == null) {
                    logger.warn("No metadata message available for {}, skipping optimization", str);
                    setOptimizationInProgress(false);
                    return false;
                }
                if (!orFetchMetadataMessage.isActive()) {
                    logger.info("Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it.");
                    setOptimizationInProgress(false);
                    return false;
                }
                Map<String, Object> optimalConfiguration = this.configurationOptimizer.getOptimalConfiguration(orFetchMetadataMessage, getApplicationTopics());
                Properties properties2 = new Properties();
                properties2.putAll(properties);
                List<String> applyOptimalConfiguration = this.configurationOptimizer.applyOptimalConfiguration(properties, optimalConfiguration);
                if (applyOptimalConfiguration.isEmpty()) {
                    logger.info("No configuration parameters were modified");
                    reportClientInformation(str, properties, orFetchMetadataMessage, str2, properties2, Collections.emptyMap());
                    setOptimizationInProgress(false);
                    return false;
                }
                HashMap hashMap = new HashMap();
                for (String str3 : applyOptimalConfiguration) {
                    hashMap.put(str3, properties.get(str3));
                }
                reportClientInformation(str, properties, orFetchMetadataMessage, str2, properties2, hashMap);
                logger.info("Successfully optimized producer configuration for {}", str2);
                setOptimizationInProgress(false);
                return true;
            } catch (Exception e) {
                logger.error("Failed to optimize producer configuration", e);
                setOptimizationInProgress(false);
                return false;
            }
        } catch (Throwable th) {
            setOptimizationInProgress(false);
            throw th;
        }
    }

    private MetadataMessage getOrFetchMetadataMessage(String str, Properties properties) {
        if (this.metadataCache.containsKey(str)) {
            return this.metadataCache.get(str);
        }
        MetadataMessage metadataMessage = this.metadataConsumer.getMetadataMessage(str, properties);
        if (metadataMessage != null) {
            this.metadataCache.put(str, metadataMessage);
        }
        return metadataMessage;
    }

    private List<String> getApplicationTopics() {
        String str = System.getenv(TOPICS_ENV_VAR);
        return (str == null || str.trim().isEmpty()) ? Collections.emptyList() : (List) Arrays.stream(str.split(",")).map((v0) -> {
            return v0.trim();
        }).filter(str2 -> {
            return !str2.isEmpty();
        }).collect(Collectors.toList());
    }

    private void reportClientInformation(String str, Properties properties, MetadataMessage metadataMessage, String str2, Properties properties2, Map<String, Object> map) {
        try {
            if (!this.clientReporter.reportClient(str, properties, metadataMessage.getSuperstreamClusterId(), metadataMessage.isActive(), str2, convertPropertiesToMap(properties2), map)) {
                logger.warn("Failed to report client information to the superstream.clients topic");
            }
        } catch (Exception e) {
            logger.error("Error reporting client information", e);
        }
    }
}
