package io.deephaven.kafka.ingest;

import gnu.trove.map.hash.TIntObjectHashMap;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.hash.KeyedIntObjectHashMap;
import io.deephaven.hash.KeyedIntObjectKey;
import io.deephaven.io.logger.Logger;
import java.text.DecimalFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/kafka/ingest/KafkaIngester.class */
public class KafkaIngester {
    /** Kafka client created in the constructor; polled and closed by the consumer loop. */
    private final KafkaConsumer<?, ?> kafkaConsumer;

    /** Logger supplied by the caller; all messages are prefixed with {@link #logPrefix}. */
    @NotNull
    private final Logger log;
    /** Name of the Kafka topic being ingested. */
    private final String topic;
    /** {@code toString()} of the partition filter, used in log messages and in {@link #toString()}. */
    private final String partitionDescription;
    /**
     * Per-partition record handlers, keyed by partition number. Accessed under
     * {@code synchronized (streamConsumers)} once the consumer thread is running.
     */
    private final TIntObjectHashMap<KafkaStreamConsumer> streamConsumers;
    /** Topic partitions to assign to the Kafka consumer, keyed by partition number. */
    private final KeyedIntObjectHashMap<TopicPartition> assignedPartitions;
    /** Prefix of the form {@code "KafkaIngester(<topic>, <partitions>): "} for every log line. */
    private final String logPrefix;
    /** Running total of records successfully handed to stream consumers. */
    private long messagesProcessed;
    /** Number of {@code accept} calls that threw; one per failed batch, not per record. */
    private long messagesWithErr;
    /** Value of {@link #messagesProcessed} at the time of the last throughput report. */
    private long lastProcessed;
    /** Set when a partition is shut down so the consumer loop re-applies the assignment. */
    private volatile boolean needsAssignment;
    /** Set to request that the consumer loop exits. */
    private volatile boolean done;
    /** Interval between throughput reports in milliseconds (property "reportIntervalMs", default 60000). */
    private static final int REPORT_INTERVAL_MS = Configuration.getInstance().getIntegerForClassWithDefault(KafkaIngester.class, "reportIntervalMs", 60000);
    /** Error budget (property "maxErrs", default 500); polling aborts once {@link #messagesWithErr} exceeds it. */
    private static final long MAX_ERRS = Configuration.getInstance().getLongForClassWithDefault(KafkaIngester.class, "maxErrs", 500);
    /**
     * Partition filter that accepts every partition. It is an anonymous class rather than a
     * lambda so that {@code toString()} yields the description {@code "ALL"}.
     */
    public static final IntPredicate ALL_PARTITIONS = new IntPredicate() {
        @Override
        public boolean test(int partition) {
            return true;
        }

        @Override
        public String toString() {
            return "ALL";
        }
    };
    /** Sentinel offset: seek the partition to its beginning. */
    public static final long SEEK_TO_BEGINNING = -1;
    /** Sentinel offset: leave the partition's position untouched. */
    public static final long DONT_SEEK = -2;
    /** Sentinel offset: seek the partition to its end. */
    public static final long SEEK_TO_END = -3;

    // The sentinels and the functions below are final: the constructor compares offsets against
    // them, so a reassignment anywhere in the process would silently change seek behaviour.

    /** Partition-to-offset function that requests {@link #SEEK_TO_BEGINNING} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING = partition -> SEEK_TO_BEGINNING;
    /** Partition-to-offset function that requests {@link #DONT_SEEK} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK = partition -> DONT_SEEK;
    /** Partition-to-offset function that requests {@link #SEEK_TO_END} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END = partition -> SEEK_TO_END;

    /* loaded from: input_file:io/deephaven/kafka/ingest/KafkaIngester$PartitionRange.class */
    /**
     * Partition filter that accepts a contiguous, inclusive range of partition numbers.
     */
    public static class PartitionRange implements IntPredicate {
        final int startInclusive;
        final int endInclusive;

        /**
         * @param i first partition accepted (inclusive)
         * @param i2 last partition accepted (inclusive); checked against {@code i} via
         *        {@code Require.geq} (presumably rejecting {@code i2 < i} — confirm against Require)
         */
        public PartitionRange(int i, int i2) {
            this.startInclusive = i;
            this.endInclusive = Require.geq(i2, "endInclusive", i, "startInclusive");
        }

        /**
         * @param i partition number
         * @return true when {@code startInclusive <= i <= endInclusive}
         */
        @Override
        public boolean test(int i) {
            return i >= this.startInclusive && i <= this.endInclusive;
        }

