package org.joyqueue.broker.consumer;

import com.alibaba.fastjson.JSON;
import com.jd.laf.extension.ExtensionManager;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.filter.FilterCallback;
import org.joyqueue.broker.consumer.filter.FilterPipeline;
import org.joyqueue.broker.consumer.filter.MessageFilter;
import org.joyqueue.domain.Consumer;
import org.joyqueue.domain.TopicName;
import org.joyqueue.event.EventType;
import org.joyqueue.event.MetaEvent;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.nsr.event.UpdateConsumerEvent;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.security.Hex;
import org.joyqueue.toolkit.security.Md5;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/joyqueue/broker/consumer/FilterMessageSupport.class */
public class FilterMessageSupport {
    private ClusterManager clusterManager;
    private final Logger logger = LoggerFactory.getLogger(FilterMessageSupport.class);
    private ConcurrentMap<String, FilterPipeline<MessageFilter>> filterRuleCache = new ConcurrentHashMap();

    /* loaded from: input_file:org/joyqueue/broker/consumer/FilterMessageSupport$updateConsumeListener.class */
    class updateConsumeListener implements EventListener<MetaEvent> {
        updateConsumeListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            if (metaEvent.getEventType() == EventType.UPDATE_CONSUMER) {
                UpdateConsumerEvent updateConsumerEvent = (UpdateConsumerEvent) metaEvent;
                FilterMessageSupport.this.logger.info("listen update consume event to update filter pipeline.");
                FilterMessageSupport.this.updateFilterRuleCache(updateConsumerEvent.getTopic(), updateConsumerEvent.getNewConsumer().getApp());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FilterMessageSupport(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        clusterManager.addListener(new updateConsumeListener());
    }

    public List<ByteBuffer> filter(Consumer consumer, List<ByteBuffer> list, FilterCallback filterCallback) throws JoyQueueException {
        FilterPipeline<MessageFilter> filterPipeline = this.filterRuleCache.get(consumer.getId());
        if (filterPipeline == null) {
            filterPipeline = createFilterPipeline(consumer.getConsumerPolicy());
            this.filterRuleCache.putIfAbsent(consumer.getId(), filterPipeline);
        }
        return filterPipeline.execute(list, filterCallback);
    }

    private FilterPipeline<MessageFilter> createFilterPipeline(Consumer.ConsumerPolicy consumerPolicy) throws JoyQueueException {
        Map<String, String> filters = consumerPolicy.getFilters();
        FilterPipeline<MessageFilter> filterPipeline = new FilterPipeline<>(generatePipelineId(filters));
        if (MapUtils.isNotEmpty(filters)) {
            for (Map.Entry<String, String> entry : filters.entrySet()) {
                String key = entry.getKey();
                String value = entry.getValue();
                MessageFilter messageFilter = (MessageFilter) ExtensionManager.getOrLoadExtension(MessageFilter.class, key);
                messageFilter.setRule(value);
                filterPipeline.register(messageFilter);
            }
        }
        return filterPipeline;
    }

    private String generatePipelineId(Map<String, String> map) throws JoyQueueException {
        if (MapUtils.isEmpty(map)) {
            return null;
        }
        try {
            return Hex.encode(Md5.INSTANCE.encrypt(JSON.toJSONString(map).getBytes("utf-8"), (byte[]) null));
        } catch (Exception e) {
            this.logger.error("generate filter pipeline error.", e);
            throw new JoyQueueException(e, JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateFilterRuleCache(TopicName topicName, String str) {
        try {
            Consumer.ConsumerPolicy consumerPolicy = this.clusterManager.getConsumerPolicy(topicName, str);
            Map<String, String> filters = consumerPolicy.getFilters();
            String generatePipelineId = generatePipelineId(filters);
            FilterPipeline<MessageFilter> filterPipeline = this.filterRuleCache.get(getConsumeId(topicName, str));
            if (filterPipeline == null || !StringUtils.equals(generatePipelineId, filterPipeline.getId())) {
                this.filterRuleCache.put(getConsumeId(topicName, str), createFilterPipeline(consumerPolicy));
            } else {
                this.logger.info("FilterPipeline is already exist, topic:[{}], app:[{}], filers:[{}]", new Object[]{topicName, str, JSON.toJSON(filters)});
            }
        } catch (Exception e) {
            this.logger.error("Update Message filter cache error.", e);
        }
    }

    private String getConsumeId(TopicName topicName, String str) {
        return new StringBuilder(30).append(topicName.getFullName()).append(".").append(str).toString();
    }
}
