package org.joyqueue.client.internal.consumer.support;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.joyqueue.client.internal.consumer.ConsumerIndexManager;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.consumer.domain.FetchIndexData;
import org.joyqueue.client.internal.consumer.domain.LocalIndexData;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/LocalConsumerIndexManager.class */
/**
 * {@link ConsumerIndexManager} that records consume positions in a local
 * {@link ConsumerLocalIndexStore} instead of committing them through the wrapped manager.
 *
 * <p>The wrapped {@code delegate} is only queried: to seed a position when the local store has
 * none (or the stored one is expired), and to resolve the {@code -1} / {@code -2} sentinel
 * positions accepted by the commit methods into the delegate's right / left index.
 *
 * <p>The store location and persist interval come from the "broadcast" settings of
 * {@link ConsumerConfig}; the store itself is created in {@link #validate()}.
 */
public class LocalConsumerIndexManager extends Service implements ConsumerIndexManager {
    protected static final Logger logger = LoggerFactory.getLogger(LocalConsumerIndexManager.class);

    /** Sentinel commit position: replaced by the delegate's right index before saving. */
    private static final long RIGHT_INDEX_SENTINEL = -1L;
    /** Sentinel commit position: replaced by the delegate's left index before saving. */
    private static final long LEFT_INDEX_SENTINEL = -2L;

    private final ConsumerConfig config;
    private final ConsumerIndexManager delegate;
    private ConsumerLocalIndexStore consumerIndexStore;

    /**
     * @param consumerConfig       source of the broadcast path/group/interval/expiry settings
     * @param consumerIndexManager manager queried for positions the local store cannot supply
     */
    public LocalConsumerIndexManager(ConsumerConfig consumerConfig, ConsumerIndexManager consumerIndexManager) {
        this.config = consumerConfig;
        this.delegate = consumerIndexManager;
    }

    /**
     * Creates the local index store under
     * {@code <broadcastLocalPath><File.separator><broadcastGroup>} with the configured persist interval.
     */
    protected void validate() throws Exception {
        String storePath = this.config.getBroadcastLocalPath() + File.separator + this.config.getBroadcastGroup();
        this.consumerIndexStore = new ConsumerLocalIndexStore(storePath, this.config.getBroadcastPersistInterval());
    }

    /** Starts the local index store created by {@link #validate()}. */
    protected void doStart() throws Exception {
        this.consumerIndexStore.start();
    }

    /** Stops the local index store, if {@link #validate()} got far enough to create one. */
    protected void doStop() {
        if (this.consumerIndexStore != null) {
            this.consumerIndexStore.stop();
        }
    }

