package org.joyqueue.broker.archive;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.Plugins;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.MessageConvertSupport;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.broker.monitor.DefaultPointTracer;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.server.archive.store.model.AchivePosition;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.store.PositionOverflowException;
import org.joyqueue.store.PositionUnderflowException;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/archive/ProduceArchiveService.class */
public class ProduceArchiveService extends Service {
    private static final Logger logger = LoggerFactory.getLogger(ProduceArchiveService.class);
    // Supplies topic configs, the local broker id and partition ownership.
    private ClusterManager clusterManager;
    // Used to pull messages and to query min/max indexes of a partition.
    private Consume consume;
    // Destination of the send logs and of the per-partition archive positions.
    private ArchiveStore archiveStore;
    // Hand-off buffer: filled by readArchiveMsg(), drained by write2Store(). Created in validate().
    private BlockingQueue<SendLog> archiveQueue;
    // Pool on which write2Store() runs the actual store writes. Created in validate().
    private ExecutorService executorService;
    // Passed through to ArchiveStore#putSendLog.
    private PointTracer tracer;
    // Background worker running updateArchiveItem() and syncArchivePosition().
    private LoopThread updateItemThread;
    // Background worker running readArchiveMsg().
    private LoopThread readMsgThread;
    // Background worker running write2Store(); woken by the read worker when it queued something.
    private LoopThread writeMsgThread;
    private ArchiveConfig archiveConfig;
    // Used by parseMessage() to expand Kafka batch messages.
    private MessageConvertSupport messageConvertSupport;
    // Max messages requested per pull and max logs per store write; overwritten from config in validate().
    private int batchNum = 1000;
    // Topic/partition entries currently being archived, each with its own read index.
    private ItemList itemList = new ItemList();
    // Number of logs stored per "topic<separator>partition" key; incremented by writeCounter().
    private ConcurrentMap<String, AtomicInteger> storeCounter = new ConcurrentHashMap();
    // Joins topic and partition into the keys of storeCounter and pauseMap.
    private String separator = ":";
    // Raised by a failed store write; makes write2Store() back off for a second.
    AtomicBoolean hasStoreError = new AtomicBoolean(false);
    // "topic<separator>partition" -> timestamp (SystemClock millis) until which reading is skipped.
    private final Map<String, Long> pauseMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/archive/ProduceArchiveService$ItemList.class */
    /**
     * Holder of the {@link SendArchiveItem}s this broker is currently archiving.
     * Backed by a {@link CopyOnWriteArrayList}, so iteration works on a snapshot.
     */
    public class ItemList {
        private CopyOnWriteArrayList<SendArchiveItem> cpList = new CopyOnWriteArrayList<>();

        ItemList() {
        }

        /** Returns the backing list itself (not a copy). */
        public List<SendArchiveItem> getAll() {
            return this.cpList;
        }

        /** Removes the given item from the list, if present. */
        public void remove(SendArchiveItem sendArchiveItem) {
            this.cpList.remove(sendArchiveItem);
        }

        /**
         * Reconciles the held items with {@code list}: items no longer contained in
         * {@code list} are dropped, and items not yet held are added with their read
         * index initialised from the archive store position (0 when none is stored).
         *
         * @param list the items that should be archived from now on
         * @throws JoyQueueException if the archive store position lookup fails
         */
        public void addAndUpdate(List<SendArchiveItem> list) throws JoyQueueException {
            // Phase 1: drop everything that is not wanted any more.
            for (SendArchiveItem existing : this.cpList) {
                if (!list.contains(existing)) {
                    remove(existing);
                }
            }
            // Phase 2: adopt the newcomers.
            for (SendArchiveItem candidate : list) {
                if (this.cpList.contains(candidate)) {
                    continue;
                }
                Long stored = ProduceArchiveService.this.archiveStore.getPosition(candidate.topic, candidate.partition.shortValue());
                if (stored == null) {
                    stored = 0L;
                }
                candidate.setReadIndex(stored.longValue());
                this.cpList.add(candidate);
            }
        }

