package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.annotations.AcceptsReconciliationModes;
import io.streamthoughts.jikkou.api.annotations.AcceptsResource;
import io.streamthoughts.jikkou.api.annotations.AcceptsResources;
import io.streamthoughts.jikkou.api.config.ConfigProperty;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseResourceController;
import io.streamthoughts.jikkou.api.control.ChangeExecutor;
import io.streamthoughts.jikkou.api.control.ChangeHandler;
import io.streamthoughts.jikkou.api.control.ChangeResult;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.AdminClientContext;
import io.streamthoughts.jikkou.kafka.control.change.TopicChange;
import io.streamthoughts.jikkou.kafka.control.change.TopicChangeComputer;
import io.streamthoughts.jikkou.kafka.control.handlers.topics.AlterTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.control.handlers.topics.CreateTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.control.handlers.topics.DeleteTopicChangeHandler;
import io.streamthoughts.jikkou.kafka.control.handlers.topics.TopicChangeDescription;
import io.streamthoughts.jikkou.kafka.converters.V1KafkaTopicListConverter;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicChange;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicChangeList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AcceptsResources({@AcceptsResource(type = V1KafkaTopic.class), @AcceptsResource(type = V1KafkaTopicList.class, converter = V1KafkaTopicListConverter.class)})
@AcceptsReconciliationModes({ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.APPLY_ALL})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/AdminClientKafkaTopicController.class */
public final class AdminClientKafkaTopicController extends AbstractAdminClientKafkaController implements BaseResourceController<V1KafkaTopic, TopicChange> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTopicController.class);
    public static final String CONFIG_ENTRY_DELETE_ORPHANS_CONFIG_NAME = "config-delete-orphans";
    private AdminClientKafkaTopicCollector collector;

    public AdminClientKafkaTopicController() {
    }

    public AdminClientKafkaTopicController(@NotNull Configuration configuration) {
        configure(configuration);
    }

    public AdminClientKafkaTopicController(@NotNull AdminClientContext adminClientContext) {
        super(adminClientContext);
        setInternalDescriptor(adminClientContext);
    }

    @Override // io.streamthoughts.jikkou.kafka.control.AbstractAdminClientKafkaController
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        super.configure(configuration);
        setInternalDescriptor(this.adminClientContext);
    }

    private void setInternalDescriptor(@NotNull AdminClientContext adminClientContext) {
        if (this.collector == null) {
            this.collector = new AdminClientKafkaTopicCollector(adminClientContext);
        }
    }

    public List<ChangeResult<TopicChange>> execute(@NotNull List<TopicChange> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        AdminClient client = this.adminClientContext.client();
        return new ChangeExecutor(List.of(new CreateTopicChangeHandler(client), new AlterTopicChangeHandler(client), new DeleteTopicChangeHandler(client), new ChangeHandler.None(TopicChangeDescription::new))).execute(list, z);
    }

    public V1KafkaTopicChangeList computeReconciliationChanges(@NotNull Collection<V1KafkaTopic> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        LOG.info("Computing reconciliation change for '{}' resources in '{}' mode", Integer.valueOf(collection.size()), reconciliationMode);
        Stream<V1KafkaTopic> stream = collection.stream();
        AggregateSelector aggregateSelector = new AggregateSelector(reconciliationContext.selectors());
        List<V1KafkaTopic> list = stream.filter((v1) -> {
            return r1.apply(v1);
        }).toList();
        return new V1KafkaTopicChangeList().withItems(new TopicChangeComputer(isConfigDeletionEnabled(reconciliationMode, reconciliationContext)).computeChanges(this.collector.listAll(new ConfigDescribeConfiguration().withDescribeDefaultConfigs(true).withDescribeStaticBrokerConfigs(true).withDescribeDynamicBrokerConfigs(true).asConfiguration(), reconciliationContext.selectors()), list).stream().map(topicChange -> {
            return V1KafkaTopicChange.builder().withChange(topicChange).build();
        }).toList());
    }

    @VisibleForTesting
    static boolean isConfigDeletionEnabled(@NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        return ((Boolean) ConfigProperty.ofBoolean(CONFIG_ENTRY_DELETE_ORPHANS_CONFIG_NAME).orElse(() -> {
            return Boolean.valueOf(List.of(ReconciliationMode.APPLY_ALL, ReconciliationMode.DELETE, ReconciliationMode.UPDATE).contains(reconciliationMode));
        }).evaluate(reconciliationContext.configuration())).booleanValue();
    }

    /* renamed from: computeReconciliationChanges, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ResourceListObject m10computeReconciliationChanges(@NotNull Collection collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        return computeReconciliationChanges((Collection<V1KafkaTopic>) collection, reconciliationMode, reconciliationContext);
    }
}
