package org.darkphoenixs.kafka.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:org/darkphoenixs/kafka/core/KafkaMessageNewReceiver.class */
/**
 * {@link KafkaMessageReceiver} backed by a single {@link KafkaConsumer} that is manually
 * assigned to one topic partition per call and positioned with explicit seeks.
 *
 * <p>Every method that touches the consumer is {@code synchronized} on this instance, so the
 * wrapped consumer is only ever driven by one thread at a time. Each call re-assigns the
 * consumer, so the assignment and position left behind by a previous call are not preserved.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaMessageNewReceiver<K, V> implements KafkaMessageReceiver<K, V> {
    /**
     * Longest time, in milliseconds, a single poll may wait for data. A small positive value
     * keeps the read loop from spinning on the CPU while a fetch is in flight; poll still
     * returns as soon as records are available.
     */
    private static final long POLL_TIMEOUT_MS = 100L;

    protected final KafkaConsumer<K, V> kafkaConsumer;

    /**
     * Creates the receiver and its underlying consumer.
     *
     * @param properties configuration handed unchanged to the {@link KafkaConsumer} constructor
     */
    public KafkaMessageNewReceiver(Properties properties) {
        this.kafkaConsumer = new KafkaConsumer<>(properties);
    }

    /**
     * Reads the values of the records in the offset range {@code [j, j + j2)} of one partition.
     *
     * <p>The range is clamped to the offsets currently present in the partition: a start below
     * the earliest offset is raised to it, and the end never exceeds the latest offset as
     * observed when the call starts. An empty clamped range yields an empty list.
     *
     * @param str topic name
     * @param i   partition number
     * @param j   first offset to read
     * @param j2  maximum number of offsets to read; must be greater than 0
     * @return the record values in offset order; never {@code null}
     * @throws IllegalArgumentException if {@code j2} is not greater than 0
     */
    @Override
    public synchronized List<V> receive(String str, int i, long j, long j2) {
        List<ConsumerRecord<K, V>> records = fetchRange(str, i, j, j2);
        List<V> values = new ArrayList<>(records.size());
        for (ConsumerRecord<K, V> record : records) {
            values.add(record.value());
        }
        return values;
    }

    /**
     * Reads the records in the offset range {@code [j, j + j2)} of one partition as a key to
     * value map. The range is clamped exactly as in {@link #receive(String, int, long, long)}.
     *
     * <p>Because the result is a {@link HashMap}, records sharing a key collapse into one entry
     * holding the value with the highest offset, and iteration order is unspecified.
     *
     * @param str topic name
     * @param i   partition number
     * @param j   first offset to read
     * @param j2  maximum number of offsets to read; must be greater than 0
     * @return the key/value pairs read; never {@code null}
     * @throws IllegalArgumentException if {@code j2} is not greater than 0
     */
    @Override
    public synchronized Map<K, V> receiveWithKey(String str, int i, long j, long j2) {
        Map<K, V> entries = new HashMap<>();
        for (ConsumerRecord<K, V> record : fetchRange(str, i, j, j2)) {
            entries.put(record.key(), record.value());
        }
        return entries;
    }

    /**
     * Returns the latest offset of a partition, i.e. the consumer position after seeking to the
     * end. As a side effect the consumer is left assigned to that partition at that position.
     *
     * @param str topic name
     * @param i   partition number
     * @return the position reported by the consumer after seeking to the end
     */
    @Override
    public synchronized long getLatestOffset(String str, int i) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        this.kafkaConsumer.assign(Arrays.asList(topicPartition));
        this.kafkaConsumer.seekToEnd(topicPartition);
        return this.kafkaConsumer.position(topicPartition);
    }

    /**
     * Returns the earliest offset of a partition, i.e. the consumer position after seeking to
     * the beginning. As a side effect the consumer is left assigned to that partition at that
     * position.
     *
     * @param str topic name
     * @param i   partition number
     * @return the position reported by the consumer after seeking to the beginning
     */
    @Override
    public synchronized long getEarliestOffset(String str, int i) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        this.kafkaConsumer.assign(Arrays.asList(topicPartition));
        this.kafkaConsumer.seekToBeginning(topicPartition);
        return this.kafkaConsumer.position(topicPartition);
    }

    /**
     * Returns the number of partitions the consumer reports for a topic.
     *
     * <p>Synchronized like every other method here so the shared consumer is never used by two
     * threads at once.
     *
     * @param str topic name
     * @return size of the partition list returned by the consumer
     */
    @Override
    public synchronized int getPartitionCount(String str) {
        return this.kafkaConsumer.partitionsFor(str).size();
    }

    /** Closes the underlying consumer. */
    @Override
    public synchronized void shutDown() {
        // The field is final and always assigned by the constructor, so no null check is needed.
        this.kafkaConsumer.close();
    }

    /**
     * Fetches the records whose offsets fall in {@code [beginOffset, beginOffset + readCount)},
     * clamped to the partition's current earliest and latest offsets. Must be called while
     * holding this instance's monitor.
     */
    private List<ConsumerRecord<K, V>> fetchRange(
            String topic, int partition, long beginOffset, long readCount) {
        if (readCount <= 0) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        long earliestOffset = getEarliestOffset(topic, partition);
        long latestOffset = getLatestOffset(topic, partition);
        long startOffset = Math.max(beginOffset, earliestOffset);

        List<ConsumerRecord<K, V>> fetched = new ArrayList<>();
        if (startOffset >= latestOffset) {
            // Nothing to read (empty partition or start at/after the log end). Polling here
            // would never observe a terminating record and would loop forever.
            return fetched;
        }
        // Exclusive end of the range. Comparing against the remaining distance instead of
        // computing startOffset + readCount first avoids long overflow for huge counts.
        long endOffset = readCount > latestOffset - startOffset
                ? latestOffset
                : startOffset + readCount;

        TopicPartition topicPartition = new TopicPartition(topic, partition);
        this.kafkaConsumer.assign(Arrays.asList(topicPartition));
        this.kafkaConsumer.seek(topicPartition, startOffset);

        // Terminate on the consumer position rather than on seeing one specific offset: the
        // position passes endOffset even if the record at endOffset - 1 is never delivered
        // (offset gaps), and the last in-range record is kept instead of being dropped.
        while (this.kafkaConsumer.position(topicPartition) < endOffset) {
            for (ConsumerRecord<K, V> record : this.kafkaConsumer.poll(POLL_TIMEOUT_MS)) {
                if (record.offset() >= endOffset) {
                    break;
                }
                fetched.add(record);
            }
        }
        return fetched;
    }
}
