package io.streamthoughts.jikkou.kafka.control.operation.topics;

import io.streamthoughts.jikkou.api.control.ChangeDescription;
import io.streamthoughts.jikkou.api.control.ChangeType;
import io.streamthoughts.jikkou.api.control.ConfigEntryChange;
import io.streamthoughts.jikkou.kafka.control.change.TopicChange;
import io.vavr.concurrent.Future;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/operation/topics/AlterTopicOperation.class */
/**
 * Topic operation that applies {@link ChangeType#UPDATE} changes to topics through the
 * Kafka {@link AdminClient}.
 *
 * <p>Config entry changes are submitted with {@code incrementalAlterConfigs} and partition
 * changes with {@code createPartitions}. Each of the two requests is issued at most once
 * per {@link #doApply(Collection)} call, covering all topics of the batch.
 */
public final class AlterTopicOperation implements TopicOperation {
    private final AdminClient client;

    /**
     * Creates a new operation bound to the given admin client.
     *
     * @param adminClient the client used to submit the requests; must not be {@code null}.
     * @throws NullPointerException if {@code adminClient} is {@code null}.
     */
    public AlterTopicOperation(@NotNull AdminClient adminClient) {
        this.client = Objects.requireNonNull(adminClient, "'client' cannot be null");
    }

    /**
     * Wraps the given change into a {@link TopicChangeDescription}.
     *
     * @param topicChange the change to describe.
     * @return the description of the change.
     */
    @Override
    public ChangeDescription getDescriptionFor(@NotNull TopicChange topicChange) {
        return new TopicChangeDescription(topicChange);
    }

    /**
     * Tells whether this operation supports the given change.
     *
     * @param topicChange the change to test.
     * @return {@code true} only when the change type is {@link ChangeType#UPDATE}.
     */
    @Override
    public boolean test(TopicChange topicChange) {
        return topicChange.getChange() == ChangeType.UPDATE;
    }

    /**
     * Submits the given topic changes to the cluster.
     *
     * <p>Every change is first validated with {@link #test(TopicChange)}; the whole call
     * fails before any request is sent if one of them is not supported.
     *
     * @param collection the topic changes to apply; each must be an UPDATE change.
     * @return a map keyed by topic name whose value holds the pending results of the
     *         requests issued for that topic (config alteration and/or partition change).
     *         A topic that required no request is mapped to an empty list.
     * @throws IllegalArgumentException if a change is not supported by this operation.
     */
    @NotNull
    public Map<String, List<Future<Void>>> doApply(@NotNull Collection<TopicChange> collection) {
        final Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>();
        final Map<String, NewPartitions> newPartitions = new HashMap<>();
        final Map<String, List<Future<Void>>> results = new HashMap<>();

        for (TopicChange topicChange : collection) {
            verify(topicChange);

            final String topicName = topicChange.getName();
            // Register every topic up-front so the callbacks below always find a result list.
            results.put(topicName, new ArrayList<>());

            if (topicChange.hasConfigEntryChanges()) {
                alterConfigs.put(
                        new ConfigResource(ConfigResource.Type.TOPIC, topicName),
                        toAlterConfigOps(topicChange));
            }

            // getPartitions() may be null, and the value it wraps may be absent as well.
            Optional.ofNullable(topicChange.getPartitions()).flatMap((v0) -> {
                return v0.toOptional();
            }).ifPresent(num -> {
                newPartitions.put(topicName, NewPartitions.increaseTo(num.intValue()));
            });
        }

        if (!alterConfigs.isEmpty()) {
            this.client.incrementalAlterConfigs(alterConfigs).values().forEach((resource, future) -> {
                results.get(resource.name()).add(Future.fromJavaFuture(future));
            });
        }

        if (!newPartitions.isEmpty()) {
            this.client.createPartitions(newPartitions).values().forEach((topicName, future) -> {
                results.get(topicName).add(Future.fromJavaFuture(future));
            });
        }

        return results;
    }

    /**
     * Converts the config entry changes of a topic into the ops expected by the admin client.
     *
     * <p>DELETE changes map to a {@code DELETE} op with a {@code null} value and UPDATE
     * changes map to a {@code SET} op carrying the string form of the "after" value.
     * Entries with any other change type produce no op.
     */
    @NotNull
    private List<AlterConfigOp> toAlterConfigOps(@NotNull TopicChange topicChange) {
        final List<AlterConfigOp> ops = new ArrayList<>(topicChange.getConfigEntryChanges().size());
        for (ConfigEntryChange configEntryChange : topicChange.getConfigEntryChanges()) {
            final ChangeType change = configEntryChange.getChange();
            if (change == ChangeType.DELETE) {
                ops.add(newAlterConfigOp(configEntryChange, null, AlterConfigOp.OpType.DELETE));
            } else if (change == ChangeType.UPDATE) {
                final String newValue = String.valueOf(configEntryChange.getValueChange().getAfter());
                ops.add(newAlterConfigOp(configEntryChange, newValue, AlterConfigOp.OpType.SET));
            }
        }
        return ops;
    }

    /**
     * Builds an {@link AlterConfigOp} for the named config entry.
     *
     * @param configEntryChange the change providing the config entry name.
     * @param str               the value to set, or {@code null} when none applies.
     * @param opType            the type of alteration to perform.
     */
    @NotNull
    private AlterConfigOp newAlterConfigOp(ConfigEntryChange configEntryChange, String str, AlterConfigOp.OpType opType) {
        return new AlterConfigOp(new ConfigEntry(configEntryChange.getName(), str), opType);
    }

    /**
     * Rejects changes that this operation cannot handle.
     *
     * @throws IllegalArgumentException if {@link #test(TopicChange)} returns {@code false}.
     */
    private void verify(@NotNull TopicChange topicChange) {
        if (!test(topicChange)) {
            throw new IllegalArgumentException("This operation does not support the passed change: " + topicChange);
        }
    }
}