        /**
         * @return the single partition number when the range holds one partition, otherwise
         *         {@code "<start>-<end>"}
         */
        @Override
        public String toString() {
            if (this.startInclusive == this.endInclusive) {
                return Integer.toString(this.startInclusive);
            }
            // The separator matters: without it the range 1..3 would print as "13",
            // which is indistinguishable from the single partition 13.
            return this.startInclusive + "-" + this.endInclusive;
        }
    }

    /* loaded from: input_file:io/deephaven/kafka/ingest/KafkaIngester$PartitionRoundRobin.class */
    /**
     * Partition filter that accepts the partitions whose number, modulo {@code consumerCount},
     * equals {@code consumerIndex}.
     */
    public static class PartitionRoundRobin implements IntPredicate {
        final int consumerIndex;
        final int consumerCount;

        /**
         * @param i index of this consumer; passed through {@code Require.lt} against {@code i2}
         *        and then {@code Require.geqZero}
         * @param i2 total number of consumers sharing the topic
         */
        public PartitionRoundRobin(int i, int i2) {
            final int belowCount = Require.lt(i, "consumerIndex", i2, "consumerCount");
            this.consumerIndex = Require.geqZero(belowCount, "consumerIndex");
            this.consumerCount = i2;
        }

        /**
         * @param i partition number
         * @return true when {@code i % consumerCount == consumerIndex}
         */
        @Override
        public boolean test(int i) {
            final int remainder = i % consumerCount;
            return remainder == consumerIndex;
        }

        @Override
        public String toString() {
            final StringBuilder description = new StringBuilder("N % ");
            description.append(consumerCount).append(" == ").append(consumerIndex);
            return description.toString();
        }
    }

    /* loaded from: input_file:io/deephaven/kafka/ingest/KafkaIngester$SinglePartition.class */
    /**
     * Partition filter that accepts exactly one partition: a {@link PartitionRange} whose start
     * and end are the same partition number.
     */
    public static class SinglePartition extends PartitionRange {
        /**
         * @param i the only partition number accepted
         */
        public SinglePartition(int i) {
            super(i, i);
        }
    }

    /**
     * Creates an ingester for every partition of a topic by delegating to the six-argument
     * constructor with {@link #ALL_PARTITIONS} as the partition filter.
     *
     * @param logger logger for ingester messages
     * @param properties properties passed to the {@link KafkaConsumer} constructor
     * @param str name of the Kafka topic
     * @param intFunction maps a partition number to the consumer that receives its records
     * @param intToLongFunction maps a partition number to a start offset, or to one of
     *        {@link #SEEK_TO_BEGINNING}, {@link #DONT_SEEK}, {@link #SEEK_TO_END}
     */
    public KafkaIngester(Logger logger, Properties properties, String str, IntFunction<KafkaStreamConsumer> intFunction, IntToLongFunction intToLongFunction) {
        this(logger, properties, str, ALL_PARTITIONS, intFunction, intToLongFunction);
    }

