package org.joyqueue.client.internal.consumer.support;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.ConsumerIndexManager;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.consumer.domain.FetchIndexData;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.exception.ClientException;
import org.joyqueue.client.internal.metadata.domain.PartitionMetadata;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.command.CommitAckData;
import org.joyqueue.network.domain.BrokerNode;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/DefaultConsumerIndexManager.class */
/**
 * {@link ConsumerIndexManager} implementation that groups fetch-index, commit-ack and
 * commit-index requests by the leader broker of each partition and sends one request per broker
 * through the {@link ConsumerClientManager}.
 *
 * <p>Every batch method reports {@link JoyQueueCode#CN_UNKNOWN_ERROR} for requested items that
 * could not be routed to a broker (missing topic metadata or missing partition leader) or that the
 * broker response did not mention.
 */
public class DefaultConsumerIndexManager extends Service implements ConsumerIndexManager {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultConsumerIndexManager.class);

    private final ClusterManager clusterManager;
    private final ConsumerClientManager consumerClientManager;

    /**
     * @param clusterManager        source of topic metadata used to locate partition leaders
     * @param consumerClientManager provider of per-broker clients used to send the requests
     */
    public DefaultConsumerIndexManager(ClusterManager clusterManager, ConsumerClientManager consumerClientManager) {
        this.clusterManager = clusterManager;
        this.consumerClientManager = consumerClientManager;
    }

    /**
     * Does nothing and always reports success; none of the arguments are used.
     *
     * @return always {@link JoyQueueCode#SUCCESS}
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode resetIndex(String str, String str2, short s, long j) {
        return JoyQueueCode.SUCCESS;
    }

    /**
     * Fetches the index of a single partition by delegating to
     * {@link #batchFetchIndex(Map, String, long)}.
     *
     * @param str  topic
     * @param str2 app
     * @param s    partition
     * @param j    forwarded unchanged to the broker client (presumably a timeout; confirm
     *             against the interface)
     * @return the index data for the partition, or {@code null} if {@code str} is {@code null},
     *         because the batch call then has no entry to look up
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public FetchIndexData fetchIndex(String str, String str2, short s, long j) {
        Short partition = Short.valueOf(s);
        List<Short> partitions = Lists.newArrayList(partition);
        Map<String, List<Short>> partitionsByTopic = Maps.newHashMap();
        partitionsByTopic.put(str, partitions);
        return batchFetchIndex(partitionsByTopic, str2, j).get(str, partition);
    }

    /**
     * Commits the replies of a single topic by delegating to
     * {@link #batchCommitReply(Map, String, long)}.
     *
     * @param str  topic
     * @param list replies to acknowledge
     * @param str2 app
     * @param j    forwarded unchanged to the broker client (presumably a timeout; confirm
     *             against the interface)
     * @return the result code recorded for the topic
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode commitReply(String str, List<ConsumeReply> list, String str2, long j) {
        Map<String, List<ConsumeReply>> repliesByTopic = Maps.newHashMap();
        repliesByTopic.put(str, list);
        return batchCommitReply(repliesByTopic, str2, j).get(str);
    }

    /**
     * Commits the index of a single partition by delegating to
     * {@link #batchCommitIndex(String, String, Map, long)}.
     *
     * @param str  topic
     * @param str2 app
     * @param s    partition
     * @param j    index to commit
     * @param j2   forwarded unchanged to the broker client (presumably a timeout; confirm
     *             against the interface)
     * @return the result code for the partition; {@link JoyQueueCode#CN_UNKNOWN_ERROR} when the
     *         request could not be routed to a broker
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode commitIndex(String str, String str2, short s, long j, long j2) {
        Short partition = Short.valueOf(s);
        Map<Short, Long> indexByPartition = Maps.newHashMap();
        indexByPartition.put(partition, Long.valueOf(j));
        return batchCommitIndex(str, str2, indexByPartition, j2).get(partition);
    }

    /**
     * Fetches indexes for several topic partitions, sending one request per leader broker.
     *
     * @param map partitions to query, keyed by topic
     * @param str app
     * @param j   forwarded unchanged to the broker client (presumably a timeout; confirm against
     *            the interface)
     * @return a (topic, partition) table with one entry per requested partition; empty when
     *         {@code map} is {@code null} or empty
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Table<String, Short, FetchIndexData> batchFetchIndex(Map<String, List<Short>> map, String str, long j) {
        Table<String, Short, FetchIndexData> result = HashBasedTable.create();
        if (MapUtils.isEmpty(map)) {
            return result;
        }

        for (Map.Entry<BrokerNode, Map<String, List<Short>>> brokerEntry : buildFetchIndexParams(map, str).entrySet()) {
            Map<String, List<Short>> brokerRequest = brokerEntry.getValue();
            try {
                Table<String, Short, org.joyqueue.network.command.FetchIndexData> response =
                        this.consumerClientManager.getOrCreateClient(brokerEntry.getKey())
                                .fetchIndex(brokerRequest, str, j)
                                .getData();
                // Convert the wire-level data into the client-level domain object.
                for (Table.Cell<String, Short, org.joyqueue.network.command.FetchIndexData> cell : response.cellSet()) {
                    org.joyqueue.network.command.FetchIndexData data = cell.getValue();
                    result.put(cell.getRowKey(), cell.getColumnKey(),
                            new FetchIndexData(data.getIndex(), data.getLeftIndex(), data.getRightIndex(), data.getCode()));
                }
            } catch (ClientException e) {
                logger.error("fetchIndex exception, fetchMap: {}, app: {}", brokerRequest, str, e);
                // Every partition sent to this broker gets the exception's code.
                JoyQueueCode failure = JoyQueueCode.valueOf(e.getCode());
                for (Map.Entry<String, List<Short>> topicEntry : brokerRequest.entrySet()) {
                    for (Short partition : topicEntry.getValue()) {
                        result.put(topicEntry.getKey(), partition, new FetchIndexData(failure));
                    }
                }
            }
        }

        // Anything that was never routed or never answered is reported as an unknown error.
        for (Map.Entry<String, List<Short>> topicEntry : map.entrySet()) {
            for (Short partition : topicEntry.getValue()) {
                if (!result.contains(topicEntry.getKey(), partition)) {
                    result.put(topicEntry.getKey(), partition, new FetchIndexData(JoyQueueCode.CN_UNKNOWN_ERROR));
                }
            }
        }
        return result;
    }

    /**
     * Commits replies for several topics, sending one request per leader broker.
     *
     * @param map replies to acknowledge, keyed by topic
     * @param str app
     * @param j   forwarded unchanged to the broker client (presumably a timeout; confirm against
     *            the interface)
     * @return one result code per requested topic; empty when {@code map} is {@code null} or empty
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Map<String, JoyQueueCode> batchCommitReply(Map<String, List<ConsumeReply>> map, String str, long j) {
        Map<String, JoyQueueCode> result = Maps.newHashMap();
        if (MapUtils.isEmpty(map)) {
            return result;
        }

        for (Map.Entry<BrokerNode, Table<String, Short, List<CommitAckData>>> brokerEntry : buildCommitAckParams(map, str).entrySet()) {
            Table<String, Short, List<CommitAckData>> brokerRequest = brokerEntry.getValue();
            try {
                Table<String, Short, JoyQueueCode> response =
                        this.consumerClientManager.getOrCreateClient(brokerEntry.getKey())
                                .commitAck(brokerRequest, str, j)
                                .getResult();
                // NOTE(review): the result holds a single code per topic, so the last partition
                // (and last broker) written wins; a failure can be overwritten by a later success.
                // Kept as-is to preserve existing behavior - confirm whether this is intended.
                for (Map.Entry<String, Map<Short, JoyQueueCode>> topicEntry : response.rowMap().entrySet()) {
                    for (JoyQueueCode code : topicEntry.getValue().values()) {
                        result.put(topicEntry.getKey(), code);
                    }
                }
            } catch (ClientException e) {
                logger.error("commit ack exception, commitMap: {}, app: {}", brokerRequest, str, e);
                // Every topic sent to this broker gets the exception's code.
                JoyQueueCode failure = JoyQueueCode.valueOf(e.getCode());
                for (String topic : brokerRequest.rowKeySet()) {
                    result.put(topic, failure);
                }
            }
        }

        // Anything that was never routed or never answered is reported as an unknown error.
        for (String topic : map.keySet()) {
            if (!result.containsKey(topic)) {
                result.put(topic, JoyQueueCode.CN_UNKNOWN_ERROR);
            }
        }
        return result;
    }

    /**
     * Groups the requested partitions by the leader broker of each partition.
     *
     * <p>Topics without metadata and partitions without a leader are logged and left out.
     *
     * @param map partitions to query, keyed by topic
     * @param str app, used for the metadata lookup
     * @return per-broker map of topic to partitions; never {@code null}
     */
    protected Map<BrokerNode, Map<String, List<Short>>> buildFetchIndexParams(Map<String, List<Short>> map, String str) {
        Map<BrokerNode, Map<String, List<Short>>> requests = Maps.newHashMap();
        for (Map.Entry<String, List<Short>> topicEntry : map.entrySet()) {
            String topic = topicEntry.getKey();
            TopicMetadata topicMetadata = this.clusterManager.fetchTopicMetadata(topic, str);
            if (topicMetadata == null) {
                logger.warn("topic {} metadata is null", topic);
                continue;
            }

            for (Short partition : topicEntry.getValue()) {
                PartitionMetadata partitionMetadata = topicMetadata.getPartition(partition.shortValue());
                if (partitionMetadata == null) {
                    // Unknown partition: route through the topic's first partition instead.
                    partitionMetadata = topicMetadata.getPartitions().get(0);
                }
                BrokerNode leader = partitionMetadata.getLeader();
                if (leader == null) {
                    logger.warn("topic {}, partition {}, leader is null", topic, partition);
                    continue;
                }

                Map<String, List<Short>> brokerRequest = requests.get(leader);
                if (brokerRequest == null) {
                    brokerRequest = Maps.newHashMap();
                    requests.put(leader, brokerRequest);
                }
                List<Short> partitions = brokerRequest.get(topic);
                if (partitions == null) {
                    partitions = Lists.newLinkedList();
                    brokerRequest.put(topic, partitions);
                }
                partitions.add(partition);
            }
        }
        return requests;
    }

    /**
     * Groups the replies by the leader broker of each reply's partition and converts them into
     * {@link CommitAckData}.
     *
     * <p>Topics without metadata and partitions without a leader are logged and left out.
     *
     * @param map replies to acknowledge, keyed by topic
     * @param str app, used for the metadata lookup
     * @return per-broker (topic, partition) table of acknowledgements; never {@code null}
     */
    protected Map<BrokerNode, Table<String, Short, List<CommitAckData>>> buildCommitAckParams(Map<String, List<ConsumeReply>> map, String str) {
        Map<BrokerNode, Table<String, Short, List<CommitAckData>>> requests = Maps.newHashMap();
        for (Map.Entry<String, List<ConsumeReply>> topicEntry : map.entrySet()) {
            String topic = topicEntry.getKey();
            TopicMetadata topicMetadata = this.clusterManager.fetchTopicMetadata(topic, str);
            if (topicMetadata == null) {
                logger.warn("topic {} metadata is null", topic);
                continue;
            }

            for (ConsumeReply reply : topicEntry.getValue()) {
                PartitionMetadata partitionMetadata = topicMetadata.getPartition(reply.getPartition());
                if (partitionMetadata == null) {
                    // Unknown partition: route through the topic's first partition instead.
                    partitionMetadata = topicMetadata.getPartitions().get(0);
                }
                BrokerNode leader = partitionMetadata.getLeader();
                if (leader == null) {
                    logger.warn("topic {}, partition {}, leader is null", topic, Short.valueOf(reply.getPartition()));
                    continue;
                }

                Table<String, Short, List<CommitAckData>> brokerRequest = requests.get(leader);
                if (brokerRequest == null) {
                    brokerRequest = HashBasedTable.create();
                    requests.put(leader, brokerRequest);
                }
                Short partition = Short.valueOf(reply.getPartition());
                List<CommitAckData> acks = brokerRequest.get(topic, partition);
                if (acks == null) {
                    acks = Lists.newLinkedList();
                    brokerRequest.put(topic, partition, acks);
                }
                acks.add(new CommitAckData(reply.getPartition(), reply.getIndex(), reply.getRetryType()));
            }
        }
        return requests;
    }

    /**
     * Commits indexes for several partitions of one topic, sending one request per leader broker.
     *
     * @param str  topic
     * @param str2 app
     * @param map  index to commit, keyed by partition
     * @param j    forwarded unchanged to the broker client (presumably a timeout; confirm against
     *             the interface)
     * @return one result code per requested partition; empty when {@code map} is {@code null} or
     *         empty
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Map<Short, JoyQueueCode> batchCommitIndex(String str, String str2, Map<Short, Long> map, long j) {
        Map<Short, JoyQueueCode> result = Maps.newHashMap();
        if (MapUtils.isEmpty(map)) {
            return result;
        }

        Map<BrokerNode, Table<String, Short, Long>> requests = buildCommitIndexRequest(str, str2, map);
        // Tolerate null in case an overriding subclass still returns it for missing metadata.
        if (requests != null) {
            for (Map.Entry<BrokerNode, Table<String, Short, Long>> brokerEntry : requests.entrySet()) {
                Table<String, Short, Long> brokerRequest = brokerEntry.getValue();
                try {
                    Table<String, Short, JoyQueueCode> response =
                            this.consumerClientManager.getOrCreateClient(brokerEntry.getKey())
                                    .commitIndex(brokerRequest, str2, j)
                                    .getResult();
                    for (Map<Short, JoyQueueCode> codeByPartition : response.rowMap().values()) {
                        result.putAll(codeByPartition);
                    }
                } catch (ClientException e) {
                    logger.error("commit index exception, commitMap: {}, app: {}", brokerRequest, str2, e);
                    // Every partition sent to this broker gets the exception's code.
                    JoyQueueCode failure = JoyQueueCode.valueOf(e.getCode());
                    for (Short partition : brokerRequest.columnKeySet()) {
                        result.put(partition, failure);
                    }
                }
            }
        }

        // Anything that was never routed or never answered is reported as an unknown error.
        // The check must walk the *requested* partitions, not the result map itself.
        for (Short partition : map.keySet()) {
            if (!result.containsKey(partition)) {
                result.put(partition, JoyQueueCode.CN_UNKNOWN_ERROR);
            }
        }
        return result;
    }

    /**
     * Groups the indexes to commit by the leader broker of each partition.
     *
     * <p>Partitions that are unknown to the topic metadata or have no leader are left out.
     *
     * @param str  topic
     * @param str2 app, used for the metadata lookup
     * @param map  index to commit, keyed by partition
     * @return per-broker (topic, partition) table of indexes; an empty map when the topic has no
     *         metadata, never {@code null}
     */
    protected Map<BrokerNode, Table<String, Short, Long>> buildCommitIndexRequest(String str, String str2, Map<Short, Long> map) {
        Map<BrokerNode, Table<String, Short, Long>> requests = Maps.newHashMap();
        TopicMetadata topicMetadata = this.clusterManager.fetchTopicMetadata(str, str2);
        if (topicMetadata == null) {
            logger.warn("topic {} metadata is null", str);
            return requests;
        }

        for (Map.Entry<Short, Long> indexEntry : map.entrySet()) {
            PartitionMetadata partitionMetadata = topicMetadata.getPartition(indexEntry.getKey().shortValue());
            if (partitionMetadata == null) {
                continue;
            }
            BrokerNode leader = partitionMetadata.getLeader();
            if (leader == null) {
                continue;
            }

            Table<String, Short, Long> brokerRequest = requests.get(leader);
            if (brokerRequest == null) {
                brokerRequest = HashBasedTable.create();
                requests.put(leader, brokerRequest);
            }
            brokerRequest.put(str, indexEntry.getKey(), indexEntry.getValue());
        }
        return requests;
    }
}
