package io.apicurio.tests.serdes;

import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
import io.apicurio.registry.utils.serde.AvroKafkaSerializer;
import io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer;
import io.apicurio.registry.utils.serde.ProtobufKafkaSerializer;
import io.apicurio.registry.utils.serde.strategy.RecordIdStrategy;
import io.apicurio.registry.utils.serde.strategy.TopicIdStrategy;
import io.apicurio.registry.utils.serde.strategy.TopicRecordIdStrategy;
import io.apicurio.tests.RegistryFacade;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/tests/serdes/KafkaClients.class */
/**
 * Static helpers that create Kafka producers/consumers wired to the schema registry and
 * that produce or consume a fixed number of messages for serializer/deserializer tests.
 *
 * <p>Every {@code produce*} / {@code consume*} method starts the work on a background
 * thread, waits up to {@value #COMPLETION_TIMEOUT_SECONDS} seconds for it, and then returns
 * the future. The returned future is therefore normally already completed, either with
 * the number of messages handled or exceptionally.
 */
public class KafkaClients {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClients.class);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Maximum time, in seconds, the calling thread waits for a produce/consume task. */
    private static final long COMPLETION_TIMEOUT_SECONDS = 30L;

    /**
     * Creates a Kafka producer connected to {@code localhost:9092} and configured for
     * either the Confluent-compatible or the Apicurio registry endpoint.
     *
     * <p>The Confluent configuration is chosen when the value serializer class name
     * contains the substring {@code "confluent"}; otherwise Apicurio settings are used.
     *
     * @param str  fully-qualified class name of the key serializer
     * @param str2 fully-qualified class name of the value serializer
     * @param str3 suffix appended to {@code "Producer-"} to form the {@code client.id}
     * @param str4 class name stored under {@code value.subject.name.strategy} (Confluent)
     *             or {@code apicurio.registry.artifact-id} (Apicurio)
     * @return a new producer; the caller is responsible for closing it
     */
    public static Producer<Object, Object> createProducer(String str, String str2, String str3, String str4) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("client.id", "Producer-" + str3);
        properties.put("acks", "all");
        properties.put("key.serializer", str);
        properties.put("value.serializer", str2);
        if (str2.contains("confluent")) {
            properties.put("schema.registry.url", registryBaseUrl() + "/ccompat");
            properties.put("value.subject.name.strategy", str4);
        } else {
            properties.put("apicurio.registry.url", registryBaseUrl());
            properties.put("apicurio.registry.artifact-id", str4);
        }
        return new KafkaProducer<>(properties);
    }

    /**
     * Creates a Kafka consumer connected to {@code localhost:9092} that auto-commits
     * offsets every second and starts from the earliest offset.
     *
     * <p>The Confluent-compatible registry endpoint is used when the value deserializer
     * class name contains the substring {@code "confluent"}; otherwise the Apicurio one.
     *
     * <p>NOTE(review): the declared key type is {@code Long}, but the helpers in this class
     * pass a {@code StringDeserializer} for keys. Nothing here reads the key, so the
     * mismatch is harmless in this class; confirm before reading keys from the result.
     *
     * @param str  fully-qualified class name of the key deserializer
     * @param str2 fully-qualified class name of the value deserializer
     * @param str3 suffix appended to {@code "Consumer-"} to form the {@code group.id}
     * @return a new consumer; the caller is responsible for closing it
     */
    public static Consumer<Long, GenericRecord> createConsumer(String str, String str2, String str3) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("group.id", "Consumer-" + str3);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", str);
        properties.put("value.deserializer", str2);
        if (str2.contains("confluent")) {
            properties.put("schema.registry.url", registryBaseUrl() + "/ccompat");
        } else {
            properties.put("apicurio.registry.url", registryBaseUrl());
        }
        return new KafkaConsumer<>(properties);
    }

    /**
     * Produces Avro records with the Confluent serializer and {@code TopicNameStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroConfluentMessagesTopicStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), KafkaAvroSerializer.class.getName(), TopicNameStrategy.class.getName(), strArr);
    }

    /**
     * Produces Avro records with the Confluent serializer and {@code RecordNameStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroConfluentMessagesRecordStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), KafkaAvroSerializer.class.getName(), RecordNameStrategy.class.getName(), strArr);
    }

    /**
     * Produces Avro records with the Confluent serializer and {@code TopicRecordNameStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroConfluentMessagesTopicRecordStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), KafkaAvroSerializer.class.getName(), TopicRecordNameStrategy.class.getName(), strArr);
    }

    /**
     * Produces Avro records with the Apicurio serializer and {@code TopicIdStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroApicurioMessagesTopicStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), AvroKafkaSerializer.class.getName(), TopicIdStrategy.class.getName(), strArr);
    }

    /**
     * Produces Avro records with the Apicurio serializer and {@code RecordIdStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroApicurioMessagesRecordStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), AvroKafkaSerializer.class.getName(), RecordIdStrategy.class.getName(), strArr);
    }

    /**
     * Produces Avro records with the Apicurio serializer and {@code TopicRecordIdStrategy}.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceAvroApicurioMessagesTopicRecordStrategy(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), AvroKafkaSerializer.class.getName(), TopicRecordIdStrategy.class.getName(), strArr);
    }

    /**
     * Produces records through the Apicurio Protobuf serializer with {@code TopicIdStrategy}.
     *
     * <p>NOTE(review): the payloads built by the shared producer path are Avro
     * {@code GenericData.Record} instances created from {@code schema}; confirm that the
     * Protobuf serializer is expected to accept them.
     *
     * @param str    topic to send to
     * @param str2   key used for every record
     * @param schema Avro schema of the generated records
     * @param i      number of records to send
     * @param strArr schema field names that are populated in every record
     * @return future holding the number of records sent
     */
    public static CompletableFuture<Integer> produceProtobufMessages(String str, String str2, Schema schema, int i, String... strArr) {
        return produceMessages(str, str2, schema, i, StringSerializer.class.getName(), ProtobufKafkaSerializer.class.getName(), TopicIdStrategy.class.getName(), strArr);
    }

    /**
     * Sends {@code messageCount} generated records to {@code topicName} on a background
     * thread and waits (bounded) for the task to finish.
     *
     * <p>Record number {@code n} (zero-based) has every field listed in {@code schemaKeys}
     * set to the string {@code "value-" + n}.
     *
     * @param topicName          topic to send to; also used as the producer client-id suffix
     * @param recordKey          key used for every record
     * @param schema             Avro schema of the generated records
     * @param messageCount       number of records to send
     * @param keySerializer      key serializer class name
     * @param valueSerializer    value serializer class name
     * @param artifactIdStrategy strategy class name passed to {@link #createProducer}
     * @param schemaKeys         schema field names that are populated in every record
     * @return future holding the number of records sent, or completed exceptionally
     */
    private static CompletableFuture<Integer> produceMessages(String topicName, String recordKey, Schema schema, int messageCount,
            String keySerializer, String valueSerializer, String artifactIdStrategy, String... schemaKeys) {
        // Raised by the waiting thread when it gives up, so the background loop stops too.
        AtomicBoolean stopRequested = new AtomicBoolean();
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
            int produced = 0;
            // try-with-resources closes the producer on both the success and the failure path.
            try (Producer<Object, Object> producer = createProducer(keySerializer, valueSerializer, topicName, artifactIdStrategy)) {
                while (produced < messageCount && !stopRequested.get()) {
                    GenericData.Record record = new GenericData.Record(schema);
                    String fieldValue = "value-" + produced;
                    for (String fieldName : schemaKeys) {
                        record.put(fieldName, fieldValue);
                    }
                    LOGGER.info("Sending message {} to topic {}", record, topicName);
                    producer.send(new ProducerRecord<Object, Object>(topicName, recordKey, record));
                    produced++;
                }
                LOGGER.info("Produced {} messages", produced);
                producer.flush();
            }
            return produced;
        });
        return awaitCompletion(result, stopRequested);
    }

    /**
     * Consumes at least {@code i} records from topic {@code str} using the Confluent Avro deserializer.
     *
     * @param str topic to subscribe to; also used as the consumer group-id suffix
     * @param i   number of records to wait for
     * @return future holding the number of records consumed
     */
    public static CompletableFuture<Integer> consumeAvroConfluentMessages(String str, int i) {
        return consumeMessages(str, i, StringDeserializer.class.getName(), KafkaAvroDeserializer.class.getName());
    }

    /**
     * Consumes at least {@code i} records from topic {@code str} using the Apicurio Avro deserializer.
     *
     * @param str topic to subscribe to; also used as the consumer group-id suffix
     * @param i   number of records to wait for
     * @return future holding the number of records consumed
     */
    public static CompletableFuture<Integer> consumeAvroApicurioMessages(String str, int i) {
        return consumeMessages(str, i, StringDeserializer.class.getName(), AvroKafkaDeserializer.class.getName());
    }

    /**
     * Consumes at least {@code i} records from topic {@code str} using the Apicurio Protobuf deserializer.
     *
     * @param str topic to subscribe to; also used as the consumer group-id suffix
     * @param i   number of records to wait for
     * @return future holding the number of records consumed
     */
    public static CompletableFuture<Integer> consumeProtobufMessages(String str, int i) {
        return consumeMessages(str, i, StringDeserializer.class.getName(), ProtobufKafkaDeserializer.class.getName());
    }

    /**
     * Polls {@code topicName} on a background thread until at least {@code messageCount}
     * records have been received, and waits (bounded) for that task to finish.
     *
     * <p>The returned count can exceed {@code messageCount} because a whole poll batch is
     * counted at once.
     *
     * @param topicName         topic to subscribe to; also used as the group-id suffix
     * @param messageCount      number of records to wait for
     * @param keyDeserializer   key deserializer class name
     * @param valueDeserializer value deserializer class name
     * @return future holding the number of records consumed, or completed exceptionally
     */
    private static CompletableFuture<Integer> consumeMessages(String topicName, int messageCount, String keyDeserializer, String valueDeserializer) {
        // Raised by the waiting thread when it gives up; without it the poll loop below
        // would keep running forever if the expected records never arrive.
        AtomicBoolean stopRequested = new AtomicBoolean();
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
            int consumed = 0;
            // subscribe() is inside the try so the consumer is closed even if it fails.
            try (Consumer<Long, GenericRecord> consumer = createConsumer(keyDeserializer, valueDeserializer, topicName)) {
                consumer.subscribe(Collections.singletonList(topicName));
                while (consumed < messageCount && !stopRequested.get()) {
                    ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofSeconds(1L));
                    if (records.isEmpty()) {
                        LOGGER.info("None found");
                        continue;
                    }
                    consumed += records.count();
                    records.forEach(received ->
                            LOGGER.info("{} {} {} {}", received.topic(), received.partition(), received.offset(), received.value()));
                }
                LOGGER.info("Consumed {} messages", consumed);
            }
            return consumed;
        });
        return awaitCompletion(result, stopRequested);
    }

    /** Builds the registry base URL from the host and port exposed by {@code RegistryFacade}. */
    private static String registryBaseUrl() {
        return "http://" + RegistryFacade.REGISTRY_URL + ":" + RegistryFacade.REGISTRY_PORT;
    }

    /**
     * Blocks for up to {@link #COMPLETION_TIMEOUT_SECONDS} seconds until {@code task} finishes.
     *
     * <p>On timeout or interruption the future is completed exceptionally with the
     * triggering exception and {@code stopRequested} is raised so that the background loop
     * terminates and releases its Kafka client.
     *
     * @param task          the background task to wait for
     * @param stopRequested flag polled by the background loop
     * @return {@code task}, for call chaining
     */
    private static <T> CompletableFuture<T> awaitCompletion(CompletableFuture<T> task, AtomicBoolean stopRequested) {
        try {
            task.get(COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            stopRequested.set(true);
            task.completeExceptionally(e);
        } catch (TimeoutException e) {
            stopRequested.set(true);
            task.completeExceptionally(e);
        } catch (ExecutionException e) {
            // The task itself failed, so the future is already completed exceptionally.
            LOGGER.debug("Background Kafka task failed", e);
        }
        return task;
    }
}