    /**
     * Creates an ingester for the partitions of a topic selected by a filter, assigns those
     * partitions to a new {@link KafkaConsumer} and positions each one as requested.
     *
     * <p>If anything fails after the Kafka consumer has been created, the consumer is closed
     * before the exception is rethrown, so a failed construction does not leak the client.
     *
     * @param logger logger for ingester messages
     * @param properties properties passed to the {@link KafkaConsumer} constructor
     * @param str name of the Kafka topic
     * @param intPredicate selects which partition numbers of the topic to ingest; its
     *        {@code toString()} is used as the partition description in log messages
     * @param intFunction maps a partition number to the consumer that receives its records
     * @param intToLongFunction maps a partition number to a start offset, or to one of
     *        {@link #SEEK_TO_BEGINNING}, {@link #DONT_SEEK}, {@link #SEEK_TO_END}
     */
    public KafkaIngester(@NotNull Logger logger, Properties properties, String str, IntPredicate intPredicate, IntFunction<KafkaStreamConsumer> intFunction, IntToLongFunction intToLongFunction) {
        this.streamConsumers = new TIntObjectHashMap<>();
        this.assignedPartitions = new KeyedIntObjectHashMap<>(new KeyedIntObjectKey.BasicStrict<TopicPartition>() {
            public int getIntKey(@NotNull TopicPartition topicPartition) {
                return topicPartition.partition();
            }
        });
        this.messagesProcessed = 0L;
        this.messagesWithErr = 0L;
        this.lastProcessed = 0L;
        this.log = logger;
        this.topic = str;
        this.partitionDescription = intPredicate.toString();
        this.logPrefix = KafkaIngester.class.getSimpleName() + "(" + str + ", " + this.partitionDescription + "): ";
        this.kafkaConsumer = new KafkaConsumer<>(properties);
        try {
            // Register every partition of the topic that passes the filter, together with its handler.
            this.kafkaConsumer.partitionsFor(str).stream()
                    .filter(partitionInfo -> intPredicate.test(partitionInfo.partition()))
                    .map(partitionInfo -> new TopicPartition(str, partitionInfo.partition()))
                    .forEach(topicPartition -> {
                        this.assignedPartitions.add(topicPartition);
                        this.streamConsumers.put(topicPartition.partition(), intFunction.apply(topicPartition.partition()));
                    });
            assign();

            // Seeking is only valid after the partitions have been assigned.
            for (TopicPartition topicPartition : this.assignedPartitions.values()) {
                final long seekOffset = intToLongFunction.applyAsLong(topicPartition.partition());
                if (seekOffset == SEEK_TO_BEGINNING) {
                    logger.info().append(this.logPrefix).append(topicPartition.toString()).append(" seeking to beginning.").endl();
                    this.kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                } else if (seekOffset == SEEK_TO_END) {
                    logger.info().append(this.logPrefix).append(topicPartition.toString()).append(" seeking to end.").endl();
                    this.kafkaConsumer.seekToEnd(Collections.singletonList(topicPartition));
                } else if (seekOffset != DONT_SEEK) {
                    logger.info().append(this.logPrefix).append(topicPartition.toString()).append(" seeking to offset ").append(seekOffset).append(".").endl();
                    this.kafkaConsumer.seek(topicPartition, seekOffset);
                }
            }
        } catch (RuntimeException | Error failure) {
            // Nobody else holds a reference to the consumer yet, so release it here.
            try {
                this.kafkaConsumer.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Pushes the current contents of {@code assignedPartitions} to the Kafka consumer and logs
     * the resulting assignment, holding the monitor of {@code assignedPartitions} throughout.
     */
    private void assign() {
        synchronized (assignedPartitions) {
            kafkaConsumer.assign(assignedPartitions.values());
            log.info()
                    .append(logPrefix)
                    .append("Partition Assignments: ")
                    .append(assignedPartitions.values().toString())
                    .endl();
        }
    }

    /**
     * @return the simple class name immediately followed by the topic, a colon and the
     *         partition description; also used as the consumer thread's name
     */
    @Override
    public String toString() {
        final String className = KafkaIngester.class.getSimpleName();
        return className + topic + ":" + partitionDescription;
    }

    /**
     * Starts the consumer loop on a new daemon thread named after {@link #toString()}.
     */
    public void start() {
        final Thread consumerThread = new Thread(this::consumerLoop, toString());
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    /**
     * Converts a message count over an elapsed time into a per-second rate.
     *
     * @param j number of messages
     * @param j2 elapsed time in nanoseconds
     * @return {@code j} scaled by 1e9 and divided by {@code j2}
     */
    private static double msgPerSec(long j, long j2) {
        final double scaledCount = 1.0E9d * j;
        return scaledCount / j2;
    }

    /**
     * Body of the consumer thread: polls Kafka until {@code done} is set or {@link #pollOnce}
     * reports a fatal error, re-applying the partition assignment when requested and logging
     * throughput once per report interval. The Kafka consumer is always closed on exit, including
     * when the loop terminates by throwing.
     */
    private void consumerLoop() {
        // Multiply as long: 60000 ms * 1,000,000 overflows int and would yield a negative
        // interval, i.e. zero-timeout polling and a report on every iteration.
        final long reportIntervalNanos = REPORT_INTERVAL_MS * 1_000_000L;
        long lastReportNanos = System.nanoTime();
        final DecimalFormat rateFormat = new DecimalFormat("#.0000");
        try {
            while (!done) {
                // shutdownPartition() may raise the flag again while assign() runs, hence the loop.
                while (needsAssignment) {
                    needsAssignment = false;
                    assign();
                }
                final long beforePollNanos = System.nanoTime();
                final long nextReportNanos = lastReportNanos + reportIntervalNanos;
                // Block in poll no longer than the time remaining until the next report is due.
                final long timeoutNanos = beforePollNanos > nextReportNanos ? 0L : nextReportNanos - beforePollNanos;
                if (!pollOnce(Duration.ofNanos(timeoutNanos))) {
                    log.error().append(logPrefix).append("Stopping due to errors (").append(messagesWithErr).append(" messages with error out of ").append(messagesProcessed).append(" messages processed)").endl();
                    break;
                }
                final long afterPollNanos = System.nanoTime();
                if (afterPollNanos > nextReportNanos) {
                    final long processedSinceReport = messagesProcessed - lastProcessed;
                    final long elapsedNanos = afterPollNanos - lastReportNanos;
                    log.info().append(logPrefix).append("Processed ").append(processedSinceReport).append(" in ").append(elapsedNanos / 1000000).append("ms, ").append(rateFormat.format(msgPerSec(processedSinceReport, elapsedNanos))).append(" msgs/sec").endl();
                    lastReportNanos = afterPollNanos;
                    lastProcessed = messagesProcessed;
                }
            }
        } finally {
            log.info().append(logPrefix).append("Closing Kafka consumer").endl();
            kafkaConsumer.close();
        }
    }

    /**
     * Polls Kafka once and hands each partition's records to that partition's stream consumer.
     *
     * <p>A batch whose {@code accept} call throws is counted as one error and skipped. Once the
     * error count exceeds {@code MAX_ERRS}, the failing consumer is notified through
     * {@code acceptFailure} and polling is aborted.
     *
     * @param duration maximum time to block in {@code KafkaConsumer.poll}
     * @return {@code true} if the consumer loop should keep going (including when the poll was
     *         interrupted by {@code wakeup()}), {@code false} on a fatal error
     */
    private boolean pollOnce(Duration duration) {
        try {
            final ConsumerRecords<?, ?> polled = kafkaConsumer.poll(duration);
            for (TopicPartition topicPartition : polled.partitions()) {
                final int partition = topicPartition.partition();
                final KafkaStreamConsumer streamConsumer;
                synchronized (streamConsumers) {
                    streamConsumer = streamConsumers.get(partition);
                }
                if (streamConsumer == null) {
                    // The handler was removed by shutdown() or shutdownPartition(); drop the records.
                    continue;
                }
                // Raw List: the element type is dictated by KafkaStreamConsumer.accept, declared elsewhere.
                final List records = polled.records(topicPartition);
                if (records.isEmpty()) {
                    continue;
                }
                try {
                    streamConsumer.accept(records);
                    messagesProcessed += records.size();
                } catch (Throwable th) {
                    messagesWithErr++;
                    log.error().append(logPrefix).append("Exception while processing Kafka message:").append(th).endl();
                    if (messagesWithErr > MAX_ERRS) {
                        streamConsumer.acceptFailure(th);
                        log.error().append(logPrefix).append("Max number of errors exceeded, aborting " + this + " consumer thread.").endl();
                        return false;
                    }
                }
            }
            return true;
        } catch (WakeupException ignored) {
            // Must precede the Exception handler, which would otherwise swallow it. shutdown() and
            // shutdownPartition() call wakeup() to interrupt a blocking poll; that is a request
            // to re-check the loop flags, not an error.
            return true;
        } catch (Exception e) {
            log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(e).append(", aborting.").endl();
            return false;
        }
    }

    /**
     * Requests that the consumer loop stop: drops all stream consumers, sets {@code done} and
     * wakes the Kafka consumer. Does nothing if {@code done} is already set.
     */
    public void shutdown() {
        if (!done) {
            synchronized (streamConsumers) {
                streamConsumers.clear();
            }
            done = true;
            kafkaConsumer.wakeup();
        }
    }

    /**
     * Stops ingesting a single partition. Removes the partition's stream consumer and its
     * assignment, then either marks the ingester done (no consumers left) or flags that the
     * assignment must be re-applied, and wakes the Kafka consumer. Does nothing if the ingester
     * is already done or no consumer is registered for the partition.
     *
     * @param i partition number to stop
     */
    public void shutdownPartition(int i) {
        if (done) {
            return;
        }
        synchronized (streamConsumers) {
            final KafkaStreamConsumer removed = streamConsumers.remove(i);
            if (removed == null) {
                return;
            }
            final boolean noConsumersLeft = streamConsumers.isEmpty();
            assignedPartitions.remove(i);
            if (!noConsumersLeft) {
                needsAssignment = true;
            } else {
                done = true;
            }
            kafkaConsumer.wakeup();
        }
    }
}
