package io.streamthoughts.jikkou.kafka.connect.reconciler;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.SupportedResources;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.models.HasMetadata;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.ChangeHandler;
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
import io.streamthoughts.jikkou.core.reconciler.Controller;
import io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor;
import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectExtensionConfig;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectLabels;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

/**
 * Controller for {@link V1KafkaConnector} resources.
 *
 * <p>Resources and changes are partitioned by the value of their
 * {@link KafkaConnectLabels#KAFKA_CONNECT_CLUSTER} metadata label, and each partition is
 * processed against the client configuration resolved for that cluster name.
 */
@ControllerConfiguration(supportedModes = {ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.FULL})
@SupportedResources({@SupportedResource(type = V1KafkaConnector.class), @SupportedResource(apiVersion = "kafka.jikkou.io/v1beta1", kind = "KafkaConnectorChange")})
public final class KafkaConnectorController extends ContextualExtension implements Controller<V1KafkaConnector, ResourceChange> {

    /** Extension configuration, built from the application configuration in {@link #init}. */
    private KafkaConnectExtensionConfig configuration;

    /** Collector used by {@link #plan} to list the connectors of a cluster. */
    private KafkaConnectorCollector collector;

    /**
     * Initializes this controller: builds the extension configuration from the application
     * configuration exposed by the context, then creates and initializes the connector collector
     * with the same context.
     *
     * @param extensionContext the extension context.
     */
    @Override
    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        this.configuration = new KafkaConnectExtensionConfig(extensionContext.appConfiguration());
        this.collector = new KafkaConnectorCollector();
        this.collector.init(extensionContext);
    }

    /**
     * Applies the changes held by the given executor, one Kafka Connect cluster at a time.
     *
     * <p>For each cluster a dedicated API client is created and closed once the changes of that
     * cluster have been applied (or have failed).
     *
     * @param changeExecutor        the executor holding the changes to apply.
     * @param reconciliationContext the reconciliation context.
     * @return the results of all applied changes, accumulated cluster by cluster.
     */
    @Override
    public List<ChangeResult> execute(@NotNull ChangeExecutor<ResourceChange> changeExecutor, @NotNull ReconciliationContext reconciliationContext) {
        // No filtering here: every change is kept and only grouped by its cluster label.
        Map<String, List<ResourceChange>> changesByCluster =
                groupByKafkaConnectCluster(changeExecutor.changes(), change -> true);

        List<ChangeResult> results = new LinkedList<>();
        for (Map.Entry<String, List<ResourceChange>> entry : changesByCluster.entrySet()) {
            final String cluster = entry.getKey();
            final List<ResourceChange> changes = entry.getValue();
            // try-with-resources guarantees the client is closed even when applying changes throws.
            try (KafkaConnectApi api = KafkaConnectApiFactory.create(
                    this.configuration.resolveClientConfigForCluster(cluster, changes))) {
                List<ChangeHandler<ResourceChange>> handlers = List.of(
                        new KafkaConnectorChangeHandler(api, cluster),
                        new ChangeHandler.None<>(change -> new KafkaConnectorChangeDescription(cluster, change))
                );
                DefaultChangeExecutor<ResourceChange> clusterExecutor =
                        new DefaultChangeExecutor<>(reconciliationContext, changes);
                results.addAll(clusterExecutor.applyChanges(handlers));
            }
        }
        return results;
    }

    /**
     * Computes the changes required to reconcile the given connectors.
     *
     * <p>The expected resources are filtered with the context selector and grouped by cluster.
     * For each cluster, the connectors returned by the collector are filtered with the same
     * selector and compared with the expected ones. Only clusters referenced by at least one
     * selected expected resource are visited.
     *
     * @param collection            the expected connector resources.
     * @param reconciliationContext the reconciliation context providing the selector.
     * @return the computed changes for all clusters.
     */
    @Override
    public List<ResourceChange> plan(@NotNull Collection<V1KafkaConnector> collection, @NotNull ReconciliationContext reconciliationContext) {
        final Selector selector = reconciliationContext.selector();

        Map<String, List<V1KafkaConnector>> expectedByCluster =
                groupByKafkaConnectCluster(collection, selector::apply);

        KafkaConnectorChangeComputer computer = new KafkaConnectorChangeComputer();
        List<ResourceChange> changes = new LinkedList<>();
        for (Map.Entry<String, List<V1KafkaConnector>> entry : expectedByCluster.entrySet()) {
            final String cluster = entry.getKey();
            final List<V1KafkaConnector> expected = entry.getValue();

            // NOTE(review): the meaning of the trailing 'false' flag is defined by
            // KafkaConnectorCollector#listAll and is not visible from this class - confirm there.
            List<V1KafkaConnector> actual = this.collector
                    .listAll(cluster, this.configuration.resolveClientConfigForCluster(cluster, expected), false)
                    .stream()
                    .filter(selector::apply)
                    .toList();

            changes.addAll(computer.computeChanges(actual, expected));
        }
        return changes;
    }

    /**
     * Filters the given resources and groups the remaining ones by the string value of their
     * {@link KafkaConnectLabels#KAFKA_CONNECT_CLUSTER} label.
     *
     * @param collection the resources to group.
     * @param predicate  the filter applied before grouping.
     * @param <T>        the resource type.
     * @return the retained resources keyed by cluster name.
     */
    @NotNull
    private <T extends HasMetadata> Map<String, List<T>> groupByKafkaConnectCluster(@NotNull Collection<T> collection, @NotNull Predicate<T> predicate) {
        return collection.stream()
                .filter(predicate)
                .collect(Collectors.groupingBy(
                        resource -> resource.getMetadata()
                                .getLabelByKey(KafkaConnectLabels.KAFKA_CONNECT_CLUSTER)
                                .getValue()
                                .toString(),
                        Collectors.toList()));
    }
}
