package org.joyqueue.broker.archive;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.joyqueue.broker.Plugins;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.MessageLocation;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.network.session.Connection;
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.security.Md5;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/archive/ConsumeArchiveService.class */
public class ConsumeArchiveService extends Service {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeArchiveService.class);
    private ArchiveMappedFileRepository repository;
    private ArchiveStore archiveStore;
    private ClusterManager clusterManager;
    private ArchiveConfig archiveConfig;
    private AtomicInteger readByteCounter;
    private LoopThread readConsumeLogThread;
    private LoopThread cleanConsumeLogFileThread;
    private PointTracer tracer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/archive/ConsumeArchiveService$ArchiveMappedFileRepository.class */
    public static class ArchiveMappedFileRepository implements Closeable {
        private String baseDir;
        private File rwFile;
        private MappedByteBuffer rwMap;
        private RandomAccessFile rwRaf;
        private FileChannel rwFileChannel;
        private File rFile;
        private MappedByteBuffer rMap;
        private RandomAccessFile rRaf;
        private FileChannel rFileChannel;
        private volatile long position = 0;
        private final long pageSize = 16777216;

        ArchiveMappedFileRepository(String str) {
            this.baseDir = str;
            recover();
        }

        private void recover() {
            File LastFile = LastFile();
            if (LastFile == null) {
                return;
            }
            this.rwFile = LastFile;
            try {
                this.rwRaf = new RandomAccessFile(this.rwFile, "rw");
                this.rwFileChannel = this.rwRaf.getChannel();
                this.rwMap = this.rwFileChannel.map(FileChannel.MapMode.READ_WRITE, 0L, this.rwFile.length());
            } catch (IOException e) {
                ConsumeArchiveService.logger.error(e.getMessage(), e);
            }
            while (checkStartFlag(this.rwMap)) {
                this.rwMap.get(new byte[this.rwMap.getInt()]);
            }
            if (checkFileEndFlag(this.rwMap)) {
                close();
            } else {
                this.position = this.rwMap.position();
            }
        }

        public synchronized void append(ByteBuffer byteBuffer) {
            this.position += byteBuffer.limit();
            if (this.rwMap == null) {
                newMappedRWFile();
                this.position = 0L;
                append(byteBuffer);
            } else if (1 + this.position >= 16777216) {
                this.rwMap.put(Byte.MAX_VALUE);
                this.rwMap = null;
                append(byteBuffer);
            } else {
                if (byteBuffer.limit() == 0) {
                    ConsumeArchiveService.logger.debug("append buffer limit is zero.");
                    return;
                }
                this.rwMap.put(Byte.MIN_VALUE);
                this.position++;
                this.rwMap.put(byteBuffer);
            }
        }

        public void newMappedRWFile() {
            try {
                if (this.rwFileChannel != null) {
                    this.rwFileChannel.close();
                }
                this.rwFileChannel = null;
                if (this.rwRaf != null) {
                    this.rwRaf.close();
                }
                this.rwRaf = null;
                this.rwFile = null;
                File file = new File(this.baseDir);
                if (!file.exists()) {
                    file.mkdirs();
                }
                this.rwFile = FileUtils.getFile(new String[]{this.baseDir, SystemClock.now() + IndexAndMetadata.NO_METADATA});
                this.rwRaf = new RandomAccessFile(this.rwFile, "rw");
                this.rwFileChannel = this.rwRaf.getChannel();
                this.rwMap = this.rwFileChannel.map(FileChannel.MapMode.READ_WRITE, 0L, 16777216L);
            } catch (Exception e) {
                ConsumeArchiveService.logger.error("create and mapped file error.", e);
            }
        }

        private void mappedReadOnlyFile() {
            try {
                closeCurrentReadFile();
                this.rRaf = new RandomAccessFile(this.rFile, "r");
                this.rFileChannel = this.rRaf.getChannel();
                this.rMap = this.rFileChannel.map(FileChannel.MapMode.READ_ONLY, 0L, 16777216L);
            } catch (IOException e) {
                ConsumeArchiveService.logger.error(e.getMessage(), e);
            }
        }

        public byte[] readOne() {
            if (this.rMap == null) {
                if (nextFile() == null) {
                    return new byte[0];
                }
                mappedReadOnlyFile();
            }
            if (checkStartFlag(this.rMap)) {
                byte[] bArr = new byte[this.rMap.getInt()];
                this.rMap.get(bArr);
                return bArr;
            }
            if (checkFileEndFlag(this.rMap)) {
                ConsumeArchiveService.logger.debug("Finish reading the file {}.{}", this.rFile, this.rMap.toString());
                if (nextFile() != null) {
                    mappedReadOnlyFile();
                    return readOne();
                }
            }
            return new byte[0];
        }

        private boolean checkStartFlag(MappedByteBuffer mappedByteBuffer) {
            if (mappedByteBuffer.position() + 1 >= 16777216) {
                return false;
            }
            if (mappedByteBuffer.get() == Byte.MIN_VALUE) {
                return true;
            }
            mappedByteBuffer.position(mappedByteBuffer.position() - 1);
            return false;
        }

        private boolean checkFileEndFlag(MappedByteBuffer mappedByteBuffer) {
            if (mappedByteBuffer.position() + 1 > 16777216) {
                return false;
            }
            if (mappedByteBuffer.get() == Byte.MAX_VALUE) {
                mappedByteBuffer.position(mappedByteBuffer.position() - 1);
                return true;
            }
            mappedByteBuffer.position(mappedByteBuffer.position() - 1);
            return false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void rollBack(int i) {
            if (this.rMap != null) {
                this.rMap.position(this.rMap.position() - i);
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                closeCurrentReadFile();
                closeCurrentWriteFile();
            } catch (IOException e) {
                ConsumeArchiveService.logger.error("delete read file error.", e);
            }
        }

        public void closeCurrentReadFile() throws IOException {
            if (this.rFileChannel != null) {
                this.rFileChannel.close();
            }
            if (this.rRaf != null) {
                this.rRaf.close();
            }
        }

        public void closeCurrentWriteFile() throws IOException {
            if (this.rwFileChannel != null) {
                this.rwFileChannel.close();
            }
            if (this.rwRaf != null) {
                this.rwRaf.close();
            }
        }

        private File nextFile() {
            String[] list = new File(this.baseDir).list();
            if (list == null || list.length == 1) {
                ConsumeArchiveService.logger.debug("only one write file.");
                return null;
            }
            ConsumeArchiveService.logger.debug("archive file list {}", Arrays.toString(list));
            String name = this.rFile == null ? IndexAndMetadata.NO_METADATA : this.rFile.getName();
            List list2 = (List) Arrays.asList(list).stream().filter(str -> {
                return str.compareTo(name) > 0;
            }).sorted(Comparator.naturalOrder()).collect(Collectors.toList());
            if (list2.size() <= 1) {
                ConsumeArchiveService.logger.debug("only one write file.");
                return null;
            }
            File file = new File(this.baseDir + ((String) list2.get(0)));
            this.rFile = file;
            ConsumeArchiveService.logger.debug("current read consume event file {}", file);
            return this.rFile;
        }

        private File LastFile() {
            String[] list = new File(this.baseDir).list();
            if (list == null) {
                return null;
            }
            Optional findFirst = Arrays.asList(list).stream().sorted(Comparator.reverseOrder()).findFirst();
            if (!findFirst.isPresent()) {
                return null;
            }
            this.rwFile = new File(this.baseDir + ((String) findFirst.get()));
            return this.rwFile;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void delArchivedFile() {
            List<String> archivedFileList = getArchivedFileList();
            if (CollectionUtils.isNotEmpty(archivedFileList)) {
                archivedFileList.stream().forEach(str -> {
                    new File(this.baseDir + str).delete();
                });
            }
        }

        private List<String> getArchivedFileList() {
            if (this.rFile == null) {
                ConsumeArchiveService.logger.debug("Can not get archive file list cause by consume archive read file have no init.");
                return null;
            }
            String[] list = new File(this.baseDir).list();
            if (list == null) {
                return null;
            }
            return (List) Arrays.asList(list).stream().filter(str -> {
                return str.compareTo(this.rFile.getName()) < 0;
            }).collect(Collectors.toList());
        }

        public int getLocalFileNum() {
            String[] list = new File(this.baseDir).list();
            if (list == null) {
                return 0;
            }
            return list.length;
        }

        public long getReadPosition() {
            return this.rMap.position();
        }

        public long getWritePosition() {
            return this.rwMap.position();
        }

        public long getPageSize() {
            return 16777216L;
        }

        public synchronized void tryFinishCurWriteFile() {
            if (this.rwFile == null) {
                return;
            }
            String name = this.rwFile.getName();
            long now = SystemClock.now();
            if (this.position <= 0 || now - Long.parseLong(name) < 60000) {
                return;
            }
            this.position = 16777216L;
            append(ByteBuffer.wrap(new byte[0]));
            ConsumeArchiveService.logger.info("reset write file {} position {} to pageSize.", this.rwFile.getName(), this.rwMap.toString());
        }
    }

    public ConsumeArchiveService(ArchiveConfig archiveConfig, ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.archiveConfig = archiveConfig;
    }

    protected void validate() throws Exception {
        super.validate();
        if (this.archiveStore == null) {
            this.archiveStore = (ArchiveStore) Plugins.ARCHIVESTORE.get();
        }
        this.archiveStore.setNameSpace(this.archiveConfig.getNamespace());
        logger.info("Get archive store namespace [{}] by archive config.", this.archiveConfig.getNamespace());
        Preconditions.checkArgument(this.archiveStore != null, "archive store can not be null.");
        this.repository = new ArchiveMappedFileRepository(this.archiveConfig.getArchivePath());
        this.readByteCounter = new AtomicInteger(0);
        this.tracer = (PointTracer) Plugins.TRACERERVICE.get(this.archiveConfig.getTracerType());
        this.readConsumeLogThread = LoopThread.builder().sleepTime(1L, 10L).name("ReadAndPutHBase-ConsumeLog-Thread").onException(th -> {
            logger.error(th.getMessage(), th);
            this.repository.rollBack(this.readByteCounter.get());
            logger.info("finish rollback.");
        }).doWork(this::readAndWrite).build();
        this.cleanConsumeLogFileThread = LoopThread.builder().sleepTime(10000L, 10000L).name("CleanArchiveFile-ConsumeLog-Thread").onException(th2 -> {
            logger.error(th2.getMessage(), th2);
        }).doWork(this::cleanAndRollWriteFile).build();
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.archiveStore.start();
        this.readConsumeLogThread.start();
        this.cleanConsumeLogFileThread.start();
    }

    protected void doStop() {
        super.doStop();
        Close.close(this.readConsumeLogThread);
        Close.close(this.cleanConsumeLogFileThread);
        Close.close(this.repository);
        Close.close(this.archiveStore);
    }

    private void readAndWrite() throws JoyQueueException, InterruptedException {
        int size;
        int consumeBatchNum = this.archiveConfig.getConsumeBatchNum();
        do {
            List<ConsumeLog> readConsumeLog = readConsumeLog(consumeBatchNum);
            size = readConsumeLog.size();
            if (size <= 0) {
                if (this.repository.rFile == null || this.repository.rMap == null) {
                    logger.debug("read file is null.");
                    return;
                } else {
                    logger.debug("read file name {}, read position {}", this.repository.rFile.getName(), this.repository.rMap.toString());
                    return;
                }
            }
            long now = SystemClock.now();
            this.archiveStore.putConsumeLog(readConsumeLog, this.tracer);
            logger.debug("Write consumeLogs size:{} to archive store, and elapsed time {}ms", Integer.valueOf(readConsumeLog.size()), Long.valueOf(SystemClock.now() - now));
            int consumeWriteDelay = this.archiveConfig.getConsumeWriteDelay();
            if (consumeWriteDelay > 0) {
                Thread.sleep(consumeWriteDelay);
            }
        } while (size == consumeBatchNum);
    }

    private void cleanAndRollWriteFile() {
        this.repository.delArchivedFile();
        this.repository.tryFinishCurWriteFile();
    }

    private List<ConsumeLog> readConsumeLog(int i) {
        this.readByteCounter.set(0);
        LinkedList linkedList = new LinkedList();
        for (int i2 = 0; i2 < i; i2++) {
            byte[] readOne = this.repository.readOne();
            if (readOne.length <= 0) {
                break;
            }
            this.readByteCounter.addAndGet(5 + readOne.length);
            linkedList.add(ArchiveSerializer.read(ByteBuffer.wrap(readOne)));
        }
        return linkedList;
    }

    public long getRemainConsumeLogFileNum() {
        return this.repository.getLocalFileNum() * this.repository.getPageSize();
    }

    public void appendConsumeLog(Connection connection, MessageLocation[] messageLocationArr) throws JoyQueueException {
        if (isStarted()) {
            convert(connection, messageLocationArr).stream().forEach(consumeLog -> {
                ByteBuffer write = ArchiveSerializer.write(consumeLog);
                appendLog(write);
                ArchiveSerializer.release(write);
            });
        } else {
            logger.debug("ConsumeArchiveService not be started.");
        }
    }

    private List<ConsumeLog> convert(Connection connection, MessageLocation[] messageLocationArr) throws JoyQueueException {
        LinkedList linkedList = new LinkedList();
        for (MessageLocation messageLocation : messageLocationArr) {
            ConsumeLog consumeLog = new ConsumeLog();
            consumeLog.setBytesMessageId(buildMessageId(messageLocation));
            consumeLog.setApp(connection.getApp());
            consumeLog.setBrokerId(this.clusterManager.getBrokerId().intValue());
            consumeLog.setClientIp(connection.getAddress());
            consumeLog.setConsumeTime(SystemClock.now());
            linkedList.add(consumeLog);
        }
        return linkedList;
    }

    private byte[] buildMessageId(MessageLocation messageLocation) {
        byte[] bArr = new byte[0];
        try {
            bArr = Md5.INSTANCE.encrypt((messageLocation.getTopic() + ((int) messageLocation.getPartition()) + messageLocation.getIndex()).getBytes(), (byte[]) null);
        } catch (GeneralSecurityException e) {
            logger.error("topic:{}, partition:{}, index:{}, exception:{}", new Object[]{messageLocation.getTopic(), Short.valueOf(messageLocation.getPartition()), Long.valueOf(messageLocation.getIndex()), e});
        }
        return bArr;
    }

    private synchronized void appendLog(ByteBuffer byteBuffer) {
        this.repository.append(byteBuffer);
    }
}
