package org.apache.kafka.clients.producer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.util.Base64;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.DefaultPartitioner;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.Encoder;
import kafka.serializer.StringEncoder;
import kafka.utils.VerifiableProperties;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/producer/PulsarKafkaProducer.class */
public class PulsarKafkaProducer<K, V> extends Producer<K, V> {
    private final PulsarClient client;
    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
    private final Partitioner partitioner;
    private final Encoder<K> keySerializer;
    private final Encoder<V> valueSerializer;
    private final boolean isSendAsync;
    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers;
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaProducer.class);
    public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS = "queue.buffering.max.ms";
    public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
    public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
    public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";

    public PulsarKafkaProducer(ProducerConfig producerConfig) {
        super((kafka.producer.Producer) null);
        this.producers = new ConcurrentHashMap();
        this.partitioner = producerConfig.partitionerClass() != null ? (Partitioner) newInstance(producerConfig.partitionerClass(), Partitioner.class, producerConfig.props()) : new DefaultPartitioner(producerConfig.props());
        Preconditions.checkNotNull(producerConfig.keySerializerClass(), "key-serializer class can't be null");
        Preconditions.checkNotNull(producerConfig.serializerClass(), "value-serializer class can't be null");
        this.keySerializer = (Encoder) newInstance(producerConfig.keySerializerClass(), Encoder.class, producerConfig.props());
        this.valueSerializer = (Encoder) newInstance(producerConfig.serializerClass(), Encoder.class, producerConfig.props());
        Properties properties = (producerConfig.props() == null || producerConfig.props().props() == null) ? new Properties() : producerConfig.props().props();
        String brokerList = producerConfig.brokerList();
        try {
            this.client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(brokerList).build();
            this.pulsarProducerBuilder = this.client.newProducer();
            boolean z = producerConfig.queueEnqueueTimeoutMs() == -1;
            this.isSendAsync = "async".equalsIgnoreCase(producerConfig.producerType());
            CompressionType compressionType = CompressionType.NONE;
            if ("gzip".equals(producerConfig.compressionCodec().name())) {
                compressionType = CompressionType.ZLIB;
            } else if ("snappy".equals(producerConfig.compressionCodec().name())) {
                compressionType = CompressionType.SNAPPY;
            }
            long queueBufferingMaxMs = producerConfig.queueBufferingMaxMs();
            if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES)) {
                this.pulsarProducerBuilder.maxPendingMessages(producerConfig.queueBufferingMaxMessages());
            }
            if (properties.containsKey(KAFKA_KEY_MAX_BATCH_MESSAGES)) {
                this.pulsarProducerBuilder.batchingMaxMessages(producerConfig.batchNumMessages());
            }
            if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS)) {
                this.pulsarProducerBuilder.batchingMaxPublishDelay(queueBufferingMaxMs, TimeUnit.MILLISECONDS);
            }
            if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
                this.pulsarProducerBuilder.sendTimeout(producerConfig.requestTimeoutMs(), TimeUnit.MILLISECONDS);
            }
            this.pulsarProducerBuilder.blockIfQueueFull(z).compressionType(compressionType);
        } catch (PulsarClientException e) {
            throw new IllegalArgumentException("Failed to create pulsar-client using url = " + brokerList + ", properties = " + properties, e);
        }
    }

    public PulsarKafkaProducer(kafka.producer.Producer<K, V> producer) {
        this(producer.config());
    }

    public void send(KeyedMessage<K, V> keyedMessage) {
        try {
            org.apache.pulsar.client.api.Producer<byte[]> computeIfAbsent = this.producers.computeIfAbsent(keyedMessage.topic(), str -> {
                return createNewProducer(str);
            });
            TypedMessageBuilder<byte[]> newMessage = computeIfAbsent.newMessage();
            buildMessage(newMessage, keyedMessage);
            if (this.isSendAsync) {
                newMessage.sendAsync().handle((messageId, th) -> {
                    if (th == null) {
                        return null;
                    }
                    log.warn("publish failed for {}", computeIfAbsent.getTopic(), th);
                    return null;
                });
                return;
            }
            try {
                newMessage.send();
            } catch (PulsarClientException e) {
                log.warn("publish failed for {}", computeIfAbsent.getTopic(), e);
                throw new IllegalStateException("Failed to publish message " + keyedMessage.topic(), e);
            }
        } catch (Exception e2) {
            throw new IllegalArgumentException("Failed to create producer for " + keyedMessage.topic(), e2);
        }
    }

    public void send(List<KeyedMessage<K, V>> list) {
        if (list != null) {
            list.forEach(this::send);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void buildMessage(TypedMessageBuilder<byte[]> typedMessageBuilder, KeyedMessage<K, V> keyedMessage) {
        if (keyedMessage.key() != null) {
            typedMessageBuilder.key(getKey(keyedMessage.topic(), keyedMessage.key()));
        }
        typedMessageBuilder.value(this.valueSerializer.toBytes(keyedMessage.message()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private String getKey(String str, K k) {
        if (this.keySerializer != null && (this.keySerializer instanceof StringEncoder)) {
            return (String) k;
        }
        return Base64.getEncoder().encodeToString(this.keySerializer.toBytes(k));
    }

    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String str) {
        try {
            this.pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
            this.pulsarProducerBuilder.messageRouter(new MessageRouter() { // from class: org.apache.kafka.clients.producer.PulsarKafkaProducer.1
                private static final long serialVersionUID = 1;

                public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
                    return PulsarKafkaProducer.this.partitioner.partition(message.getKey(), topicMetadata.numPartitions());
                }
            });
            log.info("Creating producer for topic {} with config {}", str, this.pulsarProducerBuilder.toString());
            return this.pulsarProducerBuilder.clone().topic(str).create();
        } catch (PulsarClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public <T> T newInstance(String str, Class<T> cls, VerifiableProperties verifiableProperties) {
        try {
            Class<?> cls2 = Class.forName(str);
            if (cls2 == null) {
                return null;
            }
            Object newInstance = newInstance(cls2, verifiableProperties);
            if (cls.isInstance(newInstance)) {
                return cls.cast(newInstance);
            }
            throw new IllegalArgumentException(cls2.getName() + " is not an instance of " + cls.getName());
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            throw new IllegalArgumentException("class not found for :" + str);
        }
    }

    public static <T> T newInstance(Class<T> cls, VerifiableProperties verifiableProperties) {
        try {
            try {
                Constructor<T> constructor = cls.getConstructor(VerifiableProperties.class);
                constructor.setAccessible(true);
                return constructor.newInstance(verifiableProperties);
            } catch (Exception e) {
                return cls.newInstance();
            }
        } catch (IllegalAccessException e2) {
            throw new IllegalArgumentException("Could not instantiate class " + cls.getName(), e2);
        } catch (InstantiationException e3) {
            throw new IllegalArgumentException("Could not instantiate class " + cls.getName() + " Does it have a public no-argument constructor?", e3);
        } catch (NullPointerException e4) {
            throw new IllegalArgumentException("Requested class was null", e4);
        }
    }

    public void close() {
        if (this.producers != null) {
            this.producers.forEach((str, producer) -> {
                try {
                    producer.close();
                } catch (PulsarClientException e) {
                    log.warn("Failed to close producer for {}: {}", str, e.getMessage());
                }
            });
        }
        if (this.client != null) {
            try {
                this.client.close();
            } catch (PulsarClientException e) {
                log.warn("Failed to close pulsar-client", e);
            }
        }
    }

    @VisibleForTesting
    public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
        return this.pulsarProducerBuilder;
    }

    @VisibleForTesting
    public Partitioner getPartitioner() {
        return this.partitioner;
    }

    @VisibleForTesting
    public Encoder<K> getKeySerializer() {
        return this.keySerializer;
    }

    @VisibleForTesting
    public Encoder<V> getValueSerializer() {
        return this.valueSerializer;
    }
}