    /**
     * Overwrites the locally stored position with one chosen from the delegate's data according to
     * the configured broadcast auto-reset mode.
     *
     * @param str  topic
     * @param str2 second lookup key passed to the delegate and the store (presumably the consumer app)
     * @param s    partition
     * @param j    timeout forwarded to the delegate
     * @return the delegate's error code if its fetch failed, otherwise {@code SUCCESS}
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode resetIndex(String str, String str2, short s, long j) {
        FetchIndexData remote = this.delegate.fetchIndex(str, str2, s, j);
        if (!remote.getCode().equals(JoyQueueCode.SUCCESS)) {
            logger.error("resetIndex error, topic: {}, partition: {}, error: {}", str, Short.valueOf(s), remote.getCode());
            return remote.getCode();
        }
        this.consumerIndexStore.saveIndex(str, str2, s, resolveResetIndex(remote));
        return JoyQueueCode.SUCCESS;
    }

    /**
     * Returns the locally stored position when present and not expired; otherwise seeds the local
     * store from the delegate (using the configured auto-reset mode) and returns the seeded value.
     *
     * @param str  topic
     * @param str2 second lookup key passed to the delegate and the store (presumably the consumer app)
     * @param s    partition
     * @param j    timeout forwarded to the delegate
     * @return the position with left/right index set to 0 and code {@code SUCCESS}
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public FetchIndexData fetchIndex(String str, String str2, short s, long j) {
        LocalIndexData local = this.consumerIndexStore.fetchIndex(str, str2, s);
        if (local != null && !isExpired(local)) {
            return new FetchIndexData(local.getIndex(), 0L, 0L, JoyQueueCode.SUCCESS);
        }
        FetchIndexData remote = this.delegate.fetchIndex(str, str2, s, j);
        if (!remote.getCode().equals(JoyQueueCode.SUCCESS)) {
            logger.error("batchFetch index error, topic: {}, partition: {}, error: {}", str, Short.valueOf(s), remote.getCode());
            // NOTE(review): the failure is reported to the caller as index -1 with SUCCESS rather
            // than the delegate's error code; kept as-is because callers may rely on it - confirm.
            return new FetchIndexData(-1L, 0L, 0L, JoyQueueCode.SUCCESS);
        }
        long seeded = resolveResetIndex(remote);
        this.consumerIndexStore.saveIndex(str, str2, s, seeded);
        return new FetchIndexData(seeded, 0L, 0L, JoyQueueCode.SUCCESS);
    }

    /**
     * Stores, for every reply, the position following the replied index ({@code index + 1}).
     *
     * @param str  topic
     * @param list replies to record
     * @param str2 second store key (presumably the consumer app)
     * @param j    unused here; nothing is sent to the delegate
     * @return always {@code SUCCESS}
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode commitReply(String str, List<ConsumeReply> list, String str2, long j) {
        for (ConsumeReply reply : list) {
            this.consumerIndexStore.saveIndex(str, str2, reply.getPartition(), reply.getIndex() + 1);
        }
        return JoyQueueCode.SUCCESS;
    }

    /**
     * Stores an explicit position for one partition. The sentinels {@code -1} and {@code -2} are
     * first resolved to the delegate's right and left index respectively.
     *
     * @param str  topic
     * @param str2 second lookup key passed to the delegate and the store (presumably the consumer app)
     * @param s    partition
     * @param j    position to store, or one of the sentinels
     * @param j2   not used by this implementation; the delegate lookup uses {@code config.getTimeout()}
     * @return {@code SUCCESS}, or the delegate's error code when a sentinel could not be resolved
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public JoyQueueCode commitIndex(String str, String str2, short s, long j, long j2) {
        long index = j;
        if (j == RIGHT_INDEX_SENTINEL || j == LEFT_INDEX_SENTINEL) {
            FetchIndexData remote = this.delegate.fetchIndex(str, str2, s, this.config.getTimeout());
            // Do not persist a position taken from a failed lookup; surface the error instead.
            if (!JoyQueueCode.SUCCESS.equals(remote.getCode())) {
                logger.error("commitIndex error, topic: {}, partition: {}, error: {}", str, Short.valueOf(s), remote.getCode());
                return remote.getCode();
            }
            index = (j == RIGHT_INDEX_SENTINEL) ? remote.getRightIndex() : remote.getLeftIndex();
        }
        this.consumerIndexStore.saveIndex(str, str2, s, index);
        return JoyQueueCode.SUCCESS;
    }

    /**
     * Resolves positions for many topic/partition pairs. Pairs with a valid local position are
     * answered from the store (left/right index taken from a per-partition delegate fetch); the
     * remaining pairs are fetched from the delegate in one batch and, on success, saved locally.
     *
     * @param map partitions to resolve, keyed by topic
     * @param str second lookup key passed to the delegate and the store (presumably the consumer app)
     * @param j   timeout forwarded to the delegate
     * @return table of topic x partition to the resolved data
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Table<String, Short, FetchIndexData> batchFetchIndex(Map<String, List<Short>> map, String str, long j) {
        Table<String, Short, FetchIndexData> result = HashBasedTable.create();
        Map<String, List<Short>> missing = null;
        for (Map.Entry<String, List<Short>> entry : map.entrySet()) {
            String topic = entry.getKey();
            for (Short partition : entry.getValue()) {
                LocalIndexData local = this.consumerIndexStore.fetchIndex(topic, str, partition.shortValue());
                if (local == null || isExpired(local)) {
                    if (missing == null) {
                        missing = Maps.newHashMap();
                    }
                    List<Short> partitions = missing.get(topic);
                    if (partitions == null) {
                        partitions = Lists.newLinkedList();
                        missing.put(topic, partitions);
                    }
                    partitions.add(partition);
                } else {
                    // The delegate is only needed for the left/right bounds of locally known
                    // positions, so it is queried in this branch only.
                    FetchIndexData remote = this.delegate.fetchIndex(topic, str, partition.shortValue(), j);
                    result.put(topic, partition, new FetchIndexData(local.getIndex(), remote.getLeftIndex(), remote.getRightIndex(), JoyQueueCode.SUCCESS));
                }
            }
        }
        if (MapUtils.isNotEmpty(missing)) {
            Table<String, Short, FetchIndexData> fetched = this.delegate.batchFetchIndex(missing, str, j);
            for (Map.Entry<String, List<Short>> entry : missing.entrySet()) {
                String topic = entry.getKey();
                for (Short partition : entry.getValue()) {
                    FetchIndexData remote = fetched.get(topic, partition);
                    if (remote.getCode().equals(JoyQueueCode.SUCCESS)) {
                        this.consumerIndexStore.saveIndex(topic, str, partition.shortValue(), remote.getIndex());
                    } else {
                        logger.error("batchFetch index error, topic: {}, partition: {}, error: {}", topic, partition, remote.getCode());
                    }
                    // Failed entries are passed through so the caller can see the error code.
                    result.put(topic, partition, remote);
                }
            }
        }
        return result;
    }

    /**
     * Batch variant of {@link #commitReply}: stores {@code index + 1} for every reply of every topic.
     *
     * @param map replies to record, keyed by topic
     * @param str second store key (presumably the consumer app)
     * @param j   unused here; nothing is sent to the delegate
     * @return {@code SUCCESS} for every topic in {@code map}
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Map<String, JoyQueueCode> batchCommitReply(Map<String, List<ConsumeReply>> map, String str, long j) {
        Map<String, JoyQueueCode> result = Maps.newHashMap();
        for (Map.Entry<String, List<ConsumeReply>> entry : map.entrySet()) {
            String topic = entry.getKey();
            for (ConsumeReply reply : entry.getValue()) {
                // Same "next position" convention as commitReply.
                this.consumerIndexStore.saveIndex(topic, str, reply.getPartition(), reply.getIndex() + 1);
            }
            result.put(topic, JoyQueueCode.SUCCESS);
        }
        return result;
    }

    /**
     * Batch variant of {@link #commitIndex}. If any requested position is a sentinel
     * ({@code -1} / {@code -2}), the delegate is queried once for all partitions in {@code map}.
     *
     * @param str  topic
     * @param str2 second lookup key passed to the delegate and the store (presumably the consumer app)
     * @param map  position (or sentinel) to store, keyed by partition
     * @param j    timeout forwarded to the delegate
     * @return per-partition result: {@code SUCCESS} when a position was stored,
     *         {@code CN_UNKNOWN_ERROR} when a sentinel had no delegate data, or the delegate's
     *         error code when its data for that partition was not successful
     */
    @Override // org.joyqueue.client.internal.consumer.ConsumerIndexManager
    public Map<Short, JoyQueueCode> batchCommitIndex(String str, String str2, Map<Short, Long> map, long j) {
        Map<Short, JoyQueueCode> result = Maps.newHashMap();
        Map<Short, FetchIndexData> remoteByPartition = null;
        for (Map.Entry<Short, Long> entry : map.entrySet()) {
            long requested = entry.getValue().longValue();
            if (requested == RIGHT_INDEX_SENTINEL || requested == LEFT_INDEX_SENTINEL) {
                Map<String, List<Short>> request = Maps.newHashMap();
                request.put(str, Lists.newArrayList(map.keySet()));
                remoteByPartition = this.delegate.batchFetchIndex(request, str2, j).row(str);
                break;
            }
        }
        for (Map.Entry<Short, Long> entry : map.entrySet()) {
            short partition = entry.getKey().shortValue();
            long index = entry.getValue().longValue();
            if (index == RIGHT_INDEX_SENTINEL || index == LEFT_INDEX_SENTINEL) {
                FetchIndexData remote = remoteByPartition == null ? null : remoteByPartition.get(Short.valueOf(partition));
                if (remote == null) {
                    // Never persist the raw sentinel: report the failure and move on.
                    result.put(entry.getKey(), JoyQueueCode.CN_UNKNOWN_ERROR);
                    continue;
                }
                if (!JoyQueueCode.SUCCESS.equals(remote.getCode())) {
                    result.put(entry.getKey(), remote.getCode());
                    continue;
                }
                index = (index == RIGHT_INDEX_SENTINEL) ? remote.getRightIndex() : remote.getLeftIndex();
            }
            this.consumerIndexStore.saveIndex(str, str2, partition, index);
            result.put(entry.getKey(), JoyQueueCode.SUCCESS);
        }
        return result;
    }

    /**
     * Tells whether a locally stored position is too old to be trusted.
     *
     * @return {@code false} when the configured expire time is {@code -1}; otherwise whatever
     *         {@link LocalIndexData#isExpired} reports for that expire time
     */
    protected boolean isExpired(LocalIndexData localIndexData) {
        if (this.config.getBroadcastIndexExpireTime() == -1) {
            return false;
        }
        return localIndexData.isExpired(this.config.getBroadcastIndexExpireTime());
    }

    /**
     * Picks the position to store from delegate data according to
     * {@code config.getBroadcastIndexAutoReset()}: {@code 2} uses the delegate's index,
     * {@code 0} its left index, {@code 1} its right index; any other value yields {@code 0}.
     */
    private long resolveResetIndex(FetchIndexData remote) {
        int mode = this.config.getBroadcastIndexAutoReset();
        if (mode == 2) {
            return remote.getIndex();
        }
        if (mode == 0) {
            return remote.getLeftIndex();
        }
        if (mode == 1) {
            return remote.getRightIndex();
        }
        return 0L;
    }
}
