package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsReconciliationModes;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.annotation.AcceptsResources;
import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.change.ChangeExecutor;
import io.streamthoughts.jikkou.api.change.ChangeHandler;
import io.streamthoughts.jikkou.api.change.ChangeResult;
import io.streamthoughts.jikkou.api.config.ConfigProperty;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseResourceController;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.change.QuotaChange;
import io.streamthoughts.jikkou.kafka.change.QuotaChangeComputer;
import io.streamthoughts.jikkou.kafka.change.handlers.quotas.CreateQuotasChangeHandlerKafka;
import io.streamthoughts.jikkou.kafka.change.handlers.quotas.DeleteQuotasChangeHandler;
import io.streamthoughts.jikkou.kafka.change.handlers.quotas.QuotaChangeDescription;
import io.streamthoughts.jikkou.kafka.change.handlers.quotas.UpdateQuotasChangeHandlerKafka;
import io.streamthoughts.jikkou.kafka.converters.V1KafkaClientQuotaListConverter;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaClientQuota;
import io.streamthoughts.jikkou.kafka.models.V1KafkaClientQuotaChange;
import io.streamthoughts.jikkou.kafka.models.V1KafkaClientQuotaChangeList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaClientQuotaList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AcceptsResources({@AcceptsResource(type = V1KafkaClientQuota.class), @AcceptsResource(type = V1KafkaClientQuotaList.class, converter = V1KafkaClientQuotaListConverter.class)})
@AcceptsReconciliationModes({ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.APPLY_ALL})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/AdminClientKafkaQuotaController.class */
public final class AdminClientKafkaQuotaController implements BaseResourceController<V1KafkaClientQuota, QuotaChange> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaQuotaController.class);
    public static final String LIMITS_DELETE_ORPHANS_CONFIG_NAME = "limits-delete-orphans";
    private AdminClientContextFactory adminClientContextFactory;

    public AdminClientKafkaQuotaController() {
    }

    public AdminClientKafkaQuotaController(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(configuration);
        }
    }

    public V1KafkaClientQuotaChangeList computeReconciliationChanges(@NotNull Collection<V1KafkaClientQuota> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        Stream<V1KafkaClientQuota> stream = collection.stream();
        AggregateSelector aggregateSelector = new AggregateSelector(reconciliationContext.selectors());
        List<V1KafkaClientQuota> list = stream.filter((v1) -> {
            return r1.apply(v1);
        }).toList();
        Stream stream2 = new AdminClientKafkaQuotaCollector(this.adminClientContextFactory).listAll().stream();
        AggregateSelector aggregateSelector2 = new AggregateSelector(reconciliationContext.selectors());
        return V1KafkaClientQuotaChangeList.builder().withItems((List) new QuotaChangeComputer(isLimitDeletionEnabled(reconciliationMode, reconciliationContext)).computeChanges(stream2.filter((v1) -> {
            return r1.apply(v1);
        }).toList(), list).stream().map(hasMetadataChange -> {
            return V1KafkaClientQuotaChange.builder().withChange((QuotaChange) hasMetadataChange.getChange()).build();
        }).collect(Collectors.toList())).build();
    }

    public List<ChangeResult<QuotaChange>> execute(@NotNull List<HasMetadataChange<QuotaChange>> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        AdminClientContext createAdminClientContext = this.adminClientContextFactory.createAdminClientContext();
        try {
            AdminClient adminClient = createAdminClientContext.getAdminClient();
            List<ChangeResult<QuotaChange>> execute = new ChangeExecutor(List.of(new CreateQuotasChangeHandlerKafka(adminClient), new UpdateQuotasChangeHandlerKafka(adminClient), new DeleteQuotasChangeHandler(adminClient), new ChangeHandler.None(QuotaChangeDescription::new))).execute(list, z);
            if (createAdminClientContext != null) {
                createAdminClientContext.close();
            }
            return execute;
        } catch (Throwable th) {
            if (createAdminClientContext != null) {
                try {
                    createAdminClientContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @VisibleForTesting
    static boolean isLimitDeletionEnabled(@NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        return ((Boolean) ConfigProperty.ofBoolean(LIMITS_DELETE_ORPHANS_CONFIG_NAME).orElse(() -> {
            return Boolean.valueOf(List.of(ReconciliationMode.APPLY_ALL, ReconciliationMode.DELETE, ReconciliationMode.UPDATE).contains(reconciliationMode));
        }).evaluate(reconciliationContext.configuration())).booleanValue();
    }

    /* renamed from: computeReconciliationChanges, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ResourceListObject m12computeReconciliationChanges(@NotNull Collection collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        return computeReconciliationChanges((Collection<V1KafkaClientQuota>) collection, reconciliationMode, reconciliationContext);
    }
}
