package org.darkphoenixs.kafka.core;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.darkphoenixs.kafka.pool.KafkaMessageReceiverPool;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.util.RefleTool;

/* loaded from: input_file:org/darkphoenixs/kafka/core/KafkaMessageReceiverImpl.class */
public class KafkaMessageReceiverImpl<K, V> implements KafkaMessageReceiver<K, V> {
    private final AtomicReference<SimpleConsumer> consumer = new AtomicReference<>();
    protected Map<String, Integer> replicaBrokers = new LinkedHashMap();
    protected PartitionMetadata metadata;
    protected FetchResponse fetchResponse;
    private KafkaMessageReceiverPool<K, V> pool;
    private VerifiableProperties props;

    public KafkaMessageReceiverImpl(Properties properties, KafkaMessageReceiverPool<K, V> kafkaMessageReceiverPool) {
        this.pool = kafkaMessageReceiverPool;
        this.props = new VerifiableProperties(properties);
    }

    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public synchronized List<V> receive(String str, int i, long j, long j2) {
        if (j2 <= 0) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        int i2 = 0;
        while (true) {
            if (i2 >= 3) {
                break;
            }
            if (checkLeader(str, i, j)) {
                z = true;
                break;
            }
            i2++;
        }
        if (!z) {
            return arrayList;
        }
        Iterator it = this.fetchResponse.messageSet(str, i).iterator();
        while (it.hasNext()) {
            MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
            if (messageAndOffset.offset() > (j + j2) - 1) {
                break;
            }
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bArr = new byte[payload.limit()];
            payload.get(bArr);
            arrayList.add(((Decoder) RefleTool.newInstance(this.pool.getValDecoderClass(), this.props)).fromBytes(bArr));
        }
        return arrayList;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public synchronized Map<K, V> receiveWithKey(String str, int i, long j, long j2) {
        if (j2 <= 0) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        boolean z = false;
        int i2 = 0;
        while (true) {
            if (i2 >= 3) {
                break;
            }
            if (checkLeader(str, i, j)) {
                z = true;
                break;
            }
            i2++;
        }
        if (!z) {
            return linkedHashMap;
        }
        Iterator it = this.fetchResponse.messageSet(str, i).iterator();
        while (it.hasNext()) {
            MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
            if (messageAndOffset.offset() > (j + j2) - 1) {
                break;
            }
            ByteBuffer key = messageAndOffset.message().key();
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bArr = new byte[key.limit()];
            byte[] bArr2 = new byte[payload.limit()];
            key.get(bArr);
            payload.get(bArr2);
            linkedHashMap.put(((Decoder) RefleTool.newInstance(this.pool.getKeyDecoderClass(), this.props)).fromBytes(bArr), ((Decoder) RefleTool.newInstance(this.pool.getValDecoderClass(), this.props)).fromBytes(bArr2));
        }
        return linkedHashMap;
    }

    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public synchronized long getLatestOffset(String str, int i) {
        if (!checkConsumer(str, i)) {
            return -1L;
        }
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
        OffsetResponse offsetsBefore = this.consumer.get().getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), this.pool.getClientId()));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        logger.error("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return 0L;
    }

    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public synchronized long getEarliestOffset(String str, int i) {
        if (!checkConsumer(str, i)) {
            return -1L;
        }
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
        OffsetResponse offsetsBefore = this.consumer.get().getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), this.pool.getClientId()));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        logger.error("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return 0L;
    }

    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public int getPartitionCount(String str) {
        return getPartitionNum(str);
    }

    @Override // org.darkphoenixs.kafka.core.KafkaMessageReceiver
    public synchronized void shutDown() {
        if (this.consumer.get() != null) {
            this.consumer.get().close();
            this.consumer.set(null);
        }
    }

    private PartitionMetadata findNewLeader(String str, String str2, int i) throws MQException {
        boolean z;
        for (int i2 = 0; i2 < 3; i2++) {
            PartitionMetadata findLeader = findLeader(this.replicaBrokers, str2, i);
            if (findLeader == null) {
                z = true;
            } else if (findLeader.leader() == null) {
                z = true;
            } else {
                if (!str.equalsIgnoreCase(findLeader.leader().host()) || i2 != 0) {
                    return findLeader;
                }
                z = true;
            }
            if (z) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
        logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new MQException("Unable to find new leader after Broker failure. Exiting");
    }

    /* JADX WARN: Finally extract failed */
    private PartitionMetadata findLeader(Map<String, Integer> map, String str, int i) {
        PartitionMetadata partitionMetadata = null;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            SimpleConsumer simpleConsumer = null;
            try {
                try {
                    simpleConsumer = new SimpleConsumer(entry.getKey(), entry.getValue().intValue(), 100000, KafkaConstants.BUFFER_SIZE, "leaderLookup");
                    Iterator it = simpleConsumer.send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((TopicMetadata) it.next()).partitionsMetadata().iterator();
                        while (true) {
                            if (it2.hasNext()) {
                                PartitionMetadata partitionMetadata2 = (PartitionMetadata) it2.next();
                                if (partitionMetadata2.partitionId() == i) {
                                    partitionMetadata = partitionMetadata2;
                                    break;
                                }
                            }
                        }
                    }
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                } catch (Exception e) {
                    logger.error("Error communicating with Broker [" + entry.getKey() + "] to find Leader for [" + str + ", " + i + "] Reason: " + e);
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                }
            } catch (Throwable th) {
                if (simpleConsumer != null) {
                    simpleConsumer.close();
                }
                throw th;
            }
        }
        if (partitionMetadata != null) {
            this.replicaBrokers.clear();
            for (BrokerEndPoint brokerEndPoint : partitionMetadata.replicas()) {
                this.replicaBrokers.put(brokerEndPoint.host(), Integer.valueOf(brokerEndPoint.port()));
            }
        }
        return partitionMetadata;
    }

    private boolean checkLeader(String str, int i, long j) {
        if (!checkConsumer(str, i)) {
            return false;
        }
        this.fetchResponse = this.consumer.get().fetch(new FetchRequestBuilder().clientId(this.pool.getClientId()).addFetch(str, i, j, 100000).build());
        String host = this.metadata.leader().host();
        if (!this.fetchResponse.hasError()) {
            return true;
        }
        short errorCode = this.fetchResponse.errorCode(str, i);
        logger.error("Error fetching data from the Broker:" + host + " Reason: " + ((int) errorCode));
        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
            getLatestOffset(str, i);
        }
        this.consumer.get().close();
        this.consumer.set(null);
        try {
            this.metadata = findNewLeader(host, str, i);
            return false;
        } catch (MQException e) {
            logger.error("Find new leader failed.", e);
            return false;
        }
    }

    private boolean checkConsumer(String str, int i) {
        if (this.consumer.get() != null) {
            return true;
        }
        if (this.metadata == null) {
            this.replicaBrokers.clear();
            for (String str2 : getBrokerStr(str).split(",")) {
                String[] split = str2.split(":");
                this.replicaBrokers.put(split[0], Integer.valueOf(split[1]));
            }
            this.metadata = findLeader(this.replicaBrokers, str, i);
        }
        if (this.metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return false;
        }
        if (this.metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            return false;
        }
        this.consumer.set(new SimpleConsumer(this.metadata.leader().host(), Integer.valueOf(this.metadata.leader().port()).intValue(), 100000, KafkaConstants.BUFFER_SIZE, this.pool.getClientId()));
        return true;
    }

    private String getBrokerStr(String str) {
        ZookeeperBrokers zookeeperBrokers = new ZookeeperBrokers(new ZookeeperHosts(this.pool.getZookeeperStr(), str));
        String brokerInfo = zookeeperBrokers.getBrokerInfo();
        zookeeperBrokers.close();
        return brokerInfo;
    }

    private int getPartitionNum(String str) {
        ZookeeperBrokers zookeeperBrokers = new ZookeeperBrokers(new ZookeeperHosts(this.pool.getZookeeperStr(), str));
        int numPartitions = zookeeperBrokers.getNumPartitions();
        zookeeperBrokers.close();
        return numPartitions;
    }
}
