package jp.ad.sinet.stream.plugins.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.spi.ReaderParameters;
import jp.ad.sinet.stream.utils.MessageUtils;
import lombok.Generated;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaBaseReader.class */
public abstract class KafkaBaseReader extends KafkaBaseIO {
    private KafkaConsumer<String, byte[]> consumer;
    final Properties consumerProperties;
    private final AtomicBoolean closed;
    private final List<String> topics;
    final Duration receiveTimeout;
    String groupId;
    AtomicBoolean sendOffsetsEnabled;
    private final Lock lock;
    private final Condition startPolling;
    private static final Map<String, Function<Object, Object>> CONSUMER_PARAMETER_NAMES_MAP;
    KafkaProducer<byte[], byte[]> producer;
    private ExecutorService worker;

    @Generated
    private static final Logger log = Logger.getLogger(KafkaBaseReader.class.getName());
    private static final AtomicInteger KAFKA_GROUP_ID_SEQUENCE = new AtomicInteger(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaBaseReader(ReaderParameters readerParameters) {
        super(readerParameters.getService(), readerParameters.getConsistency(), readerParameters.getClientId(), readerParameters.getConfig(), readerParameters.getValueType(), readerParameters.isDataEncryption());
        this.closed = new AtomicBoolean(false);
        this.sendOffsetsEnabled = new AtomicBoolean(false);
        this.lock = new ReentrantLock();
        this.startPolling = this.lock.newCondition();
        this.topics = Collections.unmodifiableList(readerParameters.getTopics());
        this.receiveTimeout = readerParameters.getReceiveTimeout();
        this.consumerProperties = getKafkaProperties();
        setupConsumerProperties(readerParameters);
        setupConsumer(this.consumerProperties);
    }

    private void setupConsumerProperties(ReaderParameters readerParameters) {
        CONSUMER_PARAMETER_NAMES_MAP.forEach((str, function) -> {
            updateProperty(this.consumerProperties, str, function);
        });
        setupGroupId(this.consumerProperties);
        setupSSLProperties(this.config, this.consumerProperties);
        setupConsistencyProperties();
        setupSASLProperties(this.config, this.consumerProperties);
    }

    private void setupConsistencyProperties() {
        if (getConsistency().equals(Consistency.EXACTLY_ONCE)) {
            this.consumerProperties.put("enable.auto.commit", "false");
            this.consumerProperties.put("isolation.level", "read_committed");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startPollingWorker() {
        this.lock.lock();
        try {
            if (Objects.isNull(this.worker) || this.worker.isShutdown()) {
                submitConsumerLoop();
                this.startPolling.awaitUninterruptibly();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void submitConsumerLoop() {
        log.fine(() -> {
            return "KAFKA: start polling thread: " + getClientId();
        });
        this.worker = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        this.worker.submit(() -> {
            if (!this.closed.get()) {
                this.lock.lock();
                try {
                    try {
                        ConsumerRecords<String, byte[]> poll = this.consumer.poll(Duration.ofMillis(100L));
                        this.startPolling.signalAll();
                        append_consumer_records(poll);
                        this.lock.unlock();
                    } finally {
                    }
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            }
            while (!this.closed.get()) {
                log.finest(() -> {
                    return "KAFKA message poll: " + getClientId();
                });
                try {
                    append_consumer_records(this.consumer.poll(Duration.ofMillis(100L)));
                } catch (WakeupException e) {
                    log.log(Level.FINE, "KAFKA consumer wakeup", e);
                } catch (Throwable th2) {
                    log.log(Level.FINE, th2, () -> {
                        return "KAFKA polling loop: " + getClientId();
                    });
                    append_exception(th2);
                    throw th2;
                }
            }
            log.fine(() -> {
                return "KAFKA consumer close: " + getClientId();
            });
            this.consumer.close();
        });
    }

    protected abstract void append_consumer_records(ConsumerRecords<String, byte[]> consumerRecords);

    protected abstract void append_exception(Throwable th);

    private void setupGroupId(Properties properties) {
        Optional.ofNullable(properties.getProperty("group.id")).map((v0) -> {
            return MessageUtils.toString(v0);
        }).ifPresent(str -> {
            this.groupId = str;
        });
        if (Objects.isNull(this.groupId) || this.groupId.isEmpty()) {
            this.groupId = generateGroupId();
        }
        this.consumerProperties.put("group.id", this.groupId);
    }

    private String generateGroupId() {
        return this.service + "-group-" + KAFKA_GROUP_ID_SEQUENCE.getAndIncrement() + '-' + RandomStringUtils.randomAlphabetic(8);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setupConsumer(Properties properties) {
        log.fine(() -> {
            return "KAFKA consumer init: " + getClientId();
        });
        this.consumer = new KafkaConsumer<>(properties, Serdes.String().deserializer(), Serdes.ByteArray().deserializer());
        log.fine(() -> {
            return "KAFKA consumer subscribe: " + getClientId() + ": " + getTopic();
        });
        partitionsFor();
        this.consumer.subscribe(this.topics);
        if (getConsistency().equals(Consistency.EXACTLY_ONCE)) {
            setupOffsetsProducer();
        }
    }

    private void partitionsFor() {
        try {
            Iterator<String> it = this.topics.iterator();
            while (it.hasNext()) {
                this.consumer.partitionsFor(it.next(), Duration.ofSeconds(10L));
            }
        } catch (TimeoutException e) {
            log.log(Level.FINER, "timeout error", e);
            throw new SinetStreamIOException(e);
        } catch (AuthenticationException e2) {
            log.log(Level.FINER, "auth error", e2);
            throw new jp.ad.sinet.stream.api.AuthenticationException(e2);
        }
    }

    public String getTopic() {
        return String.join(",", this.topics);
    }

    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        log.fine(() -> {
            return "KAFKA consumer wakeup: " + getClientId();
        });
        this.consumer.wakeup();
        stopPollingWorker();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopPollingWorker() {
        this.lock.lock();
        try {
            if (Objects.isNull(this.worker) || this.worker.isShutdown()) {
                return;
            }
            this.worker.shutdown();
            try {
                this.worker.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.log(Level.FINER, "stop polling", (Throwable) e);
            }
        } finally {
            this.lock.unlock();
        }
    }

    private void setupOffsetsProducer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.consumerProperties.getProperty("bootstrap.servers"));
        setupTransactionId(properties);
        setupSSLProperties(this.config, properties);
        setupSASLProperties(this.config, properties);
        this.sendOffsetsEnabled.set(true);
        this.producer = new KafkaProducer<>(properties, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
        this.producer.initTransactions();
    }

    public Object getMetrics() {
        return this.consumer.metrics();
    }

    public void resetMetrics() {
    }

    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }

    @Generated
    public String getGroupId() {
        return this.groupId;
    }

    static {
        HashMap hashMap = new HashMap();
        hashMap.put("fetch.min.bytes", MessageUtils::toInteger);
        hashMap.put("fetch.max.bytes", MessageUtils::toInteger);
        hashMap.put("fetch.max.wait.ms", MessageUtils::toInteger);
        hashMap.put("max.partition.fetch.bytes", MessageUtils::toInteger);
        hashMap.put("enable.auto.commit", MessageUtils::toBoolean);
        hashMap.put("auto.commit.interval.ms", MessageUtils::toInteger);
        hashMap.put("check.crcs", MessageUtils::toBoolean);
        hashMap.put("max.poll.records", MessageUtils::toInteger);
        hashMap.put("max.poll.interval.ms", MessageUtils::toInteger);
        hashMap.put("session.timeout.ms", MessageUtils::toInteger);
        hashMap.put("heartbeat.interval.ms", MessageUtils::toInteger);
        hashMap.put("exclude.internal.topics", MessageUtils::toBoolean);
        hashMap.put("allow.auto.create.topics", MessageUtils::toBoolean);
        hashMap.put("auto.offset.reset", MessageUtils::toString);
        hashMap.put("default.api.timeout.ms", MessageUtils::toInteger);
        hashMap.put("group.instance.id", MessageUtils::toString);
        hashMap.put("isolation.level", MessageUtils::toString);
        hashMap.put("group.id", MessageUtils::toString);
        hashMap.put("partition.assignment.strategy", MessageUtils::toStringList);
        CONSUMER_PARAMETER_NAMES_MAP = Collections.unmodifiableMap(hashMap);
    }
}
