package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsReconciliationModes;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.change.ChangeExecutor;
import io.streamthoughts.jikkou.api.change.ChangeHandler;
import io.streamthoughts.jikkou.api.change.ChangeResult;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseResourceController;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.GenericResourceListObject;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.change.KafkaTableRecordChange;
import io.streamthoughts.jikkou.kafka.change.KafkaTableRecordChangeComputer;
import io.streamthoughts.jikkou.kafka.change.handlers.record.KafkaTableRecordChangeDescription;
import io.streamthoughts.jikkou.kafka.change.handlers.record.KafkaTableRecordChangeHandler;
import io.streamthoughts.jikkou.kafka.control.AdminClientKafkaTableCollector;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientFactory;
import io.streamthoughts.jikkou.kafka.internals.consumer.ConsumerFactory;
import io.streamthoughts.jikkou.kafka.internals.producer.DefaultProducerFactory;
import io.streamthoughts.jikkou.kafka.internals.producer.ProducerFactory;
import io.streamthoughts.jikkou.kafka.model.DataValue;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTableRecord;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resource controller for {@link V1KafkaTableRecord} resources.
 *
 * <p>Expected records are compared, topic by topic, with the records returned by an
 * {@link AdminClientKafkaTableCollector}; the resulting changes are then handed to a
 * {@link KafkaTableRecordChangeHandler} that is given a Kafka {@link Producer}.
 */
@AcceptsResource(type = V1KafkaTableRecord.class)
@AcceptsReconciliationModes({ReconciliationMode.CREATE, ReconciliationMode.UPDATE, ReconciliationMode.APPLY_ALL})
public final class AdminClientKafkaTableController implements BaseResourceController<V1KafkaTableRecord, KafkaTableRecordChange> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTableController.class);
    private ProducerFactory<byte[], byte[]> producerFactory;
    private ConsumerFactory<byte[], byte[]> consumerFactory;
    private AdminClientFactory adminClientFactory;
    private AdminClientKafkaTableCollector collector;

    /**
     * Creates a controller without any client factory; {@link #configure(Configuration)}
     * must be called before the controller is used.
     */
    public AdminClientKafkaTableController() {
    }

    /**
     * Creates a controller using the given client factories.
     *
     * @param producerFactory    factory for the producer used when executing changes.
     * @param consumerFactory    factory passed to the underlying collector.
     * @param adminClientFactory factory passed to the underlying collector.
     */
    public AdminClientKafkaTableController(ProducerFactory<byte[], byte[]> producerFactory, ConsumerFactory<byte[], byte[]> consumerFactory, AdminClientFactory adminClientFactory) {
        this.producerFactory = producerFactory;
        this.consumerFactory = consumerFactory;
        this.adminClientFactory = adminClientFactory;
    }

    /**
     * Configures this controller and (re)creates its collector.
     *
     * <p>When no producer factory was supplied, a {@link DefaultProducerFactory} with
     * byte-array serializers is created; its client properties are evaluated from
     * {@code KafkaClientConfiguration.PRODUCER_CLIENT_CONFIG} each time the supplier is invoked.
     *
     * @param configuration the configuration to apply.
     * @throws ConfigException declared by the contract; not thrown directly by this method.
     */
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.producerFactory == null) {
            Supplier<Map<String, Object>> producerConfig =
                    () -> KafkaClientConfiguration.PRODUCER_CLIENT_CONFIG.evaluate(configuration);
            this.producerFactory = new DefaultProducerFactory<>(
                    producerConfig, new ByteArraySerializer(), new ByteArraySerializer());
        }
        // NOTE(review): both factories are null when the no-arg constructor was used;
        // presumably the collector builds its own defaults in configure() - confirm.
        this.collector = new AdminClientKafkaTableCollector(this.consumerFactory, this.adminClientFactory);
        this.collector.configure(configuration);
    }

    /**
     * Executes the given changes with a producer that is closed once execution ends.
     *
     * @param list               the changes to execute.
     * @param reconciliationMode the reconciliation mode (not used by this implementation).
     * @param z                  forwarded unchanged to {@code ChangeExecutor#execute};
     *                           presumably the dry-run flag - confirm against the interface.
     * @return the result of each change, as returned by the {@link ChangeExecutor}.
     */
    public List<ChangeResult<KafkaTableRecordChange>> execute(@NotNull List<HasMetadataChange<KafkaTableRecordChange>> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        // try-with-resources closes the producer on both the normal and the exceptional
        // path, and records a failure of close() as a suppressed exception.
        try (Producer<byte[], byte[]> producer = this.producerFactory.createProducer()) {
            List<ChangeHandler<KafkaTableRecordChange>> handlers = List.of(
                    new KafkaTableRecordChangeHandler(producer),
                    new ChangeHandler.None<>(KafkaTableRecordChangeDescription::new));
            return new ChangeExecutor<>(handlers).execute(list, z);
        }
    }

    /**
     * Computes the changes required to reconcile the given records.
     *
     * <p>Records without a key (or with a null key payload) and records rejected by the
     * context selectors are ignored. The remaining records are grouped by metadata name,
     * which is used as the topic name when listing the existing records.
     *
     * @param collection            the expected records.
     * @param reconciliationMode    the reconciliation mode (not used by this implementation).
     * @param reconciliationContext the context providing the resource selectors.
     * @return the list of computed changes.
     */
    public ResourceListObject<? extends HasMetadataChange<KafkaTableRecordChange>> computeReconciliationChanges(@NotNull Collection<V1KafkaTableRecord> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        AggregateSelector selector = new AggregateSelector(reconciliationContext.selectors());
        Map<String, List<V1KafkaTableRecord>> recordsByTopic = collection.stream()
                .filter(AdminClientKafkaTableController::recordWithNonEmptyKey)
                .filter(selector::apply)
                .collect(Collectors.groupingBy(record -> record.getMetadata().getName()));

        List<HasMetadataChange<KafkaTableRecordChange>> changes = new ArrayList<>();
        for (Map.Entry<String, List<V1KafkaTableRecord>> entry : recordsByTopic.entrySet()) {
            String topicName = entry.getKey();
            List<V1KafkaTableRecord> expectedRecords = entry.getValue();
            // The key/value types of the first record are used to read the whole topic.
            V1KafkaTableRecord first = expectedRecords.get(0);
            Configuration listConfiguration = Configuration.from(Map.of(
                    AdminClientKafkaTableCollector.Config.TOPIC_NAME_CONFIG.key(), topicName,
                    AdminClientKafkaTableCollector.Config.KEY_TYPE_CONFIG.key(), first.m58getSpec().getKey().type().name(),
                    AdminClientKafkaTableCollector.Config.VALUE_TYPE_CONFIG.key(), first.m58getSpec().getValue().type().name(),
                    AdminClientKafkaTableCollector.Config.SKIP_MESSAGE_ON_ERROR_CONFIG.key(), true));
            changes.addAll(new KafkaTableRecordChangeComputer().computeChanges(
                    this.collector.listAll(listConfiguration, reconciliationContext.selectors()),
                    expectedRecords));
        }
        return new GenericResourceListObject<>(changes);
    }

    /**
     * Tells whether the given record declares a key whose data is not null.
     *
     * @param v1KafkaTableRecord the record to test.
     * @return {@code true} if the record key is present and its data is non-null.
     */
    private static boolean recordWithNonEmptyKey(@NotNull V1KafkaTableRecord v1KafkaTableRecord) {
        DataValue key = v1KafkaTableRecord.m58getSpec().getKey();
        return key != null && !key.data().isNull();
    }
}
