package io.streamthoughts.jikkou.kafka.reconciler;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.SupportedResources;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.ChangeHandler;
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
import io.streamthoughts.jikkou.core.reconciler.Controller;
import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.kafka.ApiVersions;
import io.streamthoughts.jikkou.kafka.change.consumer.ConsumerGroupChangeComputer;
import io.streamthoughts.jikkou.kafka.change.consumer.ConsumerGroupChangeDescription;
import io.streamthoughts.jikkou.kafka.change.consumer.DeleteConsumerGroupHandler;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroup;
import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaConsumerGroupService;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

/**
 * Controller for {@link V1KafkaConsumerGroup} resources that works through a Kafka admin client.
 *
 * <p>As declared by {@link ControllerConfiguration}, the only supported reconciliation mode is
 * {@link ReconciliationMode#DELETE}.
 */
@ControllerConfiguration(supportedModes = {ReconciliationMode.DELETE})
@SupportedResources({
    @SupportedResource(type = V1KafkaConsumerGroup.class),
    @SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA1, kind = "KafkaConsumerGroupChange")
})
public final class AdminClientConsumerGroupController extends ContextualExtension implements Controller<V1KafkaConsumerGroup, ResourceChange> {

    /** Factory for admin client contexts; left unset by the no-arg constructor until {@link #init} runs. */
    private AdminClientContextFactory adminClientContextFactory;

    /**
     * Creates a controller without a factory. The factory is created later by
     * {@link #init(ExtensionContext)}.
     */
    public AdminClientConsumerGroupController() {
    }

    /**
     * Creates a controller that uses the given factory.
     *
     * @param adminClientContextFactory the factory used to open admin client contexts.
     */
    public AdminClientConsumerGroupController(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    /**
     * Initializes this extension and, when no factory was supplied at construction time,
     * builds one from the application configuration exposed by the extension context.
     *
     * @param extensionContext the extension context.
     */
    @Override
    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(extensionContext.appConfiguration());
        }
    }

    /**
     * Applies the changes held by the given executor.
     *
     * <p>Two handlers are registered: a {@link DeleteConsumerGroupHandler} bound to the admin
     * client, and a {@link ChangeHandler.None} handler built from
     * {@link ConsumerGroupChangeDescription}. The admin client context is closed before returning,
     * whether or not applying the changes fails.
     *
     * @param changeExecutor        the executor holding the changes to apply.
     * @param reconciliationContext the reconciliation context (not read by this method).
     * @return the results reported by the executor.
     */
    public List<ChangeResult> execute(@NotNull ChangeExecutor<ResourceChange> changeExecutor, @NotNull ReconciliationContext reconciliationContext) {
        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            return changeExecutor.applyChanges(List.of(
                    new DeleteConsumerGroupHandler(clientContext.getAdminClient()),
                    new ChangeHandler.None<>(ConsumerGroupChangeDescription::new)));
        }
    }

    /**
     * Computes the changes for the given consumer group resources.
     *
     * <p>The given resources are filtered with the context's selector and stripped of their
     * status. The consumer groups with the same names are then listed through
     * {@link KafkaConsumerGroupService}, filtered and stripped the same way, and both lists are
     * handed to a {@link ConsumerGroupChangeComputer}. The admin client context is closed before
     * returning, whether or not the computation fails.
     *
     * @param collection            the consumer group resources to reconcile.
     * @param reconciliationContext the reconciliation context providing the selector.
     * @return the computed changes.
     */
    public List<ResourceChange> plan(@NotNull Collection<V1KafkaConsumerGroup> collection, @NotNull ReconciliationContext reconciliationContext) {
        // Resources requested by the caller that match the selector, without their status.
        List<V1KafkaConsumerGroup> requestedGroups = collection.stream()
                .filter(reconciliationContext.selector()::apply)
                .map(group -> group.withStatus(null))
                .toList();

        // Only the groups named by the requested resources are looked up.
        List<String> groupNames = requestedGroups.stream()
                .map(group -> group.getMetadata().getName())
                .distinct()
                .toList();

        try (AdminClientContext clientContext = this.adminClientContextFactory.createAdminClientContext()) {
            KafkaConsumerGroupService service = new KafkaConsumerGroupService(clientContext.getAdminClient());

            // Groups returned by the service that match the selector, without their status.
            List<V1KafkaConsumerGroup> listedGroups = service.listConsumerGroups(groupNames, false)
                    .getItems()
                    .stream()
                    .filter(reconciliationContext.selector()::apply)
                    .map(group -> group.withStatus(null))
                    .toList();

            return new ConsumerGroupChangeComputer().computeChanges(listedGroups, requestedGroups);
        }
    }
}
