package org.joyqueue.broker.consumer.position;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.consumer.model.ConsumePartition;
import org.joyqueue.broker.consumer.position.model.ConsumeBill;
import org.joyqueue.broker.consumer.position.model.Position;
import org.joyqueue.network.session.Joint;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.io.Files;
import org.joyqueue.toolkit.lang.Close;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/consumer/position/LocalFileStore.class */
public class LocalFileStore implements PositionStore<ConsumePartition, Position> {
    PositionConfig config;
    private File indexFile;
    private File indexFileBack;
    private String basePath;
    private LoopThread thread;
    private Logger logger = LoggerFactory.getLogger(LocalFileStore.class);
    private final Object[] fileUpdateLock = new Object[0];
    private ConcurrentMap<ConsumePartition, Position> consumePositionCache = new ConcurrentHashMap();
    private AtomicBoolean isStarted = new AtomicBoolean(false);

    public void setBasePath(String str) {
        this.basePath = str;
    }

    public void start() throws Exception {
        if (this.isStarted.get()) {
            this.logger.info("LocalFileStore is started, can not be restart.");
            return;
        }
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.basePath), "basePath can not be null!");
        this.config = new PositionConfig(this.basePath);
        this.indexFile = this.config.getPositionFile();
        this.indexFileBack = new File(this.indexFile.getParentFile(), this.indexFile.getName() + PositionConfig.BACK_SUFFIX);
        Files.createFile(this.indexFile);
        Files.createFile(this.indexFileBack);
        initConsumePositionCache();
        this.thread = LoopThread.builder().sleepTime(30000L, 30000L).name("Consume-Position-Store-Thread").onException(th -> {
            this.logger.error(th.getMessage(), th);
        }).doWork(this::doFlush).build();
        this.thread.start();
        this.isStarted.set(true);
        this.logger.info("LocalFileStore is started.");
    }

    private void initConsumePositionCache() throws Exception {
        this.consumePositionCache = recover();
    }

    public void stop() {
        if (this.thread != null) {
            this.thread.stop();
        }
        this.isStarted.set(false);
        this.logger.info("LocalFileStore is stop.");
    }

    public boolean isStarted() {
        return this.isStarted.get();
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public Position get(ConsumePartition consumePartition) {
        return this.consumePositionCache.get(consumePartition);
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public void put(ConsumePartition consumePartition, Position position) {
        this.consumePositionCache.put(consumePartition, position);
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public Position remove(ConsumePartition consumePartition) {
        return this.consumePositionCache.remove(consumePartition);
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public Position putIfAbsent(ConsumePartition consumePartition, Position position) {
        return this.consumePositionCache.putIfAbsent(consumePartition, position);
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public void forceFlush() {
        doFlush();
    }

    @Override // org.joyqueue.broker.consumer.position.PositionStore
    public Iterator<ConsumePartition> iterator() {
        return this.consumePositionCache.keySet().iterator();
    }

    protected void doFlush() {
        dump(cloneIndexCache(this.consumePositionCache));
    }

    private Map<Joint, List<ConsumeBill>> cloneIndexCache(ConcurrentMap<ConsumePartition, Position> concurrentMap) {
        HashMap hashMap = new HashMap(concurrentMap.size());
        for (Map.Entry<ConsumePartition, Position> entry : concurrentMap.entrySet()) {
            ConsumePartition key = entry.getKey();
            String topic = key.getTopic();
            String app = key.getApp();
            short partition = key.getPartition();
            int partitionGroup = key.getPartitionGroup();
            Position value = entry.getValue();
            Joint joint = new Joint(topic, app);
            List list = (List) hashMap.get(joint);
            if (list == null) {
                list = new ArrayList();
                hashMap.put(joint, list);
            }
            list.add(new ConsumeBill(partitionGroup, partition, value));
        }
        return hashMap;
    }

    private void dump(Map<Joint, List<ConsumeBill>> map) {
        try {
            String jSONString = JSON.toJSONString(map);
            synchronized (this.fileUpdateLock) {
                writeFile(this.indexFile, jSONString);
                writeFile(this.indexFileBack, jSONString);
            }
        } catch (Exception e) {
            this.logger.error("flush index error.", e);
        }
    }

    public ConcurrentMap<ConsumePartition, Position> recover() throws IOException {
        Map map;
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        try {
            map = (Map) loadFromFile(this.indexFile, new TypeReference<Map<Joint, List<ConsumeBill>>>() { // from class: org.joyqueue.broker.consumer.position.LocalFileStore.1
            });
        } catch (Exception e) {
            map = (Map) loadFromFile(this.indexFileBack, new TypeReference<Map<Joint, List<ConsumeBill>>>() { // from class: org.joyqueue.broker.consumer.position.LocalFileStore.2
            });
        }
        if (map != null) {
            map.entrySet().stream().forEach(entry -> {
                Joint joint = (Joint) entry.getKey();
                ((List) entry.getValue()).stream().forEach(consumeBill -> {
                    ConsumePartition consumePartition = new ConsumePartition(joint.getTopic(), joint.getApp(), consumeBill.getPartition());
                    consumePartition.setPartitionGroup(consumeBill.getPartitionGroup());
                    concurrentHashMap.putIfAbsent(consumePartition, new Position(consumeBill.getAckStartIndex(), consumeBill.getAckCurIndex(), consumeBill.getPullStartIndex(), consumeBill.getPullCurIndex()));
                });
            });
        }
        return concurrentHashMap;
    }

    private void writeFile(File file, String str) throws IOException {
        FileWriter fileWriter = new FileWriter(file);
        try {
            fileWriter.write(str);
            fileWriter.flush();
            Close.close(fileWriter);
        } catch (Throwable th) {
            Close.close(fileWriter);
            throw th;
        }
    }

    private <T> T loadFromFile(File file, TypeReference<T> typeReference) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
        try {
            StringBuilder sb = new StringBuilder();
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    break;
                }
                sb.append(readLine).append('\n');
            }
            if (sb.length() <= 0) {
                return null;
            }
            T t = (T) JSON.parseObject(sb.toString(), typeReference, new Feature[0]);
            Close.close(bufferedReader);
            return t;
        } finally {
            Close.close(bufferedReader);
        }
    }
}
