package jp.ad.sinet.stream.plugins.kafka;

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.spi.PluginAsyncMessageWriter;
import jp.ad.sinet.stream.spi.WriterParameters;
import jp.ad.sinet.stream.utils.MessageUtils;
import jp.ad.sinet.stream.utils.Timestamped;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jdeferred2.DeferredCallable;
import org.jdeferred2.DeferredManager;
import org.jdeferred2.Promise;
import org.jdeferred2.impl.DefaultDeferredManager;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaAsyncMessageWriter.class */
/**
 * Asynchronous Kafka writer: every {@link #write} hands the record to the underlying
 * producer and returns a {@link Promise} that is resolved on a worker thread once the
 * producer's send future completes.
 *
 * <p>When the configured consistency is {@link Consistency#EXACTLY_ONCE}, each record is
 * wrapped in its own begin/commit (or abort) transaction sequence; otherwise records are
 * sent without transaction handling.
 *
 * <p>NOTE(review): in the transactional path the transaction is begun on the caller's
 * thread and committed/aborted on a pool thread, so overlapping {@code write} calls may
 * interleave transactions — confirm callers serialize writes in EXACTLY_ONCE mode.
 */
public class KafkaAsyncMessageWriter extends KafkaBaseWriter implements PluginAsyncMessageWriter {

    @Generated
    private static final Logger log = Logger.getLogger(KafkaAsyncMessageWriter.class.getName());

    /** Runs the blocking "wait for the send result" tasks on its own fixed-size thread pool. */
    private final DefaultDeferredManager manager;

    /** Send strategy chosen once at construction time (plain or transactional). */
    private final Function<ProducerRecord<String, byte[]>, Promise<?, Throwable, Void>> sender;

    /**
     * Creates the writer and selects the send strategy from the configured consistency.
     *
     * @param writerParameters parameters forwarded unchanged to {@link KafkaBaseWriter}
     */
    public KafkaAsyncMessageWriter(WriterParameters writerParameters) {
        super(writerParameters);
        this.manager = createDeferredManager();
        if (getConsistency().equals(Consistency.EXACTLY_ONCE)) {
            this.sender = this::transactionalSendRecord;
        } else {
            this.sender = this::sendRecord;
        }
    }

    /**
     * Builds the deferred manager backed by a fixed thread pool whose size is read from the
     * {@code thread_pool_num} configuration entry (4 when the entry is absent).
     */
    private DefaultDeferredManager createDeferredManager() {
        int poolSize = MessageUtils.toInteger(getConfig().getOrDefault("thread_pool_num", 4)).intValue();
        return new DefaultDeferredManager(Executors.newFixedThreadPool(poolSize));
    }

    /**
     * Closes the base writer and then shuts down the deferred manager.
     *
     * <p>The manager is shut down in a {@code finally} block so that its worker threads are
     * released even when closing the base writer throws.
     */
    @Override // jp.ad.sinet.stream.plugins.kafka.KafkaBaseWriter
    public void close() {
        try {
            super.close();
        } finally {
            this.manager.shutdown();
        }
    }

    /**
     * Sends the payload of {@code timestamped} to this writer's topic.
     *
     * @param timestamped message whose value is used as the record payload
     * @return a promise resolved with the send result, or rejected with the wrapped failure
     */
    public Promise<?, Throwable, Void> write(Timestamped<byte[]> timestamped) {
        return this.sender.apply(new ProducerRecord<>(this.topic, (byte[]) timestamped.getValue()));
    }

    /**
     * Sends a record without transaction handling and resolves the promise once the
     * producer's future completes.
     */
    private Promise<?, Throwable, Void> sendRecord(ProducerRecord<String, byte[]> producerRecord) {
        final Future<RecordMetadata> pending = this.producer.send(producerRecord);
        return this.manager.when(new DeferredCallable<RecordMetadata, Void>(DeferredManager.StartPolicy.AUTO) {
            @Override
            public RecordMetadata call() {
                try {
                    return pending.get();
                } catch (Throwable th) {
                    restoreInterruptIfNeeded(th);
                    throw KafkaAsyncMessageWriter.this.wrapSinetStreamException(th);
                }
            }
        });
    }

    /**
     * Sends a record inside its own transaction: the transaction is committed when the send
     * completes and aborted when the send (or the commit) fails.
     */
    private Promise<?, Throwable, Void> transactionalSendRecord(ProducerRecord<String, byte[]> producerRecord) {
        beginTransaction();
        final Future<RecordMetadata> pending;
        try {
            pending = this.producer.send(producerRecord);
        } catch (RuntimeException | Error sendFailure) {
            // The send failed before a task could be scheduled; do not leave the
            // transaction that was just begun open.
            abortTransactionSuppressing(sendFailure);
            throw sendFailure;
        }
        return this.manager.when(new DeferredCallable<RecordMetadata, Void>(DeferredManager.StartPolicy.AUTO) {
            @Override
            public RecordMetadata call() {
                try {
                    RecordMetadata recordMetadata = pending.get();
                    KafkaAsyncMessageWriter.this.commitTransaction();
                    return recordMetadata;
                } catch (Throwable th) {
                    restoreInterruptIfNeeded(th);
                    abortTransactionSuppressing(th);
                    throw KafkaAsyncMessageWriter.this.wrapSinetStreamException(th);
                }
            }
        });
    }

    /**
     * Aborts the current transaction; a failure of the abort itself is attached to
     * {@code primary} as a suppressed exception so it cannot mask the original error.
     */
    private void abortTransactionSuppressing(Throwable primary) {
        try {
            abortTransaction();
        } catch (RuntimeException abortFailure) {
            primary.addSuppressed(abortFailure);
        }
    }

    /** Re-asserts the thread's interrupt flag when the caught failure was an interruption. */
    private static void restoreInterruptIfNeeded(Throwable th) {
        if (th instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
