package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsReconciliationModes;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.change.ChangeExecutor;
import io.streamthoughts.jikkou.api.change.ChangeHandler;
import io.streamthoughts.jikkou.api.change.ChangeResult;
import io.streamthoughts.jikkou.api.config.ConfigProperty;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseResourceController;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.change.AclChange;
import io.streamthoughts.jikkou.kafka.change.AclChangeComputer;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.AclChangeDescription;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.CreateAclChangeHandler;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.DeleteAclChangeHandler;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.KafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.builder.LiteralKafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.change.handlers.acls.builder.TopicMatchingAclRulesBuilder;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaAclChange;
import io.streamthoughts.jikkou.kafka.models.V1KafkaAclChangeList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaPrincipalAuthorization;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resource controller for {@link V1KafkaPrincipalAuthorization} resources that computes and
 * executes {@link AclChange}s through a Kafka {@link AdminClient}.
 *
 * <p>The {@link AdminClientContextFactory} is either injected through the constructor or created
 * lazily from the {@link Configuration} passed to {@link #configure(Configuration)}. A fresh
 * {@link AdminClientContext} is opened and closed for every call to
 * {@code computeReconciliationChanges} and {@code execute}.
 */
@AcceptsResource(type = V1KafkaPrincipalAuthorization.class)
@AcceptsReconciliationModes({ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.APPLY_ALL})
public final class AdminClientKafkaAclController implements BaseResourceController<V1KafkaPrincipalAuthorization, AclChange> {

    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaAclController.class);

    /**
     * Boolean option {@code delete-orphans}, defaulting to {@code false}. Its value is read from
     * the reconciliation configuration and handed to {@link AclChangeComputer}.
     * NOTE(review): presumably enables deletion of ACLs that exist on the cluster but are absent
     * from the desired state - confirm against AclChangeComputer.
     */
    public static final ConfigProperty<Boolean> DELETE_ORPHANS_OPTIONS = ConfigProperty
            .ofBoolean("delete-orphans")
            .orElse(false);

    /** Not final: may be assigned late by {@link #configure(Configuration)}. */
    private AdminClientContextFactory adminClientContextFactory;

    /**
     * Creates a controller without a factory; {@link #configure(Configuration)} must be called
     * before the controller is used so that a factory gets created.
     */
    public AdminClientKafkaAclController() {
    }

    /**
     * Creates a controller that uses the given factory.
     *
     * @param adminClientContextFactory the factory used to open admin client contexts.
     */
    public AdminClientKafkaAclController(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    /**
     * Creates the {@link AdminClientContextFactory} from the given configuration, unless a
     * factory was already supplied (in which case the configuration is ignored).
     *
     * @param configuration the configuration used to build the factory when none is set.
     * @throws ConfigException declared by the overridden method; not thrown directly here.
     */
    @Override // io.streamthoughts.jikkou.api.config.Configurable
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(configuration);
        }
    }

    /**
     * Computes the ACL changes between the given resources and the ACLs currently listed on the
     * cluster.
     *
     * <p>Both sides are filtered with an {@link AggregateSelector} built from the selectors of
     * the reconciliation context before being compared by an {@link AclChangeComputer}.
     *
     * @param collection            the resources describing the desired state.
     * @param reconciliationMode    the reconciliation mode; not used by this implementation.
     * @param reconciliationContext supplies the selectors and the configuration from which
     *                              {@link #DELETE_ORPHANS_OPTIONS} is evaluated.
     * @return the list of computed changes, each carrying the metadata of its source change.
     */
    @Override // io.streamthoughts.jikkou.api.control.ResourceController
    public V1KafkaAclChangeList computeReconciliationChanges(@NotNull Collection<V1KafkaPrincipalAuthorization> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        // Desired state: keep only the resources matched by the context selectors.
        AggregateSelector expectedSelector = new AggregateSelector(reconciliationContext.selectors());
        List<V1KafkaPrincipalAuthorization> expected = collection.stream()
                .filter(expectedSelector::apply)
                .toList();

        // The context is closed on both normal and exceptional exit.
        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            AdminClient adminClient = clientContext.getAdminClient();

            // Actual state: everything the collector lists, filtered by the same selectors.
            AdminClientKafkaAclCollector collector = new AdminClientKafkaAclCollector(this.adminClientContextFactory);
            AggregateSelector actualSelector = new AggregateSelector(reconciliationContext.selectors());
            List<V1KafkaPrincipalAuthorization> actual = collector.listAll(adminClient).stream()
                    .filter(actualSelector::apply)
                    .toList();

            boolean deleteOrphans = DELETE_ORPHANS_OPTIONS.evaluate(reconciliationContext.configuration());
            KafkaAclBindingBuilder bindingBuilder = KafkaAclBindingBuilder.combines(
                    new LiteralKafkaAclBindingBuilder(),
                    new TopicMatchingAclRulesBuilder(adminClient)
            );
            AclChangeComputer computer = new AclChangeComputer(deleteOrphans, bindingBuilder);

            List<V1KafkaAclChange> changes = computer.computeChanges(actual, expected)
                    .stream()
                    .map(change -> V1KafkaAclChange.builder()
                            .withMetadata(change.getMetadata())
                            .withChange(change.getChange())
                            .build())
                    .collect(Collectors.toList());

            return new V1KafkaAclChangeList().withItems(changes);
        }
    }

    /**
     * Executes the given changes with a {@link ChangeExecutor} wired with a
     * {@link CreateAclChangeHandler}, a {@link DeleteAclChangeHandler} and a
     * {@link ChangeHandler.None} handler that describes changes via {@link AclChangeDescription}.
     *
     * @param list               the changes to execute.
     * @param reconciliationMode the reconciliation mode; not used by this implementation.
     * @param z                  flag forwarded unchanged to the executor
     *                           (presumably a dry-run switch - TODO confirm against ChangeExecutor).
     * @return the results reported by the executor.
     */
    @Override // io.streamthoughts.jikkou.api.control.BaseResourceController
    public List<ChangeResult<AclChange>> execute(@NotNull List<HasMetadataChange<AclChange>> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        // The context is closed on both normal and exceptional exit.
        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            AdminClient adminClient = clientContext.getAdminClient();
            List<ChangeHandler<AclChange>> handlers = List.of(
                    new CreateAclChangeHandler(adminClient),
                    new DeleteAclChangeHandler(adminClient),
                    new ChangeHandler.None<>(AclChangeDescription::new)
            );
            return new ChangeExecutor<>(handlers).execute(list, z);
        }
    }
}
