package org.creekservice.internal.kafka.extension;

import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.creekservice.api.kafka.extension.KafkaClientsExtension;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.metadata.topic.KafkaTopicDescriptor;
import org.creekservice.internal.kafka.extension.resource.TopicRegistry;

/* loaded from: input_file:org/creekservice/internal/kafka/extension/ClientsExtension.class */
public final class ClientsExtension implements KafkaClientsExtension {
    static final String NAME = "org.creekservice.kafka.clients";
    private final ClustersProperties clustersProperties;
    private final TopicRegistry resources;
    private final Map<String, Producer<byte[], byte[]>> producers = new ConcurrentHashMap();
    private final Map<String, Consumer<byte[], byte[]>> consumers = new ConcurrentHashMap();

    public ClientsExtension(ClustersProperties clustersProperties, TopicRegistry topicRegistry) {
        this.clustersProperties = (ClustersProperties) Objects.requireNonNull(clustersProperties, "clustersProperties");
        this.resources = (TopicRegistry) Objects.requireNonNull(topicRegistry, "resources");
    }

    public String name() {
        return NAME;
    }

    @Override // org.creekservice.api.kafka.extension.KafkaClientsExtension
    public Properties properties(String str) {
        return this.clustersProperties.properties(str);
    }

    @Override // org.creekservice.api.kafka.extension.KafkaClientsExtension
    public <K, V> KafkaTopic<K, V> topic(KafkaTopicDescriptor<K, V> kafkaTopicDescriptor) {
        return this.resources.topic(kafkaTopicDescriptor);
    }

    public KafkaTopic<?, ?> topic(String str, String str2) {
        return this.resources.topic(str, str2);
    }

    @Override // org.creekservice.api.kafka.extension.KafkaClientsExtension
    public Producer<byte[], byte[]> producer(String str) {
        return this.producers.computeIfAbsent(str, this::createProducer);
    }

    @Override // org.creekservice.api.kafka.extension.KafkaClientsExtension
    public Consumer<byte[], byte[]> consumer(String str) {
        return this.consumers.computeIfAbsent(str, this::createConsumer);
    }

    @Override // org.creekservice.api.kafka.extension.KafkaClientsExtension
    public void close(Duration duration) {
        Iterator<Producer<byte[], byte[]>> it = this.producers.values().iterator();
        while (it.hasNext()) {
            it.next().close(duration);
        }
        this.producers.clear();
        Iterator<Consumer<byte[], byte[]>> it2 = this.consumers.values().iterator();
        while (it2.hasNext()) {
            it2.next().close(duration);
        }
        this.consumers.clear();
    }

    private Producer<byte[], byte[]> createProducer(String str) {
        return new KafkaProducer(this.clustersProperties.get(str), new ByteArraySerializer(), new ByteArraySerializer());
    }

    private KafkaConsumer<byte[], byte[]> createConsumer(String str) {
        return new KafkaConsumer<>(this.clustersProperties.get(str), new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }
}
