package io.streamthoughts.jikkou.kafka.change.topics;

import io.streamthoughts.jikkou.common.utils.CollectionUtils;
import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.StateChange;
import io.streamthoughts.jikkou.core.models.change.StateChangeList;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/topics/UpdateTopicChangeHandler.class */
public final class UpdateTopicChangeHandler extends BaseChangeHandler<ResourceChange> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) UpdateTopicChangeHandler.class);
    private final AdminClient client;

    public UpdateTopicChangeHandler(@NotNull AdminClient adminClient) {
        super(Operation.UPDATE);
        this.client = (AdminClient) Objects.requireNonNull(adminClient, "'client' cannot be null");
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return TopicChange.getDescription(resourceChange);
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    @NotNull
    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        for (ResourceChange resourceChange : list) {
            String name = resourceChange.getMetadata().getName();
            hashMap3.put(name, new ArrayList());
            StateChangeList<? extends StateChange> changes = resourceChange.getSpec2().getChanges();
            StateChangeList<StateChange> allWithPrefix = changes.allWithPrefix("config.");
            if (!allWithPrefix.isEmpty()) {
                ArrayList arrayList = new ArrayList(allWithPrefix.size());
                for (StateChange stateChange : allWithPrefix) {
                    Operation op = stateChange.getOp();
                    if (op == Operation.DELETE) {
                        arrayList.add(newAlterConfigOp(stateChange, null, AlterConfigOp.OpType.DELETE));
                    }
                    if (op == Operation.UPDATE || op == Operation.CREATE) {
                        arrayList.add(newAlterConfigOp(stateChange, String.valueOf(stateChange.getAfter()), AlterConfigOp.OpType.SET));
                    }
                }
                hashMap.put(new ConfigResource(ConfigResource.Type.TOPIC, name), arrayList);
            }
            changes.findLast(TopicChange.PARTITIONS, TypeConverter.Integer()).stream().filter(specificStateChange -> {
                return specificStateChange.getOp() != Operation.NONE;
            }).map((v0) -> {
                return v0.getAfter();
            }).findFirst().ifPresent(num -> {
                hashMap2.put(name, NewPartitions.increaseTo(num.intValue()));
            });
        }
        if (!hashMap.isEmpty()) {
            this.client.incrementalAlterConfigs(hashMap).values().forEach((configResource, kafkaFuture) -> {
                CompletableFuture<Void> completableFuture = Futures.toCompletableFuture(kafkaFuture);
                if (LOG.isDebugEnabled()) {
                    completableFuture = completableFuture.thenAccept(r5 -> {
                        LOG.debug("Completed config changes for topic: {}", configResource.name());
                    });
                }
                ((List) hashMap3.get(configResource.name())).add(completableFuture);
            });
        }
        if (!hashMap2.isEmpty()) {
            this.client.createPartitions(hashMap2).values().forEach((str, kafkaFuture2) -> {
                CompletableFuture<Void> completableFuture = Futures.toCompletableFuture(kafkaFuture2);
                if (LOG.isDebugEnabled()) {
                    completableFuture = completableFuture.thenAccept(r5 -> {
                        LOG.debug("Completed partitions creation for topic: {}", str);
                    });
                }
                ((List) hashMap3.get(str)).add(completableFuture);
            });
        }
        Map keyBy = CollectionUtils.keyBy(list, resourceChange2 -> {
            return resourceChange2.getMetadata().getName();
        });
        return hashMap3.entrySet().stream().map(entry -> {
            return new ChangeResponse((ResourceChange) keyBy.get(entry.getKey()), (List<CompletableFuture<ChangeMetadata>>) ((List) entry.getValue()).stream().map(completableFuture -> {
                return completableFuture.thenApply(r2 -> {
                    return ChangeMetadata.empty();
                });
            }).toList());
        }).toList();
    }

    @NotNull
    private AlterConfigOp newAlterConfigOp(StateChange stateChange, String str, AlterConfigOp.OpType opType) {
        return new AlterConfigOp(new ConfigEntry(stateChange.getName(), str), opType);
    }
}
