package io.streamthoughts.jikkou.extension.aiven.reconciler;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.SupportedResources;
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.exceptions.ConfigException;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
import io.streamthoughts.jikkou.core.reconciler.Controller;
import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.core.selector.Selectors;
import io.streamthoughts.jikkou.extension.aiven.ApiVersions;
import io.streamthoughts.jikkou.extension.aiven.api.AivenApiClient;
import io.streamthoughts.jikkou.extension.aiven.api.AivenApiClientConfig;
import io.streamthoughts.jikkou.extension.aiven.api.AivenApiClientFactory;
import io.streamthoughts.jikkou.extension.aiven.change.acl.KafkaAclEntryChangeComputer;
import io.streamthoughts.jikkou.extension.aiven.change.acl.KafkaAclEntryChangeHandler;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaTopicAclEntry;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

@ControllerConfiguration(supportedModes = {ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.FULL})
@SupportedResources({@SupportedResource(type = V1KafkaTopicAclEntry.class), @SupportedResource(apiVersion = ApiVersions.KAFKA_AIVEN_V1BETA1, kind = "KafkaTopicAclEntryChange")})
/* loaded from: input_file:io/streamthoughts/jikkou/extension/aiven/reconciler/AivenKafkaTopicAclEntryController.class */
public class AivenKafkaTopicAclEntryController implements Controller<V1KafkaTopicAclEntry, ResourceChange> {
    public static final ConfigProperty<Boolean> DELETE_ORPHANS_OPTIONS = ConfigProperty.ofBoolean("delete-orphans").orElse((ConfigProperty<Boolean>) false);
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private AivenApiClientConfig config;
    private AivenKafkaTopicAclEntryCollector collector;

    public AivenKafkaTopicAclEntryController() {
    }

    public AivenKafkaTopicAclEntryController(@NotNull AivenApiClientConfig aivenApiClientConfig) {
        init(aivenApiClientConfig);
    }

    @Override // io.streamthoughts.jikkou.core.extension.Extension
    public void init(@NotNull ExtensionContext extensionContext) {
        init(new AivenApiClientConfig(extensionContext.appConfiguration()));
    }

    private void init(@NotNull AivenApiClientConfig aivenApiClientConfig) throws ConfigException {
        if (this.initialized.compareAndSet(false, true)) {
            this.config = aivenApiClientConfig;
            this.collector = new AivenKafkaTopicAclEntryCollector(aivenApiClientConfig);
        }
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.Controller
    public List<ChangeResult> execute(@NotNull ChangeExecutor<ResourceChange> changeExecutor, @NotNull ReconciliationContext reconciliationContext) {
        AivenApiClient create = AivenApiClientFactory.create(this.config);
        try {
            List<ChangeResult> applyChanges = changeExecutor.applyChanges(List.of(new KafkaAclEntryChangeHandler.Create(create), new KafkaAclEntryChangeHandler.Delete(create), new KafkaAclEntryChangeHandler.None()));
            create.close();
            return applyChanges;
        } catch (Throwable th) {
            create.close();
            throw th;
        }
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.Controller
    public List<ResourceChange> plan(@NotNull Collection<V1KafkaTopicAclEntry> collection, @NotNull ReconciliationContext reconciliationContext) {
        Stream<E> stream = this.collector.listAll(reconciliationContext.configuration(), Selectors.NO_SELECTOR).stream();
        Selector selector = reconciliationContext.selector();
        Objects.requireNonNull(selector);
        List list = stream.filter((v1) -> {
            return r1.apply(v1);
        }).toList();
        Stream<V1KafkaTopicAclEntry> stream2 = collection.stream();
        Selector selector2 = reconciliationContext.selector();
        Objects.requireNonNull(selector2);
        return new KafkaAclEntryChangeComputer(DELETE_ORPHANS_OPTIONS.get(reconciliationContext.configuration()).booleanValue()).computeChanges(list, stream2.filter((v1) -> {
            return r1.apply(v1);
        }).toList());
    }
}