        /**
         * Lowers the read index of the item matching topic {@code str} and partition
         * {@code s} to {@code j}; an index that is already at or below {@code j} is kept.
         */
        public void updateReadIndex(String str, short s, long j) {
            for (SendArchiveItem item : this.cpList) {
                boolean matches = str.equals(item.getTopic()) && item.getPartition().equals(Short.valueOf(s));
                if (!matches) {
                    continue;
                }
                if (item.getReadIndex() > j) {
                    item.setReadIndex(j);
                }
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/archive/ProduceArchiveService$SendArchiveItem.class */
    public static class SendArchiveItem {
        private final String topic;
        private final Short partition;
        private AtomicLong readIndex = new AtomicLong(0);

        SendArchiveItem(String str, Short sh) {
            this.topic = str;
            this.partition = sh;
        }

        public long getReadIndex() {
            if (this.readIndex == null) {
                this.readIndex = new AtomicLong(0L);
            }
            return this.readIndex.get();
        }

        public synchronized void setReadIndex(long j) {
            this.readIndex.set(j);
        }

        public void setReadIndex(long j, long j2) {
            if (this.readIndex.get() < j2) {
                return;
            }
            synchronized (this) {
                if (this.readIndex.get() < j2) {
                    return;
                }
                setReadIndex(j);
            }
        }

        public String getTopic() {
            return this.topic;
        }

        public Short getPartition() {
            return this.partition;
        }

        public int hashCode() {
            return super.hashCode();
        }

        public boolean equals(Object obj) {
            SendArchiveItem sendArchiveItem = (SendArchiveItem) obj;
            return StringUtils.equals(this.topic, sendArchiveItem.topic) && this.partition == sendArchiveItem.partition;
        }

        public String toString() {
            return "SendArchiveItem{topic='" + this.topic + "', partition=" + this.partition + ", readIndex=" + this.readIndex + '}';
        }
    }

    /**
     * Creates the service with its collaborators. The archive store, tracer, queue and
     * worker threads are not set up here but in {@link #validate()}.
     *
     * @param archiveConfig         archive settings
     * @param clusterManager        source of topic and broker metadata
     * @param consume               message reader
     * @param messageConvertSupport converter used for batch messages
     */
    public ProduceArchiveService(ArchiveConfig archiveConfig, ClusterManager clusterManager, Consume consume, MessageConvertSupport messageConvertSupport) {
        this.archiveConfig = archiveConfig;
        this.messageConvertSupport = messageConvertSupport;
        this.consume = consume;
        this.clusterManager = clusterManager;
    }

    /**
     * Resolves the archive store and tracer, reads the sizing parameters from the
     * archive config and builds (but does not start) the queue, the writer pool and
     * the three worker threads.
     *
     * @throws IllegalArgumentException if the archive config is missing or no archive
     *                                  store is set or provided by the plugin registry
     * @throws Exception                if the parent validation fails
     */
    protected void validate() throws Exception {
        super.validate();
        // Check both collaborators BEFORE the first dereference, otherwise a missing
        // one surfaces as a bare NullPointerException instead of the messages below.
        Preconditions.checkArgument(this.archiveConfig != null, "archive config can not be null.");
        if (this.archiveStore == null) {
            this.archiveStore = (ArchiveStore) Plugins.ARCHIVESTORE.get();
        }
        Preconditions.checkArgument(this.archiveStore != null, "archive store can not be null.");
        this.archiveStore.setNameSpace(this.archiveConfig.getNamespace());
        logger.info("Get archive store namespace [{}] by archive config.", this.archiveConfig.getNamespace());

        this.tracer = (PointTracer) Plugins.TRACERERVICE.get(this.archiveConfig.getTracerType());
        if (this.tracer == null) {
            this.tracer = new DefaultPointTracer();
        }

        this.batchNum = this.archiveConfig.getProduceBatchNum();
        this.archiveQueue = new LinkedBlockingDeque<>(this.archiveConfig.getLogQueueSize());
        // Fixed-size pool with a bounded queue; when the queue is full the submitting
        // thread runs the task itself (CallerRunsPolicy), which throttles the producer.
        this.executorService = new ThreadPoolExecutor(
                this.archiveConfig.getWriteThreadNum(),
                this.archiveConfig.getWriteThreadNum(),
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(this.archiveConfig.getThreadPoolQueueSize()),
                new NamedThreadFactory("sendLog-archive"),
                new ThreadPoolExecutor.CallerRunsPolicy());

        this.updateItemThread = LoopThread.builder().sleepTime(10000L, 10000L).name("UpdateArchiveItem-Thread").onException(th -> {
            logger.warn("Exception:", th);
        }).doWork(() -> {
            updateArchiveItem();
            syncArchivePosition();
        }).build();
        this.readMsgThread = LoopThread.builder().sleepTime(0L, 10L).name("ReadArchiveMsg-Thread").onException(th -> {
            logger.warn("Exception:", th);
        }).doWork(() -> {
            readArchiveMsg();
        }).build();
        this.writeMsgThread = LoopThread.builder().sleepTime(10L, 10L).name("WriteArchiveMsg-Thread").onException(th -> {
            logger.warn("Exception:", th);
        }).doWork(() -> {
            write2Store();
        }).build();
    }

    /**
     * Starts the archive store, loads the initial set of archive items and then
     * starts the three worker threads.
     *
     * @throws Exception if the parent start, the store start or the initial item load fails
     */
    protected void doStart() throws Exception {
        super.doStart();
        this.archiveStore.start();
        // Populate the item list once synchronously, before the workers start.
        updateArchiveItem();
        this.updateItemThread.start();
        this.readMsgThread.start();
        this.writeMsgThread.start();
        logger.info("produce archive archiveService started.");
    }

    /**
     * Closes the worker threads, then the writer pool, then the archive store.
     */
    protected void doStop() {
        super.doStop();
        // Workers first, then the pool they submit to, then the store the pool writes to.
        Close.close(this.updateItemThread);
        Close.close(this.readMsgThread);
        Close.close(this.writeMsgThread);
        Close.close(this.executorService);
        Close.close(this.archiveStore);
        logger.info("produce archive archiveService stopped.");
    }

    /**
     * Rebuilds the wanted set of archive items - one per partition that this broker
     * holds of every topic the cluster manager reports as archivable - and reconciles
     * the item list with it.
     *
     * @throws JoyQueueException if the item list fails to look up a stored position
     */
    private void updateArchiveItem() throws JoyQueueException {
        List<SendArchiveItem> wanted = new ArrayList<>();
        List<TopicConfig> topics = this.clusterManager.getTopics();
        int brokerId = this.clusterManager.getBroker().getId().intValue();
        for (TopicConfig topicConfig : topics) {
            TopicName topicName = topicConfig.getName();
            if (!this.clusterManager.checkArchiveable(topicName)) {
                continue;
            }
            logger.info("Topic:{} send archive is enable.", topicName.getFullName());
            topicConfig.fetchPartitionByBroker(brokerId).forEach(partition -> {
                wanted.add(new SendArchiveItem(topicName.getFullName(), partition));
            });
        }
        this.itemList.addAndUpdate(wanted);
    }

    /**
     * One pass of the read worker: for every archive item that is not paused, pulls up
     * to {@code batchNum} messages starting at the item's read index, queues them for
     * the writer and advances the read index.
     *
     * <p>An empty pull or any failure pauses the item (see {@link #put2PauseMap}). When
     * the failure is caused by a position underflow/overflow the read index is reset to
     * the partition's min/max index. If the whole pass read nothing the thread sleeps
     * for a millisecond.
     *
     * @throws Exception if the final sleep is interrupted
     */
    private void readArchiveMsg() throws Exception {
        int readCount = 0;
        for (SendArchiveItem item : this.itemList.getAll()) {
            if (isPause(item.getTopic(), item.getPartition())) {
                continue;
            }
            long readIndex = item.getReadIndex();
            try {
                PullResult pullResult = this.consume.getMessage(item.topic, item.partition.shortValue(), readIndex, this.batchNum);
                int bufferCount = pullResult.getBuffers().size();
                if (bufferCount == 0) {
                    put2PauseMap(item.getTopic(), item.getPartition());
                    continue;
                }
                int queued = putSendLog2Queue(pullResult);
                if (queued > 0) {
                    this.writeMsgThread.wakeup();
                }
                // Conditional update: skipped when the index was lowered below the
                // start index in the meantime (store failure rollback).
                item.setReadIndex(readIndex + queued, readIndex);
                readCount += bufferCount;
                // Guard and call now use the same level (was: debug guard, info call).
                if (logger.isDebugEnabled()) {
                    logger.debug("produce archive: {} messages put into the archive queue.", Integer.valueOf(queued));
                }
            } catch (Throwable th) {
                // Report the index that was actually requested, not a re-read value.
                logger.error("read message from topic:" + item.topic + " partition:" + item.partition + " index:" + readIndex + " error.", th);
                if (th.getCause() instanceof PositionUnderflowException) {
                    long minIndex = this.consume.getMinIndex(new Consumer(item.topic, IndexAndMetadata.NO_METADATA), item.partition.shortValue());
                    item.setReadIndex(minIndex);
                    logger.info("repair read message position SendArchiveItem info:[{}], currentIndex:[{}]", item, Long.valueOf(minIndex));
                }
                if (th.getCause() instanceof PositionOverflowException) {
                    long maxIndex = this.consume.getMaxIndex(new Consumer(item.topic, IndexAndMetadata.NO_METADATA), item.partition.shortValue());
                    item.setReadIndex(maxIndex);
                    logger.warn("repair read message position SendArchiveItem info:[{}], currentIndex:[{}]", item, Long.valueOf(maxIndex));
                }
                put2PauseMap(item.getTopic(), item.getPartition());
            }
        }
        if (readCount == 0) {
            Thread.sleep(1L);
        }
    }

    /**
     * Marks the given topic/partition as paused until 2000 ms from now.
     */
    private void put2PauseMap(String str, Short sh) {
        String key = str + this.separator + sh;
        Long resumeAt = Long.valueOf(SystemClock.now() + 2000);
        this.pauseMap.put(key, resumeAt);
    }

    /**
     * Tells whether the given topic/partition is still inside its pause window.
     *
     * @return {@code true} if a pause deadline is recorded and lies in the future
     */
    private boolean isPause(String str, Short sh) {
        Long resumeAt = this.pauseMap.get(str + this.separator + sh);
        if (resumeAt == null) {
            return false;
        }
        return resumeAt.longValue() > SystemClock.now();
    }

    /**
     * Parses every buffer of the pull result into messages, stamps each message with
     * the result's topic and partition and puts the converted send log on the archive
     * queue (blocking while the queue is full).
     *
     * @param pullResult the pulled buffers
     * @return the number of messages queued
     * @throws Exception if parsing fails or the blocking put is interrupted
     */
    private int putSendLog2Queue(PullResult pullResult) throws Exception {
        int queued = 0;
        for (ByteBuffer buffer : pullResult.getBuffers()) {
            for (BrokerMessage message : parseMessage(buffer)) {
                message.setTopic(pullResult.getTopic());
                message.setPartition(pullResult.getPartition());
                SendLog sendLog = convert(message, buffer);
                this.archiveQueue.put(sendLog);
                queued++;
            }
        }
        return queued;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Deserializes one buffer into broker messages. A Kafka-sourced batch message is
     * expanded through the message converter; anything else yields a single-element list.
     *
     * @param byteBuffer the serialized message
     * @return the message(s) contained in the buffer
     * @throws Exception if deserialization fails
     */
    private List<BrokerMessage> parseMessage(ByteBuffer byteBuffer) throws Exception {
        BrokerMessage message = Serializer.readBrokerMessage(byteBuffer);
        boolean kafkaBatch = message.getSource() == SourceType.KAFKA.getValue() && message.isBatch();
        if (kafkaBatch) {
            return this.messageConvertSupport.convertBatch(message, SourceType.INTERNAL.getValue());
        }
        List<BrokerMessage> single = new LinkedList<>();
        single.add(message);
        return single;
    }

    /**
     * Builds the send log for one message.
     *
     * <p>The message id is the concatenation of topic, partition and message index; a
     * missing business id is replaced by {@code IndexAndMetadata.NO_METADATA}; the
     * compress type is always -1; the body is the backing array of {@code byteBuffer}.
     *
     * @param brokerMessage the parsed message
     * @param byteBuffer    the buffer the message was parsed from
     * @return the populated send log
     */
    private SendLog convert(BrokerMessage brokerMessage, ByteBuffer byteBuffer) {
        String businessId = brokerMessage.getBusinessId();
        if (businessId == null) {
            businessId = IndexAndMetadata.NO_METADATA;
        }
        String messageId = brokerMessage.getTopic() + ((int) brokerMessage.getPartition()) + brokerMessage.getMsgIndexNo();

        SendLog log = new SendLog();
        // where the message lives
        log.setTopic(brokerMessage.getTopic());
        log.setPartition(brokerMessage.getPartition());
        log.setIndex(brokerMessage.getMsgIndexNo());
        log.setBrokerId(this.clusterManager.getBrokerId().intValue());
        // who sent it and when
        log.setApp(brokerMessage.getApp());
        log.setClientIp(brokerMessage.getClientIp());
        log.setSendTime(brokerMessage.getStartTime());
        // identifiers and payload
        log.setBusinessId(businessId);
        log.setMessageId(messageId);
        log.setCompressType((short) -1);
        log.setMessageBody(byteBuffer.array());
        return log;
    }

    /**
     * One pass of the write worker: drains the archive queue in batches of at most
     * {@code batchNum} logs and hands each batch to the writer pool. Keeps going as long
     * as full batches come out of the queue.
     *
     * <p>A batch that fails with a {@link JoyQueueException} raises the store-error flag
     * and rolls the affected read indexes back so the messages are read again; the flag
     * makes this loop sleep for a second before the next batch.
     *
     * @throws InterruptedException if the back-off sleep is interrupted
     */
    private void write2Store() throws InterruptedException {
        int size;
        do {
            final List<SendLog> batch = new ArrayList<>(this.batchNum);
            SendLog next;
            while (batch.size() < this.batchNum && (next = this.archiveQueue.poll()) != null) {
                batch.add(next);
            }
            size = batch.size();
            if (size > 0) {
                // The Future is intentionally not awaited, so every failure has to be
                // handled (at least logged) inside the task or it vanishes silently.
                this.executorService.submit(() -> {
                    try {
                        this.archiveStore.putSendLog(batch, this.tracer);
                        logger.debug("Write sendLogs size:{} to archive store.", Integer.valueOf(batch.size()));
                        writeCounter(batch);
                    } catch (JoyQueueException e) {
                        logger.error("Write sendLogs size:{} to archive store failed, rolling back read index.", Integer.valueOf(batch.size()), e);
                        this.hasStoreError.set(true);
                        rollBackReadIndex(batch);
                    } catch (RuntimeException e) {
                        // Previously swallowed by the discarded Future. Logged only: the
                        // retry semantics for unexpected errors are left unchanged.
                        logger.error("Unexpected error while writing sendLogs size:{} to archive store.", Integer.valueOf(batch.size()), e);
                    }
                });
            }
            if (this.hasStoreError.getAndSet(false)) {
                Thread.sleep(1000L);
            }
            // size > 0 keeps a batchNum of 0 from spinning forever on "0 == 0".
        } while (size > 0 && size == this.batchNum);
    }

    /**
     * Increments the per-partition stored-log counter once for every log in the list.
     *
     * <p>Called concurrently from the writer pool. The counter is created atomically
     * with {@code computeIfAbsent}; a get / putIfAbsent / get sequence leaves the local
     * reference {@code null} when another thread wins the insert race.
     *
     * @param list the logs that were just written to the archive store
     */
    private void writeCounter(List<SendLog> list) {
        for (SendLog sendLog : list) {
            String key = sendLog.getTopic() + this.separator + sendLog.getPartition();
            this.storeCounter.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    /**
     * Rolls read indexes back after a failed store write. The list is scanned as a
     * sequence of consecutive runs sharing topic and partition; for each run the
     * smallest log index is passed to {@code ItemList#updateReadIndex}.
     *
     * @param list the logs of the failed batch; must not be empty
     */
    private void rollBackReadIndex(List<SendLog> list) {
        SendLog first = list.get(0);
        String runTopic = first.getTopic();
        short runPartition = first.getPartition();
        long runMinIndex = first.getIndex();
        // Element 0 seeds the first run, so the scan can start at 1.
        for (int i = 1; i < list.size(); i++) {
            SendLog current = list.get(i);
            boolean startsNewRun = !runTopic.equals(current.getTopic()) || runPartition != current.getPartition();
            if (startsNewRun) {
                // Flush the finished run, then begin the next one.
                this.itemList.updateReadIndex(runTopic, runPartition, runMinIndex);
                runTopic = current.getTopic();
                runPartition = current.getPartition();
                runMinIndex = current.getIndex();
            } else {
                runMinIndex = Math.min(runMinIndex, current.getIndex());
            }
        }
        this.itemList.updateReadIndex(runTopic, runPartition, runMinIndex);
    }

    /**
     * Flushes the stored-log counters into the archive store: for every
     * "topic<separator>partition" key the counter is drained to zero and the stored
     * position is advanced by the drained amount (a missing position counts as 0).
     *
     * <p>If reading or writing the position fails, the drained amount is added back to
     * the counter so it is retried on the next pass.
     *
     * <p>Reconstructed from the bytecode dump the decompiler left for this method (the
     * decompiled stub unconditionally threw {@code UnsupportedOperationException}); the
     * warning log on failure is an addition.
     */
    private void syncArchivePosition() {
        for (String key : this.storeCounter.keySet()) {
            String[] parts = key.split(this.separator);
            String topic = parts[0];
            short partition = Short.parseShort(parts[1]);
            AtomicInteger counter = this.storeCounter.get(key);
            // Atomically take everything counted so far (same effect as the CAS loop
            // in the bytecode); concurrent increments land in the next pass.
            int drained = counter.getAndSet(0);
            try {
                Long stored = this.archiveStore.getPosition(topic, partition);
                long base = stored == null ? 0L : stored.longValue();
                this.archiveStore.putPosition(new AchivePosition(topic, partition, base + (long) drained));
            } catch (Throwable th) {
                logger.warn("Sync archive position of topic:{} partition:{} failed, will retry.", topic, Short.valueOf(partition), th);
                // Give the drained amount back so it is not lost.
                counter.addAndGet(drained);
            }
        }
    }

    /**
     * Returns, for every topic that has at least one archive item, the value of
     * {@link #remainMessagesSum(String)} for that topic.
     *
     * @return topic name mapped to its remaining-message sum
     */
    public Map<String, Long> getArchivePosition() {
        Map<String, Long> remainingByTopic = new HashMap<>();
        this.itemList.getAll().forEach(item -> {
            String topic = item.getTopic();
            remainingByTopic.put(topic, Long.valueOf(remainMessagesSum(topic)));
        });
        return remainingByTopic;
    }

    /**
     * Sums {@link #remainMessagesSum(String)} over all topics known to the cluster manager.
     *
     * @return the total over all topics
     */
    public long remainMessagesSum() {
        long total = 0L;
        for (TopicConfig topicConfig : this.clusterManager.getTopics()) {
            total += remainMessagesSum(topicConfig.getName().getFullName());
        }
        return total;
    }

    /**
     * Difference between the current index sum and the archived index sum of a topic.
     *
     * @param str full topic name
     * @return {@code getCurrentIndexSum(str) - getArchiveIndexSum(str)}
     */
    public long remainMessagesSum(String str) {
        long current = getCurrentIndexSum(str);
        long archived = getArchiveIndexSum(str);
        return current - archived;
    }

    /**
     * Sums, over the master partitions of the topic, the partition's max index minus
     * one, with each term floored at zero.
     *
     * @param str full topic name
     * @return the summed value
     */
    public long getCurrentIndexSum(String str) {
        long total = 0L;
        for (Short partition : this.clusterManager.getMasterPartitionList(TopicName.parse(str))) {
            long maxIndex = this.consume.getMaxIndex(new Consumer(str, IndexAndMetadata.NO_METADATA), partition.shortValue());
            total += Math.max(0L, maxIndex - 1);
        }
        return total;
    }

    /**
     * Sums the read indexes of all archive items belonging to the topic.
     *
     * @param str full topic name
     * @return the summed read index, 0 if the topic has no archive item
     */
    public long getArchiveIndexSum(String str) {
        long total = 0L;
        for (SendArchiveItem item : this.itemList.getAll()) {
            if (item.topic.equals(str)) {
                total += item.getReadIndex();
            }
        }
        return total;
    }
}
