package io.cloudevents.kafka;

import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.format.Wire;
import io.cloudevents.format.builder.EventStep;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cloudevents/kafka/CloudEventsKafkaProducer.class */
public class CloudEventsKafkaProducer<K, A extends Attributes, T> implements Producer<K, CloudEvent<A, T>> {
    private static final Logger log = LoggerFactory.getLogger(CloudEventsKafkaProducer.class);
    private final Producer<K, byte[]> producer;
    private final EventStep<A, T, byte[], byte[]> builder;

    public CloudEventsKafkaProducer(Properties properties, EventStep<A, T, byte[], byte[]> eventStep) {
        Objects.requireNonNull(properties);
        Objects.requireNonNull(eventStep);
        Optional.ofNullable(properties.get("value.serializer")).map(obj -> {
            return obj.toString();
        }).filter(str -> {
            return !str.contains(ByteArraySerializer.class.getName());
        }).ifPresent(str2 -> {
            log.warn("Fixing the wrong deserializer {}", str2);
            properties.put("value.serializer", ByteArraySerializer.class);
        });
        this.builder = eventStep;
        this.producer = new KafkaProducer(properties);
    }

    public CloudEventsKafkaProducer(EventStep<A, T, byte[], byte[]> eventStep, Producer<K, byte[]> producer) {
        Objects.requireNonNull(eventStep);
        Objects.requireNonNull(producer);
        this.builder = eventStep;
        this.producer = producer;
    }

    private Wire<byte[], String, byte[]> marshal(Supplier<CloudEvent<A, T>> supplier) {
        return (Wire) Optional.ofNullable(this.builder).map(eventStep -> {
            return eventStep.withEvent(supplier);
        }).map(marshalStep -> {
            return marshalStep.marshal();
        }).get();
    }

    private Set<Header> marshal(Map<String, byte[]> map) {
        return (Set) map.entrySet().stream().map(entry -> {
            return new AbstractMap.SimpleEntry(entry.getKey(), entry.getValue());
        }).map(simpleEntry -> {
            return new RecordHeader((String) simpleEntry.getKey(), (byte[]) simpleEntry.getValue());
        }).collect(Collectors.toSet());
    }

    private ProducerRecord<K, byte[]> marshal(ProducerRecord<K, CloudEvent<A, T>> producerRecord) {
        Wire<byte[], String, byte[]> marshal = marshal(() -> {
            return (CloudEvent) producerRecord.value();
        });
        Set<Header> marshal2 = marshal(marshal.getHeaders());
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), (byte[]) marshal.getPayload().orElse(null), marshal2);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, CloudEvent<A, T>> producerRecord) {
        return this.producer.send(marshal(producerRecord));
    }

    public Future<RecordMetadata> send(ProducerRecord<K, CloudEvent<A, T>> producerRecord, Callback callback) {
        return this.producer.send(marshal(producerRecord), callback);
    }

    public void abortTransaction() throws ProducerFencedException {
        this.producer.abortTransaction();
    }

    public void beginTransaction() throws ProducerFencedException {
        this.producer.beginTransaction();
    }

    public void close() {
        this.producer.close();
    }

    public void close(long j, TimeUnit timeUnit) {
        this.producer.close(j, timeUnit);
    }

    public void commitTransaction() throws ProducerFencedException {
        this.producer.commitTransaction();
    }

    public void flush() {
        this.producer.flush();
    }

    public void initTransactions() {
        this.producer.initTransactions();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    public List<PartitionInfo> partitionsFor(String str) {
        return this.producer.partitionsFor(str);
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
        this.producer.sendOffsetsToTransaction(map, str);
    }
}
