package jp.ad.sinet.stream.plugins.kafka;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.SinetStreamException;
import jp.ad.sinet.stream.spi.PluginAsyncMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaAsyncMessageReader.class */
public class KafkaAsyncMessageReader extends KafkaBaseReader implements PluginAsyncMessageReader {

    @Generated
    private static final Logger log = Logger.getLogger(KafkaAsyncMessageReader.class.getName());
    private ExecutorService callbackExecutor;
    private List<Consumer<PluginMessageWrapper>> onMessageCallbacks;
    private List<Consumer<Throwable>> onFailureCallbacks;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaAsyncMessageReader(ReaderParameters readerParameters) {
        super(readerParameters);
        this.onMessageCallbacks = new CopyOnWriteArrayList();
        this.onFailureCallbacks = new CopyOnWriteArrayList();
    }

    @Override // jp.ad.sinet.stream.plugins.kafka.KafkaBaseReader
    protected void submitConsumerLoop() {
        if (Objects.isNull(this.callbackExecutor) || this.callbackExecutor.isShutdown()) {
            this.callbackExecutor = Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                return thread;
            });
        }
        super.submitConsumerLoop();
    }

    @Override // jp.ad.sinet.stream.plugins.kafka.KafkaBaseReader
    protected void append_consumer_records(ConsumerRecords<String, byte[]> consumerRecords) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            log.finer(() -> {
                return "KAFKA message poll: " + getClientId() + ": " + consumerRecord.toString();
            });
            KafkaMessage kafkaMessage = new KafkaMessage(consumerRecord);
            for (Consumer<PluginMessageWrapper> consumer : this.onMessageCallbacks) {
                this.callbackExecutor.submit(() -> {
                    consumer.accept(kafkaMessage);
                });
            }
        }
    }

    @Override // jp.ad.sinet.stream.plugins.kafka.KafkaBaseReader
    protected void append_exception(Throwable th) {
        log.log(Level.FINE, "Kafka reader error!", th);
        SinetStreamException wrapSinetStreamException = wrapSinetStreamException(th);
        for (Consumer<Throwable> consumer : this.onFailureCallbacks) {
            this.callbackExecutor.submit(() -> {
                consumer.accept(wrapSinetStreamException);
            });
        }
    }

    @Override // jp.ad.sinet.stream.plugins.kafka.KafkaBaseReader
    protected void stopPollingWorker() {
        super.stopPollingWorker();
        if (!Objects.nonNull(this.callbackExecutor) || this.callbackExecutor.isShutdown()) {
            return;
        }
        this.callbackExecutor.shutdownNow();
    }

    public void addOnMessageCallback(Consumer<PluginMessageWrapper> consumer, Consumer<Throwable> consumer2) {
        if (Objects.nonNull(consumer)) {
            this.onMessageCallbacks.add(consumer);
        }
        if (Objects.nonNull(consumer2)) {
            this.onFailureCallbacks.add(consumer2);
        }
        if (this.onMessageCallbacks.isEmpty() && this.onFailureCallbacks.isEmpty()) {
            return;
        }
        startPollingWorker();
    }

    public void removeOnMessageCallback(Consumer<PluginMessageWrapper> consumer, Consumer<Throwable> consumer2) {
        if (Objects.nonNull(consumer)) {
            this.onMessageCallbacks.remove(consumer);
        }
        if (Objects.nonNull(consumer2)) {
            this.onFailureCallbacks.remove(consumer2);
        }
        if (this.onMessageCallbacks.isEmpty() && this.onFailureCallbacks.isEmpty()) {
            stopPollingWorker();
        }
    }

    public void clearOnMessageCallback() {
        this.onMessageCallbacks.clear();
        this.onFailureCallbacks.clear();
        stopPollingWorker();
    }
}
