package org.joyqueue.broker.index.handler;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.map.HashedMap;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.index.command.ConsumeIndexStoreRequest;
import org.joyqueue.broker.index.command.ConsumeIndexStoreResponse;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.codec.JoyQueueHeader;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Direction;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.network.transport.command.handler.CommandHandler;
import org.joyqueue.network.transport.exception.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/index/handler/ConsumeIndexStoreHandler.class */
/**
 * Command handler that applies the consume indexes carried by a
 * {@link ConsumeIndexStoreRequest} and answers with one status code per entry.
 *
 * <p>The request exposes a two-level map: a string key (called {@code topic} in this
 * class; presumably the topic name - confirm against {@code ConsumeIndexStoreRequest})
 * mapped to an integer key (called {@code partition} here; narrowed to {@code short}
 * before use) mapped to an {@link IndexAndMetadata}. For every entry the handler calls
 * {@link Consume#setAckIndex} and {@link Consume#setStartAckIndex}, unless a newer
 * commit time has already been recorded for the same topic/partition/app.
 *
 * <p>The last applied commit time per topic/partition/app is remembered in an in-memory
 * cache whose entries expire after {@value #COMMIT_TIME_CACHE_EXPIRE_MS} ms without access.
 *
 * <p>NOTE(review): the stale check and the cache update are two separate steps, so this
 * class does not serialize concurrent requests for the same key - confirm whether the
 * caller guarantees ordering.
 */
public class ConsumeIndexStoreHandler implements CommandHandler, Type {
    /** Command type this handler reports through {@link #type()}. */
    private static final int REQUEST_TYPE = 48;

    /** Command type written into the header of the response command. */
    private static final int RESPONSE_TYPE = -48;

    /** Idle time, in milliseconds, after which a remembered commit time is evicted. */
    private static final long COMMIT_TIME_CACHE_EXPIRE_MS = 60000L;

    private static final Logger logger = LoggerFactory.getLogger(ConsumeIndexStoreHandler.class);

    private final BrokerContext brokerContext;
    private final Consume consume;

    /** Last applied commit time, keyed by {@link #generateIndexKey(String, short, String)}. */
    private final Cache<String, Long> commitIndexCache = CacheBuilder.newBuilder()
            .expireAfterAccess(COMMIT_TIME_CACHE_EXPIRE_MS, TimeUnit.MILLISECONDS)
            .build();

    /**
     * Creates the handler and takes the {@link Consume} service from the broker context.
     *
     * @param brokerContext broker context supplying the consume service
     */
    public ConsumeIndexStoreHandler(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
        this.consume = brokerContext.getConsume();
    }

    /**
     * Applies every consume index contained in the request payload and builds the response.
     *
     * <p>A {@link JoyQueueException} raised for one entry does not abort the request; its
     * code is reported as the status of that entry. Any other exception propagates.
     *
     * @param transport transport the command arrived on (not used by this handler)
     * @param command   command whose payload is a {@link ConsumeIndexStoreRequest}
     * @return a response command carrying the per-entry status codes, or {@code null}
     *         when the command has no payload
     * @throws TransportException declared by the signature; this method contains no
     *                            explicit throw of it
     */
    public Command handle(Transport transport, Command command) throws TransportException {
        ConsumeIndexStoreRequest request = (ConsumeIndexStoreRequest) command.getPayload();
        if (request == null) {
            return null;
        }

        Map<String, Map<Integer, IndexAndMetadata>> indexMetadata = request.getIndexMetadata();
        // Pass the map itself: SLF4J only renders it when debug logging is enabled.
        logger.debug("ConsumeIndexStoreRequest info:[{}]", indexMetadata);

        String app = request.getApp();
        Map<String, Map<Integer, Short>> statusByTopic = new HashMap<>();
        if (indexMetadata != null) {
            for (Map.Entry<String, Map<Integer, IndexAndMetadata>> topicEntry : indexMetadata.entrySet()) {
                String topic = topicEntry.getKey();
                statusByTopic.put(topic, storePartitionIndexes(topic, app, topicEntry.getValue()));
            }
        }

        JoyQueueHeader header = new JoyQueueHeader(Direction.RESPONSE, QosLevel.ONE_WAY, RESPONSE_TYPE);
        return new Command(header, new ConsumeIndexStoreResponse(statusByTopic));
    }

