package io.streamthoughts.jikkou.kafka.connect.reconciler;

import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.exceptions.ConfigException;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionOptionSpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionSpec;
import io.streamthoughts.jikkou.core.models.ResourceListObject;
import io.streamthoughts.jikkou.core.reconciler.Collector;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectExtensionConfig;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
import io.streamthoughts.jikkou.kafka.connect.collections.V1KafkaConnectorList;
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import io.streamthoughts.jikkou.kafka.connect.service.KafkaConnectClusterService;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SupportedResource(type = V1KafkaConnector.class)
@ExtensionSpec(options = {@ExtensionOptionSpec(name = KafkaConnectorCollector.EXPAND_STATUS_CONFIG, description = "Retrieves additional information about the status of the connector and its tasks.", type = Boolean.class, defaultValue = "false"), @ExtensionOptionSpec(name = "connect-cluster", description = "List of Kafka Connect cluster from which to list connectors.", type = List.class)})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.class */
public final class KafkaConnectorCollector extends ContextualExtension implements Collector<V1KafkaConnector> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaConnectorCollector.class);
    public static final String EXPAND_STATUS_CONFIG = "expand-status";
    public static final String CONNECT_CLUSTER_CONFIG = "connect-cluster";
    private KafkaConnectExtensionConfig configuration;

    @Override // io.streamthoughts.jikkou.core.extension.ContextualExtension, io.streamthoughts.jikkou.core.extension.Extension
    public void init(@NotNull ExtensionContext extensionContext) throws ConfigException {
        super.init(extensionContext);
        init(new KafkaConnectExtensionConfig(extensionContext.appConfiguration()));
    }

    public void init(@NotNull KafkaConnectExtensionConfig kafkaConnectExtensionConfig) {
        this.configuration = kafkaConnectExtensionConfig;
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.Collector
    public ResourceListObject<V1KafkaConnector> listAll(@NotNull Configuration configuration, @NotNull Selector selector) {
        Boolean bool = (Boolean) extensionContext().configProperty(EXPAND_STATUS_CONFIG).get(configuration);
        return new V1KafkaConnectorList((List) ((Set) extensionContext().configProperty("connect-cluster").getOptional(configuration).map(list -> {
            return new HashSet(list);
        }).orElseGet(() -> {
            return this.configuration.getClusters();
        })).stream().flatMap(str -> {
            return listAll(str, bool.booleanValue()).stream();
        }).collect(Collectors.toList()));
    }

    public List<V1KafkaConnector> listAll(String str, boolean z) {
        LinkedList linkedList = new LinkedList();
        KafkaConnectApi create = KafkaConnectApiFactory.create(this.configuration.getConfigForCluster(str).orElseThrow(() -> {
            return new KafkaConnectClusterNotFoundException("No connect cluster configured for name '" + str + "'");
        }));
        try {
            for (String str2 : create.listConnectors()) {
                try {
                    linkedList.add(new KafkaConnectClusterService(str, create).getConnectorAsync(str2, z).get());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.error("Failed to get connector '{}' from connect cluster {}", str2, str, e);
                }
            }
            return linkedList;
        } finally {
            create.close();
        }
    }
}
