package io.streamthoughts.jikkou.kafka.change.acl;

import io.streamthoughts.jikkou.common.utils.Pair;
import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.model.KafkaAclBinding;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/acl/CreateAclChangeHandler.class */
public final class CreateAclChangeHandler extends BaseChangeHandler<ResourceChange> {
    private final AdminClient client;

    public CreateAclChangeHandler(@NotNull AdminClient adminClient) {
        super(Operation.CREATE);
        this.client = (AdminClient) Objects.requireNonNull(adminClient, "client cannot not be null");
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        return list.stream().map(resourceChange -> {
            return new ChangeResponse(resourceChange, (List<CompletableFuture<ChangeMetadata>>) this.client.createAcls(resourceChange.getSpec2().getChanges().get(AclChangeComputer.ACL).all(TypeConverter.of(KafkaAclBinding.class)).stream().map((v0) -> {
                return v0.getAfter();
            }).map((v0) -> {
                return v0.toAclBinding();
            }).toList()).values().entrySet().stream().map(entry -> {
                return Pair.of((AclBinding) entry.getKey(), (KafkaFuture) entry.getValue());
            }).map(pair -> {
                return ((KafkaFuture) pair._2()).toCompletionStage().toCompletableFuture().thenApply(r2 -> {
                    return ChangeMetadata.empty();
                });
            }).toList());
        }).toList();
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return new KafkaPrincipalAuthorizationDescription(resourceChange);
    }
}
