package io.mantisrx.sourcejob.kafka;

import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import io.mantisrx.sourcejob.kafka.core.TaggedData;
import io.mantisrx.sourcejob.kafka.core.utils.JsonUtility;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Func1;

/* loaded from: input_file:io/mantisrx/sourcejob/kafka/CustomizedAutoAckTaggingStage.class */
/**
 * Tagging stage that enriches every incoming event map before it is tagged.
 *
 * <p>On top of {@link AutoAckTaggingStage} this class:
 * <ul>
 *   <li>tracks the largest value seen in the configured timestamp field,</li>
 *   <li>adds the source job name and the processing time to a copy of the event, and</li>
 *   <li>optionally flattens JSON-string fields into {@code field.key} entries.</li>
 * </ul>
 *
 * <p>Configuration is read from {@code JOB_PARAM_*} environment variables in {@link #init(Context)}
 * and {@link #config()}.
 */
public class CustomizedAutoAckTaggingStage extends AutoAckTaggingStage {
    private static final Logger logger = LoggerFactory.getLogger(CustomizedAutoAckTaggingStage.class);

    /** Prefix of the environment variables that carry job parameter values. */
    private static final String JOB_PARAM_PREFIX = "JOB_PARAM_";
    private static final String FIELDS_TO_FLATTEN_PARAM = "fieldsToFlatten";
    private static final String TIMESTAMP_FIELD_PARAM = "timeStampField";
    private static final String STAGE_CONCURRENCY_PARAM = "mantis.stageConcurrency";
    /** Value of {@code fieldsToFlatten} that means "do not flatten anything". */
    private static final String NO_FLATTEN = "NONE";
    private static final String DEFAULT_TIMESTAMP_FIELD = "_ts_";

    private String jobName;
    private String timestampField = DEFAULT_TIMESTAMP_FIELD;
    /** Largest timestamp observed so far in {@link #timestampField}; only written by this class. */
    private final AtomicLong latestTimeStamp = new AtomicLong();
    private boolean isFlattenFields = false;
    private final List<String> fieldsToFlatten = new ArrayList<>();
    /** Hook applied to the raw event before enrichment; currently the identity function. */
    private final Func1<Map<String, Object>, Map<String, Object>> preMapperFunc = rawEvent -> rawEvent;

    /**
     * Initializes the stage from the worker context and the {@code JOB_PARAM_timeStampField} /
     * {@code JOB_PARAM_fieldsToFlatten} environment variables.
     *
     * @param context runtime context; supplies the job name stamped onto every event
     */
    @Override
    public void init(Context context) {
        super.init(context);
        this.jobName = context.getWorkerInfo().getJobName();

        String timestampFieldValue = System.getenv(JOB_PARAM_PREFIX + TIMESTAMP_FIELD_PARAM);
        if (timestampFieldValue != null && !timestampFieldValue.isEmpty()) {
            this.timestampField = timestampFieldValue;
        }

        String flattenValue = System.getenv(JOB_PARAM_PREFIX + FIELDS_TO_FLATTEN_PARAM);
        if (flattenValue == null || flattenValue.isEmpty() || flattenValue.equals(NO_FLATTEN)) {
            return;
        }
        for (String candidate : flattenValue.split(",")) {
            String fieldName = candidate.trim();
            // Skip blanks produced by input such as "a,,b" or " , " so that we never try to
            // flatten a field with an empty name.
            if (!fieldName.isEmpty()) {
                this.fieldsToFlatten.add(fieldName);
            }
        }
        if (!this.fieldsToFlatten.isEmpty()) {
            this.isFlattenFields = true;
            logger.info("Field flattening enabled for fields {}", this.fieldsToFlatten);
        }
    }

    /** Flattens every configured field of {@code map} in place. */
    private void flattenFields(Map<String, Object> map) {
        for (String fieldName : this.fieldsToFlatten) {
            flattenField(map, fieldName);
        }
    }

    /**
     * Parses the JSON string stored under {@code str} and copies each non-null key/value pair into
     * {@code map} under the key {@code str + "." + key}. The original entry is left untouched.
     * Failures are logged and never propagated, so a malformed field cannot fail the event.
     *
     * @param map event to flatten in place
     * @param str name of the field holding the JSON string
     */
    private void flattenField(Map<String, Object> map, String str) {
        Object raw = map.get(str);
        if (raw == null) {
            // Field absent on this event: nothing to flatten.
            return;
        }
        if (!(raw instanceof String)) {
            logger.warn("Error flattening field {} error -> expected a JSON string but found {}",
                    str, raw.getClass().getName());
            return;
        }
        try {
            for (Map.Entry<String, Object> entry : JsonUtility.jsonToMap((String) raw).entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if (key != null && value != null) {
                    map.put(str + "." + key, value);
                }
            }
        } catch (Exception e) {
            // Runs once per event, so log the exception summary rather than a full stack trace.
            logger.warn("Error flattening field {} error -> {}", str, e.toString());
        }
    }

