package io.streamthoughts.jikkou.kafka.reconciler;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.SupportedResources;
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.ChangeHandler;
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
import io.streamthoughts.jikkou.core.reconciler.Controller;
import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.kafka.ApiVersions;
import io.streamthoughts.jikkou.kafka.change.acl.AclChangeComputer;
import io.streamthoughts.jikkou.kafka.change.acl.AclChangeHandler;
import io.streamthoughts.jikkou.kafka.change.acl.KafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.change.acl.KafkaPrincipalAuthorizationDescription;
import io.streamthoughts.jikkou.kafka.change.acl.builder.LiteralKafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.change.acl.builder.TopicMatchingAclRulesBuilder;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaPrincipalAuthorization;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;

/**
 * Controller that reconciles {@link V1KafkaPrincipalAuthorization} resources using a Kafka
 * {@link AdminClient}.
 *
 * <p>{@link #plan} computes the list of changes between the resources passed by the caller and
 * the resources listed from the cluster through {@link AdminClientKafkaAclCollector};
 * {@link #execute} hands an {@link AclChangeHandler} to the supplied {@link ChangeExecutor}.
 * Both methods open their own {@link AdminClientContext} and close it before returning.
 */
@ControllerConfiguration(supportedModes = {ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.FULL})
@SupportedResources({@SupportedResource(type = V1KafkaPrincipalAuthorization.class), @SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA2, kind = "KafkaPrincipalAuthorizationChange")})
public final class AdminClientKafkaAclController extends ContextualExtension implements Controller<V1KafkaPrincipalAuthorization, ResourceChange> {

    /**
     * Factory used to open an admin client context for each plan/execute call.
     * Not final: when the no-arg constructor is used it is assigned lazily in {@link #init}.
     */
    private AdminClientContextFactory adminClientContextFactory;

    /**
     * Typed view over the reconciliation {@link Configuration} for the options read by this
     * controller.
     */
    public static class Config {

        /** The {@code delete-orphans} boolean option; evaluates to {@code false} when not set. */
        public static final ConfigProperty<Boolean> DELETE_ORPHANS_OPTIONS_CONFIG = ConfigProperty.ofBoolean("delete-orphans").orElse(false);

        private final Configuration configuration;

        /**
         * Creates a new {@link Config} instance.
         *
         * @param configuration the configuration the options are read from.
         */
        public Config(Configuration configuration) {
            this.configuration = configuration;
        }

        /**
         * Reads the {@code delete-orphans} option.
         *
         * @return the value of {@link #DELETE_ORPHANS_OPTIONS_CONFIG} for the wrapped configuration.
         */
        public boolean isDeleteOrphansEnabled() {
            return DELETE_ORPHANS_OPTIONS_CONFIG.get(configuration);
        }
    }

    /**
     * Creates a new controller without a factory; one is created from the application
     * configuration when {@link #init} is invoked.
     */
    public AdminClientKafkaAclController() {
    }

    /**
     * Creates a new controller using the given factory.
     *
     * @param adminClientContextFactory the factory used to create admin client contexts.
     */
    public AdminClientKafkaAclController(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    /**
     * Initializes this extension and, if no factory was supplied at construction time, builds
     * one from {@code extensionContext.appConfiguration()}.
     *
     * @param extensionContext the extension context.
     */
    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(extensionContext.appConfiguration());
        }
    }

    /**
     * Computes the changes between the given resources and the resources listed from the cluster.
     * Both sides are filtered with the selector of the reconciliation context before comparison.
     *
     * @param collection            the resources passed in by the caller.
     * @param reconciliationContext the context providing the selector and the configuration.
     * @return the changes computed by {@link AclChangeComputer}.
     */
    public List<ResourceChange> plan(@NotNull Collection<V1KafkaPrincipalAuthorization> collection, @NotNull ReconciliationContext reconciliationContext) {
        // Keep only the caller-provided resources matched by the selector.
        final List<V1KafkaPrincipalAuthorization> expectedStates = collection.stream()
                .filter(reconciliationContext.selector()::apply)
                .toList();

        // try-with-resources closes the admin client context on both normal and exceptional exit.
        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            final AdminClient adminClient = clientContext.getAdminClient();

            // The collector shares this controller's factory and gets its own extension context.
            final AdminClientKafkaAclCollector collector = new AdminClientKafkaAclCollector(this.adminClientContextFactory);
            collector.init(extensionContext().contextForExtension(AdminClientKafkaAclCollector.class));

            // Resources listed through the admin client, restricted by the same selector.
            final List<V1KafkaPrincipalAuthorization> actualStates = collector.listAll(adminClient).stream()
                    .filter(reconciliationContext.selector()::apply)
                    .toList();

            final boolean deleteOrphans = new Config(reconciliationContext.configuration()).isDeleteOrphansEnabled();
            final KafkaAclBindingBuilder bindingBuilder = KafkaAclBindingBuilder.combines(
                    new LiteralKafkaAclBindingBuilder(),
                    new TopicMatchingAclRulesBuilder(adminClient));

            return new AclChangeComputer(deleteOrphans, bindingBuilder).computeChanges(actualStates, expectedStates);
        }
    }

    /**
     * Applies changes through the given executor, using an {@link AclChangeHandler} bound to a
     * freshly opened admin client plus a {@link ChangeHandler.None} handler.
     *
     * @param changeExecutor        the executor holding the changes to apply.
     * @param reconciliationContext the reconciliation context (not read by this method).
     * @return the results returned by {@code changeExecutor.applyChanges}.
     */
    public List<ChangeResult> execute(@NotNull ChangeExecutor<ResourceChange> changeExecutor, @NotNull ReconciliationContext reconciliationContext) {
        // try-with-resources closes the admin client context on both normal and exceptional exit.
        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            final AdminClient adminClient = clientContext.getAdminClient();
            return changeExecutor.applyChanges(List.of(
                    new AclChangeHandler(adminClient),
                    new ChangeHandler.None(KafkaPrincipalAuthorizationDescription::new)));
        }
    }
}
