package org.darkphoenixs.kafka.core;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

/* loaded from: input_file:org/darkphoenixs/kafka/core/KafkaMessageNewSender.class */
/**
 * Singleton {@link KafkaMessageSender} that publishes records through one shared
 * {@link KafkaProducer}.
 *
 * <p>At most one instance is registered at a time. It is created lazily by
 * {@link #getOrCreateInstance(Properties)} and unregistered by {@link #shutDown()}, after
 * which the next call to {@code getOrCreateInstance} builds a fresh producer.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaMessageNewSender<K, V> implements KafkaMessageSender<K, V> {

    /** The currently registered singleton, or {@code null} when none has been created or it was shut down. */
    private static final AtomicReference<KafkaMessageNewSender<?, ?>> instance = new AtomicReference<>();

    /** Producer that performs every send; owned and closed by this sender. */
    private final KafkaProducer<K, V> kafkaProducer;

    /**
     * Completion callback attached to every send. The default implementation only logs
     * failures; successful sends are ignored.
     */
    protected Callback sendCallback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                KafkaMessageSender.logger.error("Send message failed.", exc);
            }
        }
    };

    /**
     * Creates the sender and its underlying producer.
     *
     * @param properties producer configuration handed to {@link KafkaProducer}
     */
    private KafkaMessageNewSender(Properties properties) {
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    /**
     * Returns the registered sender, creating it from {@code properties} if none exists.
     *
     * <p>{@code properties} is only used when a new instance has to be created; if a sender is
     * already registered it is returned as-is and the argument is ignored.
     *
     * <p>The key/value types are not checked: the single shared instance is cast to whatever
     * {@code K}/{@code V} the caller asks for, so all callers must agree on the serializers
     * configured in the properties that created it.
     *
     * @param properties producer configuration, used only on creation
     * @param <K> record key type expected by the caller
     * @param <V> record value type expected by the caller
     * @return the singleton sender, never {@code null}
     */
    @SuppressWarnings("unchecked") // one shared instance is viewed under caller-chosen type arguments
    public static synchronized <K, V> KafkaMessageNewSender<K, V> getOrCreateInstance(Properties properties) {
        // Read the reference once: shutDown() clears it without taking this lock, so a second
        // read could observe null and hand that null back to the caller.
        KafkaMessageNewSender<?, ?> current = instance.get();
        if (current == null) {
            current = new KafkaMessageNewSender<K, V>(properties);
            instance.set(current);
        }
        return (KafkaMessageNewSender<K, V>) current;
    }

    /**
     * Looks up partition metadata for a topic via the underlying producer.
     *
     * @param str topic name
     * @return the value returned by {@link KafkaProducer#partitionsFor(String)}
     */
    public List<PartitionInfo> getPartitions(String str) {
        return this.kafkaProducer.partitionsFor(str);
    }

    /**
     * Sends a value without a key; failures are reported through {@link #sendCallback}.
     *
     * @param str topic name
     * @param v record value
     */
    @Override
    public void send(String str, V v) {
        this.kafkaProducer.send(new ProducerRecord<K, V>(str, v), this.sendCallback);
    }

    /**
     * Sends a keyed value; failures are reported through {@link #sendCallback}.
     *
     * @param str topic name
     * @param k record key
     * @param v record value
     */
    @Override
    public void sendWithKey(String str, K k, V v) {
        this.kafkaProducer.send(new ProducerRecord<K, V>(str, k, v), this.sendCallback);
    }

    /**
     * Unregisters this sender, then flushes and closes its producer.
     *
     * <p>The producer is closed even if flushing throws. Only this instance is unregistered, so
     * calling {@code shutDown()} again on an already shut-down sender does not discard a newer
     * singleton created in the meantime.
     */
    @Override
    public void shutDown() {
        // Unregister first so getOrCreateInstance() stops handing out a sender that is closing,
        // and only if the registered instance is still this one.
        instance.compareAndSet(this, null);
        try {
            this.kafkaProducer.flush();
        } finally {
            this.kafkaProducer.close();
        }
    }
}
