package edu.iu.dsc.tws.connectors.kafka;

import edu.iu.dsc.tws.api.checkpointing.Snapshot;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.checkpointing.task.CheckpointableTask;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:edu/iu/dsc/tws/connectors/kafka/KafkaSource.class */
public abstract class KafkaSource<K, V> implements CheckpointableTask, ISource {
    private static final Logger LOG = Logger.getLogger(KafkaSource.class.getName());
    private static final String LAST_PARTITION_OFFSETS = "LAST_PARTITION_OFFSETS";
    private KafkaConsumer<K, V> kafkaConsumer;
    protected Config cfg;
    protected TaskContext context;
    private HashMap<String, HashMap<Integer, Long>> partitionOffsets;

    public void prepare(Config config, TaskContext taskContext) {
        this.cfg = config;
        this.context = taskContext;
        Properties consumerProperties = getConsumerProperties();
        consumerProperties.put("group.id", getConsumerGroup(this.context));
        consumerProperties.put("enable.auto.commit", "false");
        this.kafkaConsumer = new KafkaConsumer<>(consumerProperties);
        this.kafkaConsumer.subscribe(getTopics());
        this.partitionOffsets = new HashMap<>();
    }

    private void assignAndSeek() {
        HashMap hashMap = new HashMap();
        this.partitionOffsets.forEach((str, hashMap2) -> {
            hashMap2.forEach((num, l) -> {
                hashMap.put(new TopicPartition(str, num.intValue()), Long.valueOf(l.longValue() + 1));
            });
        });
        if (!hashMap.isEmpty()) {
            this.kafkaConsumer.unsubscribe();
            Set<K> keySet = hashMap.keySet();
            LOG.log(Level.FINE, "Assigning to " + this.context.taskIndex() + ": " + keySet);
            this.kafkaConsumer.assign(keySet);
        }
        hashMap.forEach((topicPartition, l) -> {
            this.kafkaConsumer.seek(topicPartition, l.longValue());
        });
    }

    public String getConsumerGroup(TaskContext taskContext) {
        return taskContext.taskName();
    }

    public abstract Properties getConsumerProperties();

    public abstract Set<String> getTopics();

    public abstract void writeRecord(ConsumerRecord<K, V> consumerRecord);

    public abstract Duration getPollingTimeout();

    public KafkaConsumer<K, V> getKafkaConsumer() {
        return this.kafkaConsumer;
    }

    public void execute() {
        Iterator it = this.kafkaConsumer.poll(getPollingTimeout()).iterator();
        while (it.hasNext()) {
            ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
            writeRecord(consumerRecord);
            LOG.log(Level.FINEST, String.format("[%s]  topic[%s] : partition[%d] : offset[%d] : value[%s]", Integer.valueOf(this.context.taskIndex()), consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.value()));
            this.partitionOffsets.computeIfAbsent(consumerRecord.topic(), str -> {
                return new HashMap();
            }).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
        }
    }

    public void restoreSnapshot(Snapshot snapshot) {
        this.partitionOffsets = (HashMap) snapshot.getOrDefault(LAST_PARTITION_OFFSETS, new HashMap());
        assignAndSeek();
    }

    public void takeSnapshot(Snapshot snapshot) {
        snapshot.setValue(LAST_PARTITION_OFFSETS, this.partitionOffsets);
    }

    public void onCheckpointPropagated(Snapshot snapshot) {
    }

    public void initSnapshot(Snapshot snapshot) {
    }
}