    /**
     * Builds the enriched copy of an incoming event.
     *
     * @param context runtime context (not used by this implementation)
     * @param map raw event; must not be null
     * @return a new map containing the event plus the source job name, the processing timestamp and,
     *     when enabled, the flattened fields
     * @throws RuntimeException if {@code map} is null
     */
    @Override // io.mantisrx.sourcejob.kafka.AbstractAckableTaggingStage
    protected Map<String, Object> applyPreMapping(Context context, Map<String, Object> map) {
        if (map == null) {
            throw new RuntimeException("rawData is null");
        }
        long now = System.currentTimeMillis();

        // The timestamp may be deserialized as Integer/Long/Double depending on its magnitude and
        // the parser, so accept any Number instead of blindly casting to Long.
        Object eventTimestamp = map.get(this.timestampField);
        if (eventTimestamp instanceof Number) {
            // Atomic max: a single compareAndSet attempt can lose the update under concurrent input.
            this.latestTimeStamp.accumulateAndGet(((Number) eventTimestamp).longValue(), Math::max);
        }

        Map<String, Object> mapped = map;
        try {
            Map<String, Object> result = this.preMapperFunc.call(map);
            if (result != null) {
                mapped = result;
            }
        } catch (Exception e) {
            // Runs once per event, so log the exception summary rather than a full stack trace.
            logger.warn("Exception applying premapping function {}", e.toString());
        }

        // Enrich a copy so the caller's map is not modified by this method.
        Map<String, Object> enriched = new HashMap<>(mapped);
        enriched.put(AbstractAckableTaggingStage.MANTIS_META_SOURCE_NAME, this.jobName);
        enriched.put(AbstractAckableTaggingStage.MANTIS_META_SOURCE_TIMESTAMP, Long.valueOf(now));
        if (this.isFlattenFields) {
            flattenFields(enriched);
        }
        return enriched;
    }

    /**
     * Creates the stage configuration: concurrent input, the tagged-data codec and the parameters
     * from {@link #getParameters()}. If {@code JOB_PARAM_mantis.stageConcurrency} holds an integer,
     * it is passed to {@code concurrentInput(int)}; a non-numeric value is logged and ignored.
     *
     * @return the stage configuration
     */
    public static ScalarToScalar.Config<KafkaAckable, TaggedData> config() {
        ScalarToScalar.Config<KafkaAckable, TaggedData> stageConfig =
                new ScalarToScalar.Config<KafkaAckable, TaggedData>()
                        .concurrentInput()
                        .codec(AutoAckTaggingStage.taggedDataCodec())
                        .withParameters(getParameters());

        String envName = JOB_PARAM_PREFIX + STAGE_CONCURRENCY_PARAM;
        String concurrency = System.getenv(envName);
        if (concurrency != null && !concurrency.isEmpty()) {
            logger.info("Job param: {} value: {}", envName, concurrency);
            try {
                stageConfig = stageConfig.concurrentInput(Integer.parseInt(concurrency.trim()));
            } catch (NumberFormatException e) {
                logger.warn("Ignoring non-numeric value '{}' for {}; keeping the concurrentInput() setting",
                        concurrency, envName);
            }
        }
        return stageConfig;
    }

    /**
     * Declares the job parameters understood by this stage.
     *
     * @return definitions for {@code fieldsToFlatten}, {@code timeStampField} and
     *     {@code mantis.stageConcurrency}
     */
    static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = Lists.newArrayList();
        params.add(new StringParameter()
                .name(FIELDS_TO_FLATTEN_PARAM)
                .description("comma separated list of fields to flatten")
                .validator(Validators.notNullOrEmpty())
                .defaultValue(NO_FLATTEN)
                .build());
        params.add(new StringParameter()
                .name(TIMESTAMP_FIELD_PARAM)
                .description("the timestamp field in the event. used to calculate lag")
                .validator(Validators.notNullOrEmpty())
                .defaultValue(DEFAULT_TIMESTAMP_FIELD)
                .build());
        params.add(new IntParameter()
                .name(STAGE_CONCURRENCY_PARAM)
                .description("Parameter to control number of computation workers to use for stage processing")
                .defaultValue(1)
                .validator(Validators.range(1, 8))
                .build());
        return params;
    }
}
