package org.creekservice.internal.kafka.extension.client;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.topic.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.topic.KafkaTopicDescriptor;
import org.creekservice.api.observability.logging.structured.LogEntryCustomizer;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;

/* loaded from: input_file:org/creekservice/internal/kafka/extension/client/KafkaTopicClient.class */
public final class KafkaTopicClient implements TopicClient {
    private final StructuredLogger logger;
    private final String clusterName;
    private final Map<String, Object> kafkaProperties;
    private final Function<Map<String, Object>, Admin> adminFactory;

    /* loaded from: input_file:org/creekservice/internal/kafka/extension/client/KafkaTopicClient$CreateTopicException.class */
    private static final class CreateTopicException extends RuntimeException {
        CreateTopicException(URI uri, Throwable th) {
            super("Failed to create topic. " + String.valueOf(LoggingField.topicId) + ": " + String.valueOf(uri), th);
        }
    }

    public KafkaTopicClient(String str, Map<String, Object> map) {
        this(str, map, Admin::create, StructuredLoggerFactory.internalLogger(KafkaTopicClient.class));
    }

    KafkaTopicClient(String str, Map<String, Object> map, Function<Map<String, Object>, Admin> function, StructuredLogger structuredLogger) {
        this.clusterName = (String) Objects.requireNonNull(str, "clusterName");
        this.kafkaProperties = (Map) Objects.requireNonNull(map, "kafkaProperties");
        this.adminFactory = (Function) Objects.requireNonNull(function, "adminFactory");
        this.logger = (StructuredLogger) Objects.requireNonNull(structuredLogger, "logger");
    }

    @Override // org.creekservice.api.kafka.extension.client.TopicClient
    public void ensureTopicsExist(List<? extends CreatableKafkaTopic<?, ?>> list) {
        validateCluster(list);
        Admin apply = this.adminFactory.apply(this.kafkaProperties);
        try {
            create(list, apply);
            if (apply != null) {
                apply.close();
            }
        } catch (Throwable th) {
            if (apply != null) {
                try {
                    apply.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void validateCluster(List<? extends CreatableKafkaTopic<?, ?>> list) {
        List list2 = (List) list.stream().filter(creatableKafkaTopic -> {
            return !creatableKafkaTopic.cluster().equals(this.clusterName);
        }).map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        if (!list2.isEmpty()) {
            throw new IllegalArgumentException("topics were for wrong cluster." + System.lineSeparator() + "Expected cluster: " + this.clusterName + System.lineSeparator() + "Invalid topic ids: " + String.valueOf(list2));
        }
    }

    private void create(List<? extends CreatableKafkaTopic<?, ?>> list, Admin admin) {
        CreateTopicsResult createTopics = admin.createTopics((List) list.stream().map(KafkaTopicClient::toNewTopic).collect(Collectors.toList()));
        createTopics.values().entrySet().forEach(entry -> {
            String str = (String) entry.getKey();
            URI resourceId = KafkaTopicDescriptor.resourceId(this.clusterName, str);
            try {
                ((KafkaFuture) entry.getValue()).get();
                Integer num = (Integer) createTopics.numPartitions(str).get();
                List list2 = (List) ((Config) createTopics.config(str).get()).entries().stream().filter(configEntry -> {
                    return configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
                }).collect(Collectors.toList());
                this.logger.info("Created topic", logEntryCustomizer -> {
                    LogEntryCustomizer ns = logEntryCustomizer.with(LoggingField.topicId, resourceId).with(LoggingField.partitions, num).ns("config");
                    list2.forEach(configEntry2 -> {
                        ns.with(configEntry2.name(), configEntry2.value());
                    });
                });
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw new CreateTopicException(resourceId, e.getCause());
                }
                this.logger.debug("Topic already exists", logEntryCustomizer2 -> {
                    logEntryCustomizer2.with(LoggingField.topicId, resourceId);
                });
            } catch (Exception e2) {
                throw new CreateTopicException(resourceId, e2);
            }
        });
    }

    private static NewTopic toNewTopic(CreatableKafkaTopic<?, ?> creatableKafkaTopic) {
        return new NewTopic(creatableKafkaTopic.name(), Optional.of(Integer.valueOf(creatableKafkaTopic.config().partitions())), Optional.empty()).configs(creatableKafkaTopic.config().config());
    }
}
