package org.creekservice.internal.kafka.extension.resource;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.creekservice.api.base.type.CodeLocation;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.topic.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.topic.KafkaTopicDescriptor;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;
import org.creekservice.api.service.extension.component.model.ResourceHandler;

/* loaded from: input_file:org/creekservice/internal/kafka/extension/resource/TopicResourceHandler.class */
/**
 * Resource handler for {@link KafkaTopicDescriptor Kafka topic descriptors}.
 *
 * <p>Work is delegated to collaborators supplied at construction time:
 *
 * <ul>
 *   <li>validation goes to a {@link KafkaResourceValidator};
 *   <li>ensuring topics exist goes to a {@link TopicClient} created per cluster;
 *   <li>preparing goes to a {@link TopicResourceFactory}, with each resulting resource registered
 *       with the {@link TopicRegistrar}.
 * </ul>
 *
 * <p>NOTE(review): {@code validate}, {@code ensure} and {@code prepare} presumably implement
 * methods declared on {@link ResourceHandler}; if so they should carry {@code @Override} - confirm
 * against the interface.
 */
public class TopicResourceHandler implements ResourceHandler<KafkaTopicDescriptor<?, ?>> {
    private final StructuredLogger logger;
    private final TopicRegistrar resources;
    private final ClustersProperties properties;
    private final TopicClient.Factory topicClientFactory;
    private final TopicResourceFactory topicResourceFactory;
    private final KafkaResourceValidator validator;

    /**
     * Creates a handler using a default {@link TopicResourceFactory}, {@link
     * KafkaResourceValidator} and internal logger.
     *
     * @param factory factory used to create a topic client for a cluster.
     * @param topicRegistrar registrar that prepared topic resources are registered with.
     * @param clustersProperties source of per-cluster properties.
     * @param kafkaSerdeProviders serde providers handed to the topic resource factory.
     */
    public TopicResourceHandler(TopicClient.Factory factory, TopicRegistrar topicRegistrar, ClustersProperties clustersProperties, KafkaSerdeProviders kafkaSerdeProviders) {
        this(factory, clustersProperties, topicRegistrar, new TopicResourceFactory(kafkaSerdeProviders), new KafkaResourceValidator(), StructuredLoggerFactory.internalLogger(TopicResourceHandler.class));
    }

    /**
     * Creates a handler with every collaborator supplied explicitly.
     *
     * @throws NullPointerException if any argument is {@code null}.
     */
    TopicResourceHandler(TopicClient.Factory factory, ClustersProperties clustersProperties, TopicRegistrar topicRegistrar, TopicResourceFactory topicResourceFactory, KafkaResourceValidator kafkaResourceValidator, StructuredLogger structuredLogger) {
        this.topicClientFactory = Objects.requireNonNull(factory, "topicClientFactory");
        this.properties = Objects.requireNonNull(clustersProperties, "properties");
        this.resources = Objects.requireNonNull(topicRegistrar, "resources");
        this.topicResourceFactory = Objects.requireNonNull(topicResourceFactory, "topicFactory");
        this.validator = Objects.requireNonNull(kafkaResourceValidator, "validator");
        this.logger = Objects.requireNonNull(structuredLogger, "logger");
    }

    /**
     * Validates the supplied descriptors as a group, delegating to the validator.
     *
     * @param collection the topic descriptors to validate.
     */
    public void validate(Collection<? extends KafkaTopicDescriptor<?, ?>> collection) {
        validator.validateGroup(collection);
    }

    /**
     * Ensures the supplied topics exist, handling the topics of each cluster together.
     *
     * @param collection the topic descriptors; each must be a {@link CreatableKafkaTopic}.
     * @throws IllegalArgumentException if any descriptor is not a {@link CreatableKafkaTopic}.
     */
    public void ensure(Collection<? extends KafkaTopicDescriptor<?, ?>> collection) {
        final Map<String, List<CreatableKafkaTopic<?, ?>>> topicsByCluster =
                collection.stream()
                        .map(TopicResourceHandler::toCreatable)
                        .collect(Collectors.groupingBy(topic -> topic.cluster()));

        topicsByCluster.forEach(this::ensure);
    }

    /**
     * Prepares the supplied topics, handling the topics of each cluster together.
     *
     * @param collection the topic descriptors to prepare.
     */
    public void prepare(Collection<? extends KafkaTopicDescriptor<?, ?>> collection) {
        collection.stream()
                .collect(Collectors.groupingBy(topic -> topic.cluster()))
                .forEach(this::prepare);
    }

    /**
     * Logs the topic ids, then asks a topic client for the cluster to ensure the topics exist.
     *
     * @param cluster the name of the cluster the topics belong to.
     * @param topics the creatable topics of that cluster.
     */
    private void ensure(String cluster, List<? extends CreatableKafkaTopic<?, ?>> topics) {
        logger.debug("Ensuring topics", log -> {
            log.with(LoggingField.topicIds, topics.stream()
                    .map(topic -> topic.id())
                    .collect(Collectors.toList()));
        });

        topicClientFactory.create(cluster, properties.get(cluster)).ensureTopicsExist(topics);
    }

    /**
     * Logs the topic ids, then creates a resource for each topic using the cluster's properties
     * and registers it with the registrar.
     *
     * @param cluster the name of the cluster the topics belong to.
     * @param topics the topic descriptors of that cluster.
     */
    private void prepare(String cluster, List<? extends KafkaTopicDescriptor<?, ?>> topics) {
        logger.debug("Preparing topics", log -> {
            log.with(LoggingField.topicIds, topics.stream()
                    .map(topic -> topic.id())
                    .collect(Collectors.toList()));
        });

        // Looked up once per cluster and shared by every topic in the group.
        final Map<String, Object> clusterProperties = properties.get(cluster);

        topics.stream()
                .map(topic -> topicResourceFactory.create(topic, clusterProperties))
                .forEach(resources::register);
    }

    /**
     * Narrows a descriptor to a creatable topic.
     *
     * @param kafkaTopicDescriptor the descriptor to narrow.
     * @return the same instance, typed as a {@link CreatableKafkaTopic}.
     * @throws IllegalArgumentException if the descriptor is not a {@link CreatableKafkaTopic}; the
     *     message includes the descriptor's id, class name and code location.
     */
    private static CreatableKafkaTopic<?, ?> toCreatable(KafkaTopicDescriptor<?, ?> kafkaTopicDescriptor) {
        if (kafkaTopicDescriptor instanceof CreatableKafkaTopic) {
            return (CreatableKafkaTopic<?, ?>) kafkaTopicDescriptor;
        }
        throw new IllegalArgumentException(
                "Topic descriptor is not creatable: id: " + kafkaTopicDescriptor.id()
                        + ", type: " + kafkaTopicDescriptor.getClass().getName()
                        + ", location: " + CodeLocation.codeLocation(kafkaTopicDescriptor));
    }
}
