package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceCollector;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.error.JikkouRuntimeException;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.kafka.MetadataAnnotations;
import io.streamthoughts.jikkou.kafka.adapters.KafkaAclBindingAdapter;
import io.streamthoughts.jikkou.kafka.adapters.V1KafkaPrincipalAuthorizationSupport;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaPrincipalAuthorization;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AcceptsResource(type = V1KafkaPrincipalAuthorization.class)
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/AdminClientKafkaAclCollector.class */
public final class AdminClientKafkaAclCollector implements ResourceCollector<V1KafkaPrincipalAuthorization> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaAclCollector.class);
    private AdminClientContextFactory adminClientContextFactory;

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/AdminClientKafkaAclCollector$KafkaAclsClient.class */
    public static final class KafkaAclsClient {
        private final AdminClient client;

        public KafkaAclsClient(@NotNull AdminClient adminClient) {
            this.client = adminClient;
        }

        public Collection<V1KafkaPrincipalAuthorization> listAll() {
            try {
                return V1KafkaPrincipalAuthorizationSupport.from(describeAsync().get().stream().map(KafkaAclBindingAdapter::fromAclBinding).toList()).toList();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JikkouRuntimeException("Failed to describe ACL due to thread-interruption", e);
            } catch (ExecutionException e2) {
                throw new JikkouRuntimeException("Failed to describe ACL due to execution error", e2);
            }
        }

        private CompletableFuture<Collection<AclBinding>> describeAsync() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return (Collection) this.client.describeAcls(AclBindingFilter.ANY).values().get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouRuntimeException("Failed to describe ACL due to thread-interruption", e);
                } catch (ExecutionException e2) {
                    throw new JikkouRuntimeException("Failed to describe ACL due to execution error", e2);
                }
            });
        }
    }

    public AdminClientKafkaAclCollector() {
    }

    public AdminClientKafkaAclCollector(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(configuration);
        }
    }

    public List<V1KafkaPrincipalAuthorization> listAll(@NotNull Configuration configuration, @NotNull List<ResourceSelector> list) {
        AdminClientContext createAdminClientContext = this.adminClientContextFactory.createAdminClientContext();
        try {
            Stream<V1KafkaPrincipalAuthorization> stream = listAll(createAdminClientContext.getAdminClient()).stream();
            AggregateSelector aggregateSelector = new AggregateSelector(list);
            List<V1KafkaPrincipalAuthorization> list2 = stream.filter((v1) -> {
                return r1.apply(v1);
            }).toList();
            String clusterId = createAdminClientContext.getClusterId();
            List<V1KafkaPrincipalAuthorization> list3 = list2.stream().map(v1KafkaPrincipalAuthorization -> {
                return v1KafkaPrincipalAuthorization.toBuilder().withMetadata(v1KafkaPrincipalAuthorization.getMetadata().toBuilder().withAnnotation(MetadataAnnotations.JIKKOU_IO_KAFKA_CLUSTER_ID, clusterId).build()).build();
            }).toList();
            if (createAdminClientContext != null) {
                createAdminClientContext.close();
            }
            return list3;
        } catch (Throwable th) {
            if (createAdminClientContext != null) {
                try {
                    createAdminClientContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<V1KafkaPrincipalAuthorization> listAll(@NotNull AdminClient adminClient) {
        return new ArrayList(new KafkaAclsClient(adminClient).listAll());
    }
}
