package org.creekservice.internal.kafka.streams.test.extension.handler;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.system.test.extension.test.model.InputHandler;
import org.creekservice.internal.kafka.extension.ClientsExtension;
import org.creekservice.internal.kafka.streams.test.extension.model.TopicInput;
import org.creekservice.internal.kafka.streams.test.extension.model.TopicRecord;
import org.creekservice.internal.kafka.streams.test.extension.yaml.TypeCoercer;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/handler/TopicInputHandler.class */
public final class TopicInputHandler implements InputHandler<TopicInput> {
    private final ClientsExtension clientsExt;
    private final TypeCoercer coercer;
    private final Set<Producer<byte[], byte[]>> toFlush;
    private final TopicValidator topicValidator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/handler/TopicInputHandler$TopicInputException.class */
    public static final class TopicInputException extends RuntimeException {
        TopicInputException(String str, Throwable th) {
            super(str, th);
        }
    }

    public TopicInputHandler(ClientsExtension clientsExtension, TopicValidator topicValidator) {
        this(clientsExtension, new TypeCoercer(), topicValidator);
    }

    TopicInputHandler(ClientsExtension clientsExtension, TypeCoercer typeCoercer, TopicValidator topicValidator) {
        this.toFlush = new HashSet();
        this.clientsExt = (ClientsExtension) Objects.requireNonNull(clientsExtension, "clientsExt");
        this.coercer = (TypeCoercer) Objects.requireNonNull(typeCoercer, "typeCoercer");
        this.topicValidator = (TopicValidator) Objects.requireNonNull(topicValidator, "topicValidator");
    }

    public void process(TopicInput topicInput, InputHandler.InputOptions inputOptions) {
        topicInput.records().forEach(this::process);
    }

    public void flush() {
        this.toFlush.forEach((v0) -> {
            v0.flush();
        });
        this.toFlush.clear();
    }

    private void process(TopicRecord topicRecord) {
        send(topicRecord, kafkaTopic(topicRecord));
    }

    private <K, V> void send(TopicRecord topicRecord, KafkaTopic<K, V> kafkaTopic) {
        this.topicValidator.validateCanProduce(kafkaTopic);
        byte[] bArr = (byte[]) topicRecord.key().map(obj -> {
            return coerceKey(obj, kafkaTopic, topicRecord);
        }).map(obj2 -> {
            return serializeKey(obj2, kafkaTopic, topicRecord);
        }).orElse(null, null);
        byte[] bArr2 = (byte[]) topicRecord.value().map(obj3 -> {
            return coerceValue(obj3, kafkaTopic, topicRecord);
        }).map(obj4 -> {
            return serializeValue(obj4, kafkaTopic, topicRecord);
        }).orElse(null, null);
        Producer<byte[], byte[]> producer = this.clientsExt.producer(topicRecord.clusterName());
        producer.send(new ProducerRecord(kafkaTopic.name(), bArr, bArr2));
        this.toFlush.add(producer);
    }

    private KafkaTopic<?, ?> kafkaTopic(TopicRecord topicRecord) {
        try {
            return this.clientsExt.topic(topicRecord.clusterName(), topicRecord.topicName());
        } catch (Exception e) {
            throw new TopicInputException("The input record's cluster or topic is not known. cluster: " + topicRecord.clusterName() + ", topic: " + topicRecord.topicName() + ", location: " + String.valueOf(topicRecord.location()), e);
        }
    }

    private <K> K coerceKey(Object obj, KafkaTopic<K, ?> kafkaTopic, TopicRecord topicRecord) {
        try {
            return (K) this.coercer.coerce(obj, kafkaTopic.descriptor().key().type());
        } catch (Exception e) {
            throw new TopicInputException("The record's key is not compatible with the topic's key type. key: " + String.valueOf(obj) + ", key_type: " + obj.getClass().getName() + ", topic_key_type: " + kafkaTopic.descriptor().key().type().getName() + ", topic: " + kafkaTopic.name() + ", location: " + String.valueOf(topicRecord.location()), e);
        }
    }

    private <V> V coerceValue(Object obj, KafkaTopic<?, V> kafkaTopic, TopicRecord topicRecord) {
        try {
            return (V) this.coercer.coerce(obj, kafkaTopic.descriptor().value().type());
        } catch (Exception e) {
            throw new TopicInputException("The record's value is not compatible with the topic's value type. value: " + String.valueOf(obj) + ", value_type: " + obj.getClass().getName() + ", topic_value_type: " + kafkaTopic.descriptor().value().type().getName() + ", topic: " + kafkaTopic.name() + ", location: " + String.valueOf(topicRecord.location()), e);
        }
    }

    private <K> byte[] serializeKey(K k, KafkaTopic<K, ?> kafkaTopic, TopicRecord topicRecord) {
        try {
            return kafkaTopic.serializeKey(k);
        } catch (Exception e) {
            throw new TopicInputException("Failed to serialize the record's key: " + String.valueOf(k) + ", location: " + String.valueOf(topicRecord.location()), e);
        }
    }

    private <V> byte[] serializeValue(V v, KafkaTopic<?, V> kafkaTopic, TopicRecord topicRecord) {
        try {
            return kafkaTopic.serializeValue(v);
        } catch (Exception e) {
            throw new TopicInputException("Failed to serialize the record's value: " + String.valueOf(v) + ", location: " + String.valueOf(topicRecord.location()), e);
        }
    }
}
