package io.streamthoughts.jikkou.kafka.reconciler;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.SupportedResources;
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.models.Configs;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.ChangeHandler;
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
import io.streamthoughts.jikkou.core.reconciler.Controller;
import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.core.selector.Selectors;
import io.streamthoughts.jikkou.kafka.ApiVersions;
import io.streamthoughts.jikkou.kafka.change.topics.CreateTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.change.topics.DeleteTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.change.topics.TopicChange;
import io.streamthoughts.jikkou.kafka.change.topics.TopicChangeComputer;
import io.streamthoughts.jikkou.kafka.change.topics.UpdateTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicSpec;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ControllerConfiguration(supportedModes = {ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.FULL})
@SupportedResources({@SupportedResource(type = V1KafkaTopic.class), @SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA2, kind = "KafkaTopicChange")})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconciler/AdminClientKafkaTopicController.class */
public final class AdminClientKafkaTopicController extends ContextualExtension implements Controller<V1KafkaTopic, ResourceChange> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AdminClientKafkaTopicController.class);
    public static final String CONFIG_ENTRY_DELETE_ORPHANS_CONFIG_NAME = "config-delete-orphans";
    private AdminClientContextFactory adminClientContextFactory;

    public AdminClientKafkaTopicController() {
    }

    public AdminClientKafkaTopicController(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    @Override // io.streamthoughts.jikkou.core.extension.ContextualExtension, io.streamthoughts.jikkou.core.extension.Extension
    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(extensionContext.appConfiguration());
        }
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.Controller
    public List<ChangeResult> execute(@NotNull ChangeExecutor<ResourceChange> changeExecutor, @NotNull ReconciliationContext reconciliationContext) {
        AdminClientContext createAdminClientContext = this.adminClientContextFactory.createAdminClientContext();
        try {
            AdminClient adminClient = createAdminClientContext.getAdminClient();
            List<ChangeResult> applyChanges = changeExecutor.applyChanges(List.of(new CreateTopicChangeHandler(adminClient), new UpdateTopicChangeHandler(adminClient), new DeleteTopicChangeHandler(adminClient), new ChangeHandler.None(TopicChange::getDescription)));
            if (createAdminClientContext != null) {
                createAdminClientContext.close();
            }
            return applyChanges;
        } catch (Throwable th) {
            if (createAdminClientContext != null) {
                try {
                    createAdminClientContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.Controller
    public List<ResourceChange> plan(@NotNull Collection<V1KafkaTopic> collection, @NotNull ReconciliationContext reconciliationContext) {
        LOG.info("Computing reconciliation change for '{}' resources", Integer.valueOf(collection.size()));
        Stream<V1KafkaTopic> stream = collection.stream();
        Selector selector = reconciliationContext.selector();
        Objects.requireNonNull(selector);
        List list = stream.filter((v1) -> {
            return r1.apply(v1);
        }).map(v1KafkaTopic -> {
            V1KafkaTopicSpec spec2 = v1KafkaTopic.getSpec2();
            Configs configs = spec2.getConfigs();
            return configs == null ? v1KafkaTopic : v1KafkaTopic.withSpec(spec2.withConfigs(configs.flatten()));
        }).toList();
        Configuration from = Configuration.from((Map<String, ?>) Map.of(WithKafkaConfigFilters.DEFAULT_CONFIGS_CONFIG, true, WithKafkaConfigFilters.DYNAMIC_BROKER_CONFIGS_CONFIG, true, WithKafkaConfigFilters.STATIC_BROKER_CONFIGS_CONFIG, true));
        AdminClientKafkaTopicCollector adminClientKafkaTopicCollector = new AdminClientKafkaTopicCollector(this.adminClientContextFactory);
        adminClientKafkaTopicCollector.init(extensionContext().contextForExtension(AdminClientKafkaTopicCollector.class));
        Stream<E> stream2 = adminClientKafkaTopicCollector.listAll(from, Selectors.NO_SELECTOR).stream();
        Selector selector2 = reconciliationContext.selector();
        Objects.requireNonNull(selector2);
        return new TopicChangeComputer(isConfigDeletionEnabled(reconciliationContext)).computeChanges(stream2.filter((v1) -> {
            return r1.apply(v1);
        }).toList(), list);
    }

    @VisibleForTesting
    static boolean isConfigDeletionEnabled(@NotNull ReconciliationContext reconciliationContext) {
        return ConfigProperty.ofBoolean("config-delete-orphans").orElse((ConfigProperty<Boolean>) true).get(reconciliationContext.configuration()).booleanValue();
    }
}
