package io.mantisrx.sourcejob.kafka;

import io.mantisrx.common.codec.Codec;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.mql.jvm.core.Query;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.sourcejob.kafka.core.TaggedData;
import io.mantisrx.sourcejob.kafka.sink.MQLQueryManager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/sourcejob/kafka/AbstractAckableTaggingStage.class */
public abstract class AbstractAckableTaggingStage implements ScalarComputation<KafkaAckable, TaggedData> {
    public static final String MANTIS_META_IS_COMPLETE_DATA = "mantis.meta.isCompleteData";
    public static final String MANTIS_META_SOURCE_NAME = "mantis.meta.sourceName";
    public static final String MANTIS_META_SOURCE_TIMESTAMP = "mantis.meta.timestamp";
    public static final String MANTIS_QUERY_COUNTER = "mantis_query_out";
    public static final String MQL_COUNTER = "mql_out";
    public static final String MQL_FAILURE = "mql_failure";
    public static final String MQL_CLASSLOADER_ERROR = "mql_classloader_error";
    private static final Logger logger = LoggerFactory.getLogger(AbstractAckableTaggingStage.class);
    private static final String MANTIS_META = "mantis.meta";
    protected AtomicBoolean trackIsComplete = new AtomicBoolean(false);
    private AtomicBoolean errorLogged = new AtomicBoolean(false);

    public Observable<TaggedData> call(Context context, Observable<KafkaAckable> observable) {
        context.getMetricsRegistry().registerAndGet(new Metrics.Builder().name("mql").addCounter(MQL_COUNTER).addCounter(MQL_FAILURE).addCounter(MQL_CLASSLOADER_ERROR).addCounter(MANTIS_QUERY_COUNTER).build());
        return observable.map(kafkaAckable -> {
            return preProcess(processAndAck(context, kafkaAckable));
        }).filter(map -> {
            return Boolean.valueOf(!map.isEmpty());
        }).map(map2 -> {
            return applyPreMapping(context, map2);
        }).filter(map3 -> {
            return Boolean.valueOf(!map3.isEmpty());
        }).flatMapIterable(map4 -> {
            return tagData(map4, context);
        });
    }

    protected abstract Map<String, Object> processAndAck(Context context, KafkaAckable kafkaAckable);

    protected abstract Map<String, Object> preProcess(Map<String, Object> map);

    protected Map<String, Object> applyPreMapping(Context context, Map<String, Object> map) {
        return map;
    }

    private boolean isMetaEvent(Map<String, Object> map) {
        return map.containsKey(MANTIS_META_IS_COMPLETE_DATA) || map.containsKey(MANTIS_META);
    }

    protected List<TaggedData> tagData(Map<String, Object> map, Context context) {
        ArrayList arrayList = new ArrayList();
        boolean isMetaEvent = isMetaEvent(map);
        Metrics metric = context.getMetricsRegistry().getMetric("mql");
        for (Query query : MQLQueryManager.getInstance().getRegisteredQueries()) {
            if (isMetaEvent) {
                try {
                    TaggedData taggedData = new TaggedData(map);
                    taggedData.addMatchedClient(query.getSubscriptionId());
                    arrayList.add(taggedData);
                } catch (Error e) {
                    metric.getCounter(MQL_FAILURE).increment();
                    if (!this.errorLogged.get()) {
                        logger.error("caught Error when processing MQL {} on {}", new Object[]{query.getRawQuery(), map.toString(), e});
                        this.errorLogged.set(true);
                    }
                } catch (Exception e2) {
                    if (e2 instanceof ClassNotFoundException) {
                        logger.error("Error loading MQL: " + e2.getMessage());
                        e2.printStackTrace();
                        metric.getCounter(MQL_CLASSLOADER_ERROR).increment();
                    } else {
                        e2.printStackTrace();
                        metric.getCounter(MQL_FAILURE).increment();
                        logger.error("MQL Error: " + e2.getMessage());
                        logger.error("MQL Query: " + query.getRawQuery());
                        logger.error("MQL Datum: " + map);
                    }
                }
            } else if (query.matches(map).booleanValue()) {
                Map project = query.project(map);
                project.put(MANTIS_META_SOURCE_NAME, map.get(MANTIS_META_SOURCE_NAME));
                project.put(MANTIS_META_SOURCE_TIMESTAMP, map.get(MANTIS_META_SOURCE_TIMESTAMP));
                TaggedData taggedData2 = new TaggedData(project);
                taggedData2.addMatchedClient(query.getSubscriptionId());
                arrayList.add(taggedData2);
            }
        }
        return arrayList;
    }

    public static Codec<TaggedData> taggedDataCodec() {
        return new Codec<TaggedData>() { // from class: io.mantisrx.sourcejob.kafka.AbstractAckableTaggingStage.1
            /* renamed from: decode, reason: merged with bridge method [inline-methods] */
            public TaggedData m1decode(byte[] bArr) {
                return new TaggedData(new HashMap());
            }

            public byte[] encode(TaggedData taggedData) {
                return new byte[128];
            }
        };
    }
}
