package org.joyqueue.service.impl;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.extension.ExtensionHeader;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.joyqueue.consumer.ExtensionConsumer;
import io.openmessaging.message.Message;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.domain.TopicName;
import org.joyqueue.model.PageResult;
import org.joyqueue.model.QPageQuery;
import org.joyqueue.model.domain.ApplicationToken;
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.domain.Identity;
import org.joyqueue.model.domain.Namespace;
import org.joyqueue.model.domain.Subscribe;
import org.joyqueue.model.domain.Topic;
import org.joyqueue.model.domain.TopicMsgFilter;
import org.joyqueue.model.domain.User;
import org.joyqueue.model.exception.NotFoundException;
import org.joyqueue.model.query.QTopicMsgFilter;
import org.joyqueue.monitor.PartitionAckMonitorInfo;
import org.joyqueue.msg.filter.FilterResponse;
import org.joyqueue.msg.filter.OutputType;
import org.joyqueue.msg.filter.support.TopicMessageFilterSupport;
import org.joyqueue.repository.TopicMsgFilterRepository;
import org.joyqueue.service.ApplicationTokenService;
import org.joyqueue.service.BrokerService;
import org.joyqueue.service.ConsumeOffsetService;
import org.joyqueue.service.ConsumerService;
import org.joyqueue.service.MessagePreviewService;
import org.joyqueue.service.TopicMsgFilterService;
import org.joyqueue.service.UserService;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("topicMsgFilterService")
/* loaded from: input_file:org/joyqueue/service/impl/TopicMsgFilterServiceImpl.class */
public class TopicMsgFilterServiceImpl extends PageServiceSupport<TopicMsgFilter, QTopicMsgFilter, TopicMsgFilterRepository> implements TopicMsgFilterService {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgFilterServiceImpl.class);

    @Autowired
    private ApplicationTokenService applicationTokenService;

    @Autowired
    private ConsumerService consumerService;

    @Autowired
    private ConsumeOffsetService consumeOffsetService;

    @Autowired
    private BrokerService brokerService;

    @Autowired
    private UserService userService;

    @Autowired
    private MessagePreviewService messagePreviewService;
    private final int maxPoolSize = 3;
    private final int appendMaxSize = 100;
    private final String urlFormat = "oms:joyqueue://%s@%s/default";
    private final String filePathFormat = "%s_%s_%s_%s_%s.txt";
    private final long defaultTimeOut = 60000;
    private final long defaultFilterTimeOut = 10000;
    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder().setNameFormat("topic-msg-filter-%d").setDaemon(true).build());
    private final TopicMessageFilterSupport topicMessageFilterSupport = new TopicMessageFilterSupport();

    @Override // org.joyqueue.service.TopicMsgFilterService
    public void execute() throws Exception {
        int activeCount = 3 - this.threadPoolExecutor.getActiveCount();
        if (activeCount <= 0) {
            throw new IllegalAccessException("message filter task queue is full, please waiting until one of them finished");
        }
        List<TopicMsgFilter> findByNextOne = this.repository.findByNextOne(activeCount);
        if (!CollectionUtils.isNotEmpty(findByNextOne)) {
            throw new IllegalAccessException("message filter queue has no task to execute");
        }
        for (TopicMsgFilter topicMsgFilter : findByNextOne) {
            try {
                execute(topicMsgFilter);
            } catch (NullPointerException | NotFoundException e) {
                updateMsgFilterStatus(topicMsgFilter, -2, "", e.getMessage());
                logger.error("topic message filter execute error: {}", e.getMessage());
            } catch (Exception e2) {
                updateMsgFilterStatus(topicMsgFilter, 0, "", "");
                logger.error("topic message filter execute error: {}", e2.getMessage());
            }
        }
    }

    private void execute(TopicMsgFilter topicMsgFilter) throws Exception {
        try {
            TopicName parse = TopicName.parse(topicMsgFilter.getTopic());
            List list = (List) this.consumerService.findByTopic(parse.getCode(), parse.getNamespace()).stream().map(consumer -> {
                return consumer.getApp().getCode();
            }).collect(Collectors.toList());
            if (!CollectionUtils.isNotEmpty(list)) {
                updateMsgFilterStatus(topicMsgFilter, -2, "", "topic not found or has unrelated app");
                throw new NotFoundException("topic not found or has unrelated app");
            }
            updateMsgFilterStatus(topicMsgFilter, 1, null, "");
            topicMsgFilter.setApp((String) list.get(0));
            List<ApplicationToken> findByApp = this.applicationTokenService.findByApp(topicMsgFilter.getApp());
            if (CollectionUtils.isNotEmpty(findByApp)) {
                topicMsgFilter.setToken(findByApp.get(0).getToken());
            } else {
                topicMsgFilter.setToken(createToken((String) list.get(0)));
            }
            List<Broker> findByTopic = this.brokerService.findByTopic(topicMsgFilter.getTopic());
            if (CollectionUtils.isNotEmpty(findByTopic)) {
                Broker broker = findByTopic.get(0);
                topicMsgFilter.setBrokerAddr(broker.getIp() + ":" + broker.getPort());
            }
            CompletableFuture.supplyAsync(() -> {
                try {
                    return consume(topicMsgFilter);
                } catch (Exception e) {
                    logger.error("Message filter cause error", e);
                    updateMsgFilterStatus(topicMsgFilter, -2, "", e.getMessage());
                    return null;
                }
            }, this.threadPoolExecutor).whenComplete((topicMsgFilter2, th) -> {
                if (topicMsgFilter2 != null) {
                    handleFilterOutputs(this.topicMessageFilterSupport.output(topicMsgFilter2.getUrl()), topicMsgFilter2);
                    try {
                        Files.deleteIfExists(Paths.get(topicMsgFilter2.getUrl(), new String[0]));
                        logger.info("Delete file: {} success", topicMsgFilter2.getUrl());
                    } catch (IOException e) {
                        logger.error("Failed to delete file: {}, need to delete manually", topicMsgFilter2.getUrl());
                    }
                }
            });
        } catch (NullPointerException e) {
            throw new NotFoundException("topic not found or has unrelated app");
        }
    }

    private File createFile(String str) {
        File file = new File(str);
        if (file.exists()) {
            try {
                Files.delete(Paths.get(str, new String[0]));
            } catch (IOException e) {
                logger.error("Message filter file exists and failed to delete: {}", e.getMessage());
            }
        }
        try {
            if (!file.createNewFile()) {
                return null;
            }
            logger.info("Create file named [{}]", file.getAbsolutePath());
            return file;
        } catch (IOException e2) {
            logger.error("Error creating message filter file: {}", e2.getMessage());
            return null;
        }
    }

    private int appendFile(File file, StringBuilder sb) throws IOException {
        if (file == null || sb.length() <= 0) {
            return 0;
        }
        FileUtils.writeStringToFile(file, sb.toString(), StandardCharsets.UTF_8, true);
        sb.delete(0, sb.length());
        return 0;
    }

    private String buildFileHeader(TopicMsgFilter topicMsgFilter) {
        StringBuilder sb = new StringBuilder();
        sb.append("Keyword: ").append(topicMsgFilter.getFilter()).append('\n');
        sb.append("Topic: ").append(topicMsgFilter.getTopic()).append('\n');
        sb.append("MessageFormat: ").append(topicMsgFilter.getMsgFormat()).append('\n');
        if (topicMsgFilter.getPartition() == null || topicMsgFilter.getPartition().intValue() >= 0) {
            sb.append("Partition: ").append(topicMsgFilter.getPartition()).append('\n');
        } else {
            sb.append("Partition: ").append("all partition").append('\n');
        }
        if (topicMsgFilter.getOffsetStartTime() != null) {
            sb.append("OffsetStartTime: ").append(topicMsgFilter.getOffsetStartTime()).append('\n');
            sb.append("OffsetEndTime: ").append(topicMsgFilter.getOffsetEndTime()).append('\n');
        } else {
            sb.append("Offset: ").append(topicMsgFilter.getOffset()).append('\n');
            sb.append("QueryCount: ").append(topicMsgFilter.getQueryCount()).append('\n');
        }
        sb.append("UserId: ").append(topicMsgFilter.getCreateBy().getId()).append('\n');
        sb.append("UserCode: ").append(topicMsgFilter.getCreateBy().getCode()).append('\n');
        sb.append("CreateTime: ").append(topicMsgFilter.getCreateTime()).append('\n');
        sb.append("---------------------------------------------------------------------").append('\n');
        return sb.toString();
    }

    private String createToken(String str) throws Exception {
        ApplicationToken applicationToken = new ApplicationToken();
        applicationToken.setEffectiveTime(new Date());
        Calendar calendar = Calendar.getInstance();
        calendar.add(6, 7);
        applicationToken.setApplication(new Identity(str));
        applicationToken.setExpirationTime(calendar.getTime());
        applicationToken.setToken(UUID.randomUUID().toString().replaceAll("-", ""));
        this.applicationTokenService.add(applicationToken);
        return applicationToken.getToken();
    }

    private TopicMsgFilter consume(TopicMsgFilter topicMsgFilter) throws IOException {
        String format = String.format("oms:joyqueue://%s@%s/default", topicMsgFilter.getApp(), topicMsgFilter.getBrokerAddr());
        KeyValue newKeyValue = OMS.newKeyValue();
        newKeyValue.put("ACCOUNT_KEY", topicMsgFilter.getToken());
        newKeyValue.put("TRANSPORT_IO_THREADS", 1);
        ExtensionConsumer extensionConsumer = (ExtensionConsumer) OMS.getMessagingAccessPoint(format, newKeyValue).createConsumer();
        extensionConsumer.bindQueue(topicMsgFilter.getTopic());
        extensionConsumer.start();
        User user = (User) this.userService.findById(topicMsgFilter.getCreateBy().getId().longValue());
        topicMsgFilter.getCreateBy().setCode(user.getCode());
        String format2 = String.format("%s_%s_%s_%s_%s.txt", user.getCode(), topicMsgFilter.getCreateBy().getId(), topicMsgFilter.getTopic(), Long.valueOf(Thread.currentThread().getId()), Long.valueOf(SystemClock.now()));
        File createFile = createFile(format2);
        if (createFile != null) {
            FileUtils.writeStringToFile(createFile, buildFileHeader(topicMsgFilter), StandardCharsets.UTF_8, true);
        }
        long now = SystemClock.now();
        long j = now;
        int i = 0;
        int i2 = 0;
        StringBuilder sb = new StringBuilder();
        List<QueueMetaData.Partition> partitions = partitions(extensionConsumer, topicMsgFilter);
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        if (topicMsgFilter.getOffsetStartTime() == null || topicMsgFilter.getOffsetEndTime() == null) {
            Iterator<QueueMetaData.Partition> it = partitions.iterator();
            while (it.hasNext()) {
                newHashMap.put(Integer.valueOf(it.next().partitionId()), Long.valueOf(topicMsgFilter.getOffset()));
            }
        } else {
            parseOffsetByTs(topicMsgFilter.getApp(), topicMsgFilter.getTopic(), partitions.size() == 1 ? partitions.get(0).partitionId() : -1, topicMsgFilter.getOffsetStartTime().getTime(), topicMsgFilter.getOffsetEndTime().getTime(), newHashMap, newHashMap2);
        }
        while (true) {
            if (!consumerCondition(topicMsgFilter, i2, newHashMap2)) {
                break;
            }
            for (QueueMetaData.Partition partition : partitions) {
                if (topicMsgFilter.getOffsetStartTime() == null || newHashMap2.size() <= 0 || newHashMap2.containsKey(Integer.valueOf(partition.partitionId()))) {
                    for (Message message : extensionConsumer.batchReceive((short) partition.partitionId(), newHashMap.computeIfAbsent(Integer.valueOf(partition.partitionId()), num -> {
                        return 0L;
                    }).longValue(), 10000L)) {
                        now = SystemClock.now();
                        String preview = this.messagePreviewService.preview(topicMsgFilter.getMsgFormat(), message.getData());
                        if (this.topicMessageFilterSupport.match(preview, topicMsgFilter.getFilter())) {
                            j = SystemClock.now();
                            sb.append(preview).append('\n');
                            i++;
                            i2++;
                            if (i >= 100) {
                                i = appendFile(createFile, sb);
                            }
                        }
                        if (message.extensionHeader().isPresent()) {
                            newHashMap.put(Integer.valueOf(partition.partitionId()), Long.valueOf(((ExtensionHeader) message.extensionHeader().get()).getOffset() + 1));
                            if (newHashMap2.size() > 0 && newHashMap2.getOrDefault(Integer.valueOf(partition.partitionId()), -2L).longValue() >= 0 && newHashMap.get(Integer.valueOf(partition.partitionId())).longValue() >= newHashMap2.get(Integer.valueOf(partition.partitionId())).longValue()) {
                                newHashMap2.remove(Integer.valueOf(partition.partitionId()));
                            }
                        }
                    }
                }
            }
            if (SystemClock.now() - j >= 10000) {
                i = appendFile(createFile, sb);
                j = SystemClock.now();
            }
            if (SystemClock.now() - now >= 60000) {
                appendFile(createFile, sb);
                break;
            }
        }
        if (createFile != null && sb.length() > 0) {
            FileUtils.writeStringToFile(createFile, sb.toString(), StandardCharsets.UTF_8, true);
        }
        extensionConsumer.stop();
        updateMsgFilterStatus(topicMsgFilter, -1, "", "");
        topicMsgFilter.setUrl(format2);
        logger.info("message filter consume finished");
        return topicMsgFilter;
    }

    private boolean consumerCondition(TopicMsgFilter topicMsgFilter, int i, Map<Integer, Long> map) {
        return topicMsgFilter.getOffsetStartTime() != null ? map.size() > 0 : topicMsgFilter.getQueryCount() > i;
    }

    private void updateMsgFilterStatus(TopicMsgFilter topicMsgFilter, int i, String str, String str2) {
        topicMsgFilter.setStatus(i);
        if (StringUtils.isNoneBlank(new CharSequence[]{str})) {
            topicMsgFilter.setUrl(str);
        }
        if (StringUtils.isNoneBlank(new CharSequence[]{str2})) {
            topicMsgFilter.setDescription(str2);
        }
        topicMsgFilter.setUpdateTime(new Date(SystemClock.now()));
        this.repository.update(topicMsgFilter);
    }

    private void parseOffsetByTs(String str, String str2, int i, long j, long j2, Map<Integer, Long> map, Map<Integer, Long> map2) {
        Subscribe subscribe = new Subscribe();
        subscribe.setApp(new Identity(str));
        TopicName parse = TopicName.parse(str2);
        Topic topic = new Topic(parse.getCode());
        Namespace namespace = new Namespace(parse.getNamespace());
        subscribe.setTopic(topic);
        subscribe.setNamespace(namespace);
        subscribe.setType("消费者");
        parseOffset(subscribe, i, j, map);
        parseOffset(subscribe, i, j2, map2);
    }

    private void parseOffset(Subscribe subscribe, int i, long j, Map<Integer, Long> map) {
        List<PartitionAckMonitorInfo> timeOffset = this.consumeOffsetService.timeOffset(subscribe, j);
        if (i < 0) {
            for (PartitionAckMonitorInfo partitionAckMonitorInfo : timeOffset) {
                map.put(Integer.valueOf(partitionAckMonitorInfo.getPartition()), Long.valueOf(partitionAckMonitorInfo.getIndex()));
            }
            return;
        }
        for (PartitionAckMonitorInfo partitionAckMonitorInfo2 : timeOffset) {
            if (i == partitionAckMonitorInfo2.getPartition()) {
                map.put(Integer.valueOf(i), Long.valueOf(partitionAckMonitorInfo2.getIndex()));
                return;
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.util.List] */
    private List<QueueMetaData.Partition> partitions(ExtensionConsumer extensionConsumer, TopicMsgFilter topicMsgFilter) {
        QueueMetaData queueMetaData = extensionConsumer.getQueueMetaData(topicMsgFilter.getTopic());
        ArrayList arrayList = new ArrayList(1);
        if (topicMsgFilter.getPartition() != null && topicMsgFilter.getPartition().intValue() >= 0) {
            Iterator it = queueMetaData.partitions().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                QueueMetaData.Partition partition = (QueueMetaData.Partition) it.next();
                if (partition.partitionId() == topicMsgFilter.getPartition().intValue()) {
                    arrayList.add(partition);
                    break;
                }
            }
        } else {
            arrayList = queueMetaData.partitions();
        }
        return arrayList;
    }

    public void handleFilterOutputs(List<FilterResponse> list, TopicMsgFilter topicMsgFilter) {
        for (FilterResponse filterResponse : list) {
            if (filterResponse.getType().equals(OutputType.S3)) {
                try {
                    TopicMsgFilter topicMsgFilter2 = new TopicMsgFilter();
                    topicMsgFilter2.setUrl(filterResponse.getData().toString());
                    topicMsgFilter2.setId(topicMsgFilter.getId());
                    topicMsgFilter2.setObjectKey(topicMsgFilter2.getUrl().substring(topicMsgFilter2.getUrl().lastIndexOf(47) + 1));
                    topicMsgFilter2.setStatus(topicMsgFilter.getStatus());
                    this.repository.update(topicMsgFilter2);
                } catch (Exception e) {
                    logger.error("Failed to update url", e);
                }
            }
        }
    }

    @Override // org.joyqueue.service.TopicMsgFilterService
    public PageResult<TopicMsgFilter> findTopicMsgFilters(QPageQuery<QTopicMsgFilter> qPageQuery) {
        return this.repository.findTopicMsgFiltersByQuery(qPageQuery);
    }
}