    /**
     * Applies the indexes of one topic and collects a status code for each partition.
     *
     * @param topic            first-level key of the request metadata
     * @param app              application taken from the request
     * @param partitionIndexes index data keyed by partition; {@code null} yields an empty result
     * @return status code per partition: {@link JoyQueueCode#SUCCESS} or the code of the
     *         {@link JoyQueueException} raised for that partition
     */
    private Map<Integer, Short> storePartitionIndexes(String topic, String app,
                                                      Map<Integer, IndexAndMetadata> partitionIndexes) {
        Map<Integer, Short> statusByPartition = new HashMap<>();
        if (partitionIndexes == null) {
            return statusByPartition;
        }
        for (Map.Entry<Integer, IndexAndMetadata> partitionEntry : partitionIndexes.entrySet()) {
            int partition = partitionEntry.getKey();
            IndexAndMetadata indexAndMetadata = partitionEntry.getValue();
            int code = JoyQueueCode.SUCCESS.getCode();
            try {
                setConsumeIndex(topic, (short) partition, app,
                        indexAndMetadata.getIndex(), indexAndMetadata.getIndexCommitTime());
            } catch (JoyQueueException e) {
                // Report the failure for this partition only and keep processing the rest.
                code = e.getCode();
            }
            statusByPartition.put(partitionEntry.getKey(), Short.valueOf((short) code));
        }
        return statusByPartition;
    }

    /**
     * Stores the ack index for one topic/partition/app unless a newer commit was already applied.
     *
     * <p>When the remembered commit time is greater than {@code commitTime} the call is a
     * silent no-op. Otherwise the ack index is set, the start ack index is set to
     * {@code -1} (presumably "unset" - confirm against {@link Consume}), and
     * {@code commitTime} is remembered.
     *
     * @throws JoyQueueException propagated from the {@link Consume} calls
     */
    private void setConsumeIndex(String topic, short partition, String app, long index, long commitTime)
            throws JoyQueueException {
        if (getLastCommitTime(topic, partition, app) > commitTime) {
            return;
        }
        Consumer consumer = new Consumer(topic, app);
        consume.setAckIndex(consumer, partition, index);
        consume.setStartAckIndex(consumer, partition, -1L);
        putLastCommitIndex(topic, partition, app, commitTime);
    }

    /**
     * Returns the commit time last remembered for the given topic/partition/app.
     *
     * @return the remembered commit time, or {@code 0} when nothing is cached for the key
     */
    protected long getLastCommitTime(String topic, short partition, String app) {
        Long lastCommitTime = commitIndexCache.getIfPresent(generateIndexKey(topic, partition, app));
        return lastCommitTime == null ? 0L : lastCommitTime.longValue();
    }

    /**
     * Remembers {@code commitTime} as the last applied commit time for the given
     * topic/partition/app. Despite the method name, the stored value is a commit time.
     */
    protected void putLastCommitIndex(String topic, short partition, String app, long commitTime) {
        commitIndexCache.put(generateIndexKey(topic, partition, app), Long.valueOf(commitTime));
    }

    /**
     * Builds the cache key {@code <topic>_<partition>_<app>}.
     */
    protected String generateIndexKey(String topic, short partition, String app) {
        return topic + "_" + partition + "_" + app;
    }

    /**
     * @return the command type handled by this handler ({@value #REQUEST_TYPE})
     */
    public int type() {
        return REQUEST_TYPE;
    }
}
