package io.streamthoughts.jikkou.kafka.change.quota;

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/quota/KafkaClientQuotaChangeHandler.class */
public final class KafkaClientQuotaChangeHandler extends BaseChangeHandler<ResourceChange> {
    private final AdminClient client;

    public KafkaClientQuotaChangeHandler(@NotNull AdminClient adminClient) {
        this(adminClient, Set.of(Operation.CREATE, Operation.DELETE, Operation.UPDATE));
    }

    public KafkaClientQuotaChangeHandler(@NotNull AdminClient adminClient, @NotNull Set<Operation> set) {
        super(set);
        this.client = adminClient;
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    @NotNull
    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        Map map = (Map) this.client.alterClientQuotas((List) list.stream().map(resourceChange -> {
            return new ClientQuotaAlteration(getClientQuotaEntity(resourceChange), (List) resourceChange.getSpec2().getChanges().all().stream().map(stateChange -> {
                return new ClientQuotaAlteration.Op(stateChange.getName(), (Double) stateChange.getAfter());
            }).collect(Collectors.toList()));
        }).collect(Collectors.toList())).values().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Futures.toCompletableFuture((KafkaFuture) entry.getValue());
        }));
        Map map2 = (Map) list.stream().collect(Collectors.toMap(KafkaClientQuotaChangeHandler::getClientQuotaEntity, resourceChange2 -> {
            return resourceChange2;
        }));
        return map.entrySet().stream().map(entry2 -> {
            return new ChangeResponse((ResourceChange) map2.get(entry2.getKey()), (CompletableFuture<ChangeMetadata>) ((CompletableFuture) entry2.getValue()).thenApply(r2 -> {
                return ChangeMetadata.empty();
            }));
        }).toList();
    }

    @NotNull
    private static ClientQuotaEntity getClientQuotaEntity(ResourceChange resourceChange) {
        return new ClientQuotaEntity((Map) TypeConverter.ofMap().convertValue(resourceChange.getSpec2().getData()));
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return new KafkaClientQuotaChangeDescription(resourceChange);
    }
}
