package io.streamthoughts.jikkou.kafka.change.handlers.topics;

import io.streamthoughts.jikkou.common.utils.CollectionUtils;
import io.streamthoughts.jikkou.core.models.HasMetadataChange;
import io.streamthoughts.jikkou.core.reconcilier.ChangeHandler;
import io.streamthoughts.jikkou.core.reconcilier.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconcilier.ChangeResponse;
import io.streamthoughts.jikkou.core.reconcilier.ChangeType;
import io.streamthoughts.jikkou.core.reconcilier.change.ConfigEntryChange;
import io.streamthoughts.jikkou.kafka.change.TopicChange;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/handlers/topics/UpdateTopicChangeHandler.class */
public final class UpdateTopicChangeHandler implements KafkaTopicChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(UpdateTopicChangeHandler.class);
    private final AdminClient client;

    public UpdateTopicChangeHandler(@NotNull AdminClient adminClient) {
        this.client = (AdminClient) Objects.requireNonNull(adminClient, "'client' cannot be null");
    }

    public Set<ChangeType> supportedChangeTypes() {
        return Set.of(ChangeType.UPDATE);
    }

    @NotNull
    public List<ChangeResponse<TopicChange>> apply(@NotNull List<HasMetadataChange<TopicChange>> list) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        for (HasMetadataChange<TopicChange> hasMetadataChange : list) {
            ChangeHandler.verify(this, hasMetadataChange);
            TopicChange topicChange = (TopicChange) hasMetadataChange.getChange();
            hashMap3.put(topicChange.getName(), new ArrayList());
            if (topicChange.hasConfigEntryChanges()) {
                ArrayList arrayList = new ArrayList(topicChange.getConfigEntryChanges().size());
                for (ConfigEntryChange configEntryChange : topicChange.getConfigEntryChanges()) {
                    ChangeType operation = configEntryChange.operation();
                    if (operation == ChangeType.DELETE) {
                        arrayList.add(newAlterConfigOp(configEntryChange, null, AlterConfigOp.OpType.DELETE));
                    }
                    if (operation == ChangeType.UPDATE || operation == ChangeType.ADD) {
                        arrayList.add(newAlterConfigOp(configEntryChange, String.valueOf(configEntryChange.valueChange().getAfter()), AlterConfigOp.OpType.SET));
                    }
                }
                hashMap.put(new ConfigResource(ConfigResource.Type.TOPIC, topicChange.getName()), arrayList);
            }
            Optional.ofNullable(topicChange.getPartitions()).flatMap((v0) -> {
                return v0.toOptional();
            }).ifPresent(num -> {
                hashMap2.put(topicChange.getName(), NewPartitions.increaseTo(num.intValue()));
            });
        }
        if (!hashMap.isEmpty()) {
            this.client.incrementalAlterConfigs(hashMap).values().forEach((configResource, kafkaFuture) -> {
                CompletableFuture<Void> completableFuture = Futures.toCompletableFuture(kafkaFuture);
                if (LOG.isDebugEnabled()) {
                    completableFuture = completableFuture.thenAccept(r5 -> {
                        LOG.debug("Completed config changes for topic: {}", configResource.name());
                    });
                }
                ((List) hashMap3.get(configResource.name())).add(completableFuture);
            });
        }
        if (!hashMap2.isEmpty()) {
            this.client.createPartitions(hashMap2).values().forEach((str, kafkaFuture2) -> {
                CompletableFuture<Void> completableFuture = Futures.toCompletableFuture(kafkaFuture2);
                if (LOG.isDebugEnabled()) {
                    completableFuture = completableFuture.thenAccept(r5 -> {
                        LOG.debug("Completed partitions creation for topic: {}", str);
                    });
                }
                ((List) hashMap3.get(str)).add(completableFuture);
            });
        }
        Map keyBy = CollectionUtils.keyBy(list, hasMetadataChange2 -> {
            return ((TopicChange) hasMetadataChange2.getChange()).getName();
        });
        return hashMap3.entrySet().stream().map(entry -> {
            return new ChangeResponse((HasMetadataChange) keyBy.get(entry.getKey()), ((List) entry.getValue()).stream().map(completableFuture -> {
                return completableFuture.thenApply(r2 -> {
                    return ChangeMetadata.empty();
                });
            }).toList());
        }).toList();
    }

    @NotNull
    private AlterConfigOp newAlterConfigOp(ConfigEntryChange configEntryChange, String str, AlterConfigOp.OpType opType) {
        return new AlterConfigOp(new ConfigEntry(configEntryChange.name(), str), opType);
    }
}
