package io.bigdime.libs.kafka.consumers;

import com.google.common.base.Preconditions;
import io.bigdime.libs.kafka.constants.KafkaChannelConstants;
import io.bigdime.libs.kafka.exceptions.KafkaReaderException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/bigdime/libs/kafka/consumers/KafkaSimpleConsumer.class */
public class KafkaSimpleConsumer {
    private static Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);
    private SimpleConsumer consumer;
    private List<String> brokers;
    private String leadBroker;
    private Path file;
    private List<String> m_replicaBrokers = new ArrayList();
    private Properties properties = new Properties();
    private String basePath = KafkaChannelConstants.DEFAULT_PATH;
    private String pathExtension = KafkaChannelConstants.CHANNEL_EXTENSION;
    private boolean isStarted = false;
    private int messageSize = Integer.valueOf(KafkaChannelConstants.DEFAULT_MESSAGE_SIZE).intValue();

    protected KafkaSimpleConsumer() {
    }

    public static KafkaSimpleConsumer getInstance() {
        return new KafkaSimpleConsumer();
    }

    public KafkaSimpleConsumer brokers(List<String> list) {
        this.brokers = list;
        return this;
    }

    public KafkaSimpleConsumer topic(String str) {
        this.properties.put(KafkaChannelConstants.TOPIC_ID, str);
        return this;
    }

    public KafkaSimpleConsumer partitionId(String str) {
        this.properties.put(KafkaChannelConstants.PARTITION_ID, str);
        return this;
    }

    public KafkaSimpleConsumer clientId(String str) {
        this.properties.put(KafkaChannelConstants.CLIENT_ID, str);
        return this;
    }

    public KafkaSimpleConsumer offSetDataDir(String str) {
        this.properties.put(KafkaChannelConstants.OFFSET_DATA_DIR, str);
        return this;
    }

    public KafkaSimpleConsumer messageSize(String str) {
        this.properties.put(KafkaChannelConstants.MESSAGE_SIZE, str);
        return this;
    }

    public KafkaSimpleConsumer properties(Properties properties) {
        this.properties.putAll(properties);
        return this;
    }

    public KafkaSimpleConsumer build() {
        Preconditions.checkNotNull(this.brokers);
        Preconditions.checkNotNull(this.properties.getProperty(KafkaChannelConstants.TOPIC_ID));
        Preconditions.checkNotNull(this.properties.getProperty(KafkaChannelConstants.PARTITION_ID));
        if (this.properties.getProperty(KafkaChannelConstants.OFFSET_DATA_DIR) != null) {
            this.basePath = this.properties.getProperty(KafkaChannelConstants.OFFSET_DATA_DIR);
        }
        if (this.properties.getProperty(KafkaChannelConstants.MESSAGE_SIZE) != null) {
            this.messageSize = Integer.valueOf(this.properties.getProperty(KafkaChannelConstants.MESSAGE_SIZE)).intValue();
        }
        electLeader();
        return this;
    }

    public void electLeader() {
        PartitionMetadata findPartitionReplicaLeader = findPartitionReplicaLeader();
        if (findPartitionReplicaLeader == null || findPartitionReplicaLeader.leader() == null) {
            this.leadBroker = null;
            return;
        }
        this.leadBroker = findPartitionReplicaLeader.leader().host();
        this.consumer = new SimpleConsumer(findPartitionReplicaLeader.leader().host(), findPartitionReplicaLeader.leader().port(), KafkaChannelConstants.TIMEOUT.intValue(), KafkaChannelConstants.BUFFER_SIZE.intValue(), this.properties.getProperty(KafkaChannelConstants.CLIENT_ID));
        logger.info("KDA Provider cluster - topic:partion [{},{}] lead broker: {}, replicas: {}", new Object[]{this.properties.get(KafkaChannelConstants.TOPIC_ID), Integer.valueOf(findPartitionReplicaLeader.partitionId()), findPartitionReplicaLeader.leader(), findPartitionReplicaLeader.isr().toString()});
    }

    @Deprecated
    public void start() throws IOException, KafkaReaderException {
        init();
        this.isStarted = true;
        logger.info("Client id: {} initialization complete - start state is: {} currentOffset is: {} ", this.properties.getProperty("client.id"), Boolean.valueOf(this.isStarted));
    }

    public void stop() {
        logger.info("Consumer : {} shutting down.", this.properties.get(KafkaChannelConstants.CLIENT_ID));
        this.consumer.close();
    }

    public List<KafkaMessage> pollData(long j) throws KafkaReaderException {
        String property = this.properties.getProperty("client.id");
        String property2 = this.properties.getProperty("topic.id");
        int intValue = Integer.valueOf(this.properties.getProperty("partition.id")).intValue();
        ArrayList arrayList = new ArrayList();
        FetchResponse fetch = this.consumer.fetch(new FetchRequestBuilder().clientId(property).addFetch(property2, intValue, j, this.messageSize).build());
        if (fetch.hasError()) {
            short errorCode = fetch.errorCode(property2, intValue);
            if (errorCode == Errors.LEADER_NOT_AVAILABLE.code() || errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()) {
                electLeader();
            }
            logger.error("Error fetching data from the Broker :\" clientId={} topicId={} partitionId={} currentOffset={} leadBroker={} fetchResponseCode={}", new Object[]{property, property2, Integer.valueOf(intValue), Long.valueOf(j), this.leadBroker, Errors.forCode(errorCode).name()});
            throw new KafkaReaderException("Error fetching data from the Broker : leadBroker " + this.leadBroker + "error = " + Errors.forCode(errorCode).name());
        }
        Iterator it = fetch.messageSet(property2, intValue).iterator();
        while (it.hasNext()) {
            MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
            j = messageAndOffset.offset();
            logger.debug(property + "." + property2 + ":" + String.valueOf(intValue) + ".Read offset {} ", Long.valueOf(j));
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bArr = new byte[payload.limit()];
            payload.get(bArr);
            arrayList.add(KafkaMessage.getInstance(property2, intValue, j, bArr));
        }
        logger.debug(property + "." + property2 + ":" + String.valueOf(intValue) + ",Read offset {} ", Long.valueOf(j));
        return arrayList;
    }

    public long getEarliestOffSet() throws KafkaReaderException {
        String property = this.properties.getProperty(KafkaChannelConstants.CLIENT_ID);
        String property2 = this.properties.getProperty(KafkaChannelConstants.TOPIC_ID);
        int intValue = Integer.valueOf(this.properties.getProperty(KafkaChannelConstants.PARTITION_ID)).intValue();
        TopicAndPartition topicAndPartition = new TopicAndPartition(property2, intValue);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
        OffsetResponse offsetsBefore = this.consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), property));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(property2, intValue)[0];
        }
        short errorCode = offsetsBefore.errorCode(property2, intValue);
        logger.error("Error reading Offset from the Broker :\" clientId={} topicId={} partitionId={} leadBroker={} fetchResponseCode={}", new Object[]{property, property2, Integer.valueOf(intValue), this.leadBroker, Errors.forCode(errorCode).name()});
        throw new KafkaReaderException("Error reading Offset from the Broker : leadBroker " + this.leadBroker + "error = " + Errors.forCode(errorCode).name());
    }

    public long getLastOffset() throws KafkaReaderException {
        String property = this.properties.getProperty(KafkaChannelConstants.CLIENT_ID);
        String property2 = this.properties.getProperty(KafkaChannelConstants.TOPIC_ID);
        int intValue = Integer.valueOf(this.properties.getProperty(KafkaChannelConstants.PARTITION_ID)).intValue();
        logger.info("Consumer {} getting offset for :  {}:{}", new Object[]{property, this.consumer.host(), Integer.valueOf(this.consumer.port())});
        TopicAndPartition topicAndPartition = new TopicAndPartition(property2, intValue);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
        OffsetResponse offsetsBefore = this.consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), property));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(property2, intValue)[0];
        }
        short errorCode = offsetsBefore.errorCode(property2, intValue);
        logger.error("Error reading Offset from the Broker :\" clientId={} topicId={} partitionId={} currentOffset={} leadBroker={} fetchResponseCode={}", new Object[]{property, property2, Integer.valueOf(intValue), this.leadBroker, Errors.forCode(errorCode).name()});
        throw new KafkaReaderException("Error reading Offset from the Broker : leadBroker " + this.leadBroker + "error = " + Errors.forCode(errorCode).name());
    }

    @Deprecated
    private long init() throws IOException, KafkaReaderException {
        String str = this.basePath + "/" + this.properties.getProperty(KafkaChannelConstants.CLIENT_ID) + this.pathExtension;
        Path path = Paths.get(str, new String[0]);
        Path path2 = Paths.get(this.basePath, new String[0]);
        if (!Files.exists(path2, new LinkOption[0])) {
            FileUtils.forceMkdir(path2.toFile());
        }
        if (Files.exists(path, new LinkOption[0])) {
            this.file = FileSystems.getDefault().getPath(str, new String[0]);
        } else {
            this.file = Files.createFile(path, new FileAttribute[0]);
        }
        String str2 = new String(Files.readAllBytes(this.file));
        return (str2 == null || !str2.contains("currentOffset")) ? getLastOffset() : Long.valueOf(str2.split(":")[1].trim()).longValue();
    }

    public PartitionMetadata findPartitionReplicaLeader() {
        String valueOf = String.valueOf(this.properties.get(KafkaChannelConstants.TOPIC_ID));
        int intValue = Integer.valueOf((String) this.properties.get(KafkaChannelConstants.PARTITION_ID)).intValue();
        PartitionMetadata partitionMetadata = null;
        Iterator<String> it = this.brokers.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String[] split = it.next().split(":");
            ArrayList arrayList = new ArrayList();
            arrayList.add(split[0]);
            partitionMetadata = findLeader(arrayList, Integer.valueOf(split[1]).intValue(), valueOf, intValue);
            if (partitionMetadata != null) {
                logger.info("KDA Provider cluster - topic:partion [ {}, {}]lead broker: {}", new Object[]{valueOf, Integer.valueOf(partitionMetadata.partitionId()), partitionMetadata.leader()});
                logger.info("KDA Provider cluster - topic:partion [ {}, {}]replicas: {}", new Object[]{valueOf, Integer.valueOf(partitionMetadata.partitionId()), partitionMetadata.isr().toString()});
                break;
            }
        }
        return partitionMetadata;
    }

    private PartitionMetadata findLeader(List<String> list, int i, String str, int i2) {
        PartitionMetadata partitionMetadata = null;
        for (String str2 : list) {
            try {
                try {
                    this.consumer = new SimpleConsumer(str2, i, 100000, 65536, (String) this.properties.get(KafkaChannelConstants.CLIENT_ID));
                    Iterator it = this.consumer.send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((TopicMetadata) it.next()).partitionsMetadata().iterator();
                        while (true) {
                            if (it2.hasNext()) {
                                PartitionMetadata partitionMetadata2 = (PartitionMetadata) it2.next();
                                if (partitionMetadata2.partitionId() == i2) {
                                    partitionMetadata = partitionMetadata2;
                                    break;
                                }
                            }
                        }
                    }
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                } catch (Exception e) {
                    logger.error("Error communicating with Broker [" + str2 + "] to find Leader for [" + str + ", " + i2 + "] Reason: " + e);
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                }
            } catch (Throwable th) {
                if (this.consumer != null) {
                    this.consumer.close();
                }
                throw th;
            }
        }
        if (partitionMetadata != null) {
            this.m_replicaBrokers.clear();
            Iterator it3 = partitionMetadata.replicas().iterator();
            while (it3.hasNext()) {
                this.m_replicaBrokers.add(((Broker) it3.next()).host());
            }
        }
        return partitionMetadata;
    }

    public boolean isStarted() {
        return this.isStarted;
    }
}
