package io.streamthoughts.jikkou.kafka.change.handlers.quotas;

import io.streamthoughts.jikkou.api.change.ChangeDescription;
import io.streamthoughts.jikkou.api.change.ChangeMetadata;
import io.streamthoughts.jikkou.api.change.ChangeResponse;
import io.streamthoughts.jikkou.api.change.ChangeType;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.kafka.change.QuotaChange;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/handlers/quotas/AbstractQuotaChangeHandler.class */
public abstract class AbstractQuotaChangeHandler implements KafkaQuotaChangeHandler {
    private final AdminClient client;
    protected final Set<ChangeType> supportedChangeTypes;

    public AbstractQuotaChangeHandler(@NotNull AdminClient adminClient, @NotNull ChangeType changeType) {
        this(adminClient, (Set<ChangeType>) Set.of(changeType));
    }

    public AbstractQuotaChangeHandler(@NotNull AdminClient adminClient, @NotNull Set<ChangeType> set) {
        this.client = adminClient;
        this.supportedChangeTypes = set;
    }

    @Override // io.streamthoughts.jikkou.api.change.ChangeHandler
    @NotNull
    public List<ChangeResponse<QuotaChange>> apply(@NotNull List<HasMetadataChange<QuotaChange>> list) {
        Map map = (Map) this.client.alterClientQuotas((List) list.stream().map(hasMetadataChange -> {
            QuotaChange quotaChange = (QuotaChange) hasMetadataChange.getChange();
            return new ClientQuotaAlteration(new ClientQuotaEntity(quotaChange.getType().toEntities(quotaChange.getEntity())), (List) quotaChange.getConfigEntryChanges().stream().map(configEntryChange -> {
                return new ClientQuotaAlteration.Op(configEntryChange.getName(), (Double) configEntryChange.getValueChange().getAfter());
            }).collect(Collectors.toList()));
        }).collect(Collectors.toList())).values().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Futures.toCompletableFuture((KafkaFuture) entry.getValue());
        }));
        Map map2 = (Map) list.stream().collect(Collectors.toMap(hasMetadataChange2 -> {
            return new ClientQuotaEntity(((QuotaChange) hasMetadataChange2.getChange()).getType().toEntities(((QuotaChange) hasMetadataChange2.getChange()).getEntity()));
        }, hasMetadataChange3 -> {
            return hasMetadataChange3;
        }));
        return map.entrySet().stream().map(entry2 -> {
            return new ChangeResponse((HasMetadataChange) map2.get(entry2.getKey()), (CompletableFuture<ChangeMetadata>) ((CompletableFuture) entry2.getValue()).thenApply(r2 -> {
                return ChangeMetadata.empty();
            }));
        }).toList();
    }

    @Override // io.streamthoughts.jikkou.api.change.ChangeHandler
    public Set<ChangeType> supportedChangeTypes() {
        return this.supportedChangeTypes;
    }

    @Override // io.streamthoughts.jikkou.api.change.ChangeHandler
    public ChangeDescription getDescriptionFor(@NotNull HasMetadataChange<QuotaChange> hasMetadataChange) {
        return new QuotaChangeDescription(hasMetadataChange);
    }
}
