package jp.ad.sinet.stream.plugins.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.spi.PluginMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaMessageReader.class */
/**
 * Kafka-backed {@link PluginMessageReader}.
 *
 * <p>Records (or failures) handed over through {@link #append_consumer_records} and
 * {@link #append_exception} are buffered in an internal blocking queue as {@link Future}s;
 * {@link #m11read()} takes them off that queue one at a time.
 */
public class KafkaMessageReader extends KafkaBaseReader implements PluginMessageReader {

    @Generated
    private static final Logger log = Logger.getLogger(KafkaMessageReader.class.getName());

    /**
     * Hand-over buffer between the append callbacks and {@link #m11read()}.
     * Created lazily by {@link #submitConsumerLoop()}; a failed future carries an error to the reader.
     */
    private BlockingQueue<Future<ConsumerRecord<String, byte[]>>> queue;

    /**
     * Creates a reader; all configuration is delegated to the base class.
     *
     * @param readerParameters parameters passed through to {@code KafkaBaseReader}
     */
    public KafkaMessageReader(ReaderParameters readerParameters) {
        super(readerParameters);
    }

    /**
     * Makes sure the hand-over queue exists, then lets the base class submit the consumer loop.
     */
    @Override
    public void submitConsumerLoop() {
        if (Objects.isNull(this.queue)) {
            this.queue = new LinkedBlockingQueue<>();
        }
        super.submitConsumerLoop();
    }

    /**
     * Enqueues every record of the batch, in iteration order, as an already-completed future.
     *
     * @param consumerRecords batch of records to buffer
     */
    @Override
    protected void append_consumer_records(ConsumerRecords<String, byte[]> consumerRecords) {
        for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
            log.finer(() -> "KAFKA message poll: " + getClientId() + ": " + consumerRecord.toString());
            this.queue.add(CompletableFuture.completedFuture(consumerRecord));
        }
    }

    /**
     * Enqueues a failure so that it surfaces from {@link #m11read()} when the reader
     * reaches this position in the queue.
     *
     * @param th the failure to propagate to the reader
     */
    @Override
    protected void append_exception(Throwable th) {
        CompletableFuture<ConsumerRecord<String, byte[]>> failed = new CompletableFuture<>();
        failed.completeExceptionally(th);
        this.queue.add(failed);
    }

    /**
     * Takes the next buffered message, waiting at most {@code receiveTimeout}.
     *
     * <p>When {@code sendOffsetsEnabled} is set, the message's offset is committed through
     * {@link #sendOffsetsToTransaction(KafkaMessage)} before returning.
     *
     * @return the next message, or {@code null} if nothing arrived within the timeout
     * @throws RuntimeException whatever {@code wrapSinetStreamException} produces when the wait is
     *         interrupted or the queued entry carries a failure (exact type is defined by the base class)
     */
    public KafkaMessage m11read() {
        try {
            Future<ConsumerRecord<String, byte[]>> pending =
                    this.queue.poll(this.receiveTimeout.toNanos(), TimeUnit.NANOSECONDS);
            if (pending == null) {
                return null;
            }
            KafkaMessage kafkaMessage = new KafkaMessage(pending.get());
            if (this.sendOffsetsEnabled.get()) {
                sendOffsetsToTransaction(kafkaMessage);
            }
            return kafkaMessage;
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            throw wrapSinetStreamException(e);
        }
    }

    /**
     * Returns a stream of messages. For {@link Consistency#EXACTLY_ONCE} a dedicated stream is
     * used that commits each offset as the element is handed out; otherwise the base-class
     * stream is returned.
     *
     * @return a stream over incoming messages
     */
    @Override
    public Stream<PluginMessageWrapper> stream() {
        if (getConsistency().equals(Consistency.EXACTLY_ONCE)) {
            return getExactlyOnceStream();
        }
        return super.stream();
    }

    /**
     * Builds the exactly-once stream. Offset sending inside {@link #m11read()} is switched off
     * so that the commit happens in the iterator's {@code next()} instead, i.e. only when the
     * element is actually delivered to the stream consumer.
     */
    private Stream<PluginMessageWrapper> getExactlyOnceStream() {
        this.sendOffsetsEnabled.set(false);
        Iterator<PluginMessageWrapper> messages = new Iterator<PluginMessageWrapper>() {
            /** Message fetched by {@code hasNext()} but not yet handed out by {@code next()}. */
            private KafkaMessage cache = null;

            @Override
            public boolean hasNext() {
                // Only fetch when nothing is pending, so repeated hasNext() calls never drop a message.
                if (this.cache == null) {
                    this.cache = KafkaMessageReader.this.m11read();
                }
                return this.cache != null;
            }

            @Override
            public PluginMessageWrapper next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                KafkaMessage delivered = this.cache;
                // Clear the slot so the same message is never returned (and committed) twice.
                this.cache = null;
                KafkaMessageReader.this.sendOffsetsToTransaction(delivered);
                return delivered;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(messages, Spliterator.ORDERED | Spliterator.NONNULL),
                false);
    }

    /**
     * Commits the offset following {@code kafkaMessage} (its offset + 1) for the message's
     * topic-partition, inside a producer transaction begun and committed by this call.
     *
     * <p>NOTE(review): if sending or committing fails the transaction is not aborted here —
     * confirm whether the caller or the base class is expected to handle that.
     *
     * @param kafkaMessage the message whose position should be committed
     */
    public void sendOffsetsToTransaction(KafkaMessage kafkaMessage) {
        TopicPartition topicPartition =
                new TopicPartition(kafkaMessage.getTopic(), kafkaMessage.m8getRaw().partition());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(kafkaMessage.m8getRaw().offset() + 1);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(topicPartition, offsetAndMetadata);
        log.finer(() -> "KAFKA send offset: " + getClientId() + ": " + topicPartition.topic() + ": "
                + offsetAndMetadata.toString());
        this.producer.beginTransaction();
        this.producer.sendOffsetsToTransaction(offsets, this.groupId);
        this.producer.commitTransaction();
    }

    /**
     * Lets the base class set up the consumer, then starts the polling worker.
     *
     * @param properties consumer configuration passed through to the base class
     */
    @Override
    public void setupConsumer(Properties properties) {
        super.setupConsumer(properties);
        startPollingWorker();
    }
}
