package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.annotations.SupportedReconciliationModes;
import io.streamthoughts.jikkou.api.annotations.SupportedResource;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseExternalResourceController;
import io.streamthoughts.jikkou.api.control.ChangeExecutor;
import io.streamthoughts.jikkou.api.control.ChangeHandler;
import io.streamthoughts.jikkou.api.control.ChangeResult;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.AdminClientContext;
import io.streamthoughts.jikkou.kafka.control.change.AclChange;
import io.streamthoughts.jikkou.kafka.control.change.AclChangeComputer;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.AclChangeDescription;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.CreateAclChangeHandler;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.DeleteAclChangeHandler;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.KafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.builder.LiteralKafkaAclBindingBuilder;
import io.streamthoughts.jikkou.kafka.control.handlers.acls.builder.TopicMatchingAclRulesBuilder;
import io.streamthoughts.jikkou.kafka.models.V1KafkaAclChange;
import io.streamthoughts.jikkou.kafka.models.V1KafkaAclChangeList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaPrincipalAuthorization;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;

@SupportedReconciliationModes(modes = {ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.APPLY_ALL})
@SupportedResource(type = V1KafkaPrincipalAuthorization.class)
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/AdminClientKafkaAclController.class */
public final class AdminClientKafkaAclController extends AbstractAdminClientKafkaController implements BaseExternalResourceController<V1KafkaPrincipalAuthorization, AclChange> {
    private AdminClientKafkaAclCollector descriptor;

    public AdminClientKafkaAclController() {
    }

    public AdminClientKafkaAclController(@NotNull Configuration configuration) {
        super(configuration);
    }

    public AdminClientKafkaAclController(@NotNull AdminClientContext adminClientContext) {
        super(adminClientContext);
    }

    @Override // io.streamthoughts.jikkou.kafka.control.AbstractAdminClientKafkaController
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        super.configure(configuration);
        this.descriptor = new AdminClientKafkaAclCollector(this.adminClientContext);
    }

    public V1KafkaAclChangeList computeReconciliationChanges(@NotNull Collection<V1KafkaPrincipalAuthorization> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        AdminClient client = this.adminClientContext.client();
        Stream<V1KafkaPrincipalAuthorization> stream = collection.stream();
        AggregateSelector aggregateSelector = new AggregateSelector(reconciliationContext.selectors());
        List<V1KafkaPrincipalAuthorization> list = stream.filter((v1) -> {
            return r1.apply(v1);
        }).toList();
        Stream<V1KafkaPrincipalAuthorization> stream2 = this.descriptor.listAll(client).stream();
        AggregateSelector aggregateSelector2 = new AggregateSelector(reconciliationContext.selectors());
        return new V1KafkaAclChangeList().withItems(new AclChangeComputer(KafkaAclBindingBuilder.combines(new LiteralKafkaAclBindingBuilder(), new TopicMatchingAclRulesBuilder(client))).computeChanges(stream2.filter((v1) -> {
            return r1.apply(v1);
        }).toList(), list).stream().map(this::toModelChange).toList());
    }

    private V1KafkaAclChange toModelChange(AclChange aclChange) {
        return V1KafkaAclChange.builder().withChange(aclChange).build();
    }

    public List<ChangeResult<AclChange>> execute(@NotNull List<AclChange> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        AdminClient client = this.adminClientContext.client();
        return new ChangeExecutor(List.of(new CreateAclChangeHandler(client), new DeleteAclChangeHandler(client), new ChangeHandler.None(AclChangeDescription::new))).execute(list, z);
    }

    /* renamed from: computeReconciliationChanges, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ResourceListObject m3computeReconciliationChanges(@NotNull Collection collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        return computeReconciliationChanges((Collection<V1KafkaPrincipalAuthorization>) collection, reconciliationMode, reconciliationContext);
    }
}
