package io.streamthoughts.jikkou.kafka.control.operation.acls;

import io.streamthoughts.jikkou.api.control.ChangeDescription;
import io.streamthoughts.jikkou.api.control.ChangeType;
import io.streamthoughts.jikkou.kafka.control.change.AclChange;
import io.streamthoughts.jikkou.kafka.model.AccessControlPolicy;
import io.vavr.Tuple2;
import io.vavr.concurrent.Future;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.ResourcePattern;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/operation/acls/CreateAclsOperation.class */
public class CreateAclsOperation implements AclOperation {
    private final AclBindingConverter converter = new AclBindingConverter();
    private final AdminClient adminClient;

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/control/operation/acls/CreateAclsOperation$AclBindingConverter.class */
    private static class AclBindingConverter {
        private AclBindingConverter() {
        }

        AclBinding toAclBinding(AccessControlPolicy accessControlPolicy) {
            return new AclBinding(new ResourcePattern(accessControlPolicy.resourceType(), accessControlPolicy.resourcePattern(), accessControlPolicy.patternType()), new AccessControlEntry(accessControlPolicy.principal(), accessControlPolicy.host(), accessControlPolicy.operation(), accessControlPolicy.permission()));
        }

        AccessControlPolicy fromAclBinding(AclBinding aclBinding) {
            ResourcePattern pattern = aclBinding.pattern();
            return AccessControlPolicy.builder().withResourcePattern(pattern.name()).withPatternType(pattern.patternType()).withResourceType(pattern.resourceType()).withOperation(aclBinding.entry().operation()).withPermission(aclBinding.entry().permissionType()).withHost(aclBinding.entry().host()).withPrincipal(aclBinding.entry().principal()).build();
        }
    }

    public CreateAclsOperation(@NotNull AdminClient adminClient) {
        this.adminClient = (AdminClient) Objects.requireNonNull(adminClient, "'adminClient should not be null'");
    }

    @Override // io.streamthoughts.jikkou.kafka.control.operation.acls.AclOperation
    public ChangeDescription getDescriptionFor(@NotNull AclChange aclChange) {
        return new AclChangeDescription(aclChange);
    }

    @Override // io.streamthoughts.jikkou.kafka.control.operation.acls.AclOperation
    public boolean test(AclChange aclChange) {
        return aclChange.getChange() == ChangeType.ADD;
    }

    @Override // io.streamthoughts.jikkou.kafka.control.operation.acls.AclOperation
    @NotNull
    public Map<AccessControlPolicy, List<Future<Void>>> doApply(@NotNull Collection<AclChange> collection) {
        Stream<R> map = collection.stream().peek(this::verify).map((v0) -> {
            return v0.getAccessControlPolicy();
        });
        AclBindingConverter aclBindingConverter = this.converter;
        Objects.requireNonNull(aclBindingConverter);
        return (Map) this.adminClient.createAcls((List) map.map(aclBindingConverter::toAclBinding).collect(Collectors.toList())).values().entrySet().stream().map(entry -> {
            return new Tuple2(this.converter.fromAclBinding((AclBinding) entry.getKey()), (KafkaFuture) entry.getValue());
        }).map(tuple2 -> {
            return tuple2.map2((v0) -> {
                return Future.fromJavaFuture(v0);
            });
        }).map(tuple22 -> {
            return tuple22.map2((v0) -> {
                return List.of(v0);
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0._1();
        }, (v0) -> {
            return v0._2();
        }));
    }

    private void verify(@NotNull AclChange aclChange) {
        if (!test(aclChange)) {
            throw new IllegalArgumentException("This operation does not support the passed change: " + aclChange);
        }
    }
}
