package io.continual.services.processor.library.onapmr.sinks;

import io.continual.builder.Builder;
import io.continual.onap.services.publisher.OnapMsgRouterBatchPublisher;
import io.continual.onap.services.publisher.OnapMsgRouterPublisher;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Sink;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonVisitor;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/library/onapmr/sinks/OnapMrSink.class */
/**
 * A {@link Sink} that hands every processed message to an ONAP Message Router
 * batch publisher.
 *
 * <p>The publisher is configured from a JSON object. String settings marked
 * "evaluated" are passed through the expression evaluator obtained from the
 * service container before use. Keys read by the constructor:
 * <ul>
 *   <li>{@code hosts} (required) - either a JSON array of host strings (each evaluated)
 *       or a single string that is evaluated and then split on commas</li>
 *   <li>{@code topic} (required, evaluated)</li>
 *   <li>{@code proxy} (optional, evaluated)</li>
 *   <li>{@code user} or, if absent, {@code username} (optional, evaluated), with
 *       {@code password} (evaluated)</li>
 *   <li>{@code apiKey} (optional, evaluated), with {@code apiSecret} (evaluated)</li>
 *   <li>{@code batchSizeAtMost} (default 1000), {@code batchMaxAgeMs} (default 1000)</li>
 *   <li>{@code maxPendingCount} (default 100000), {@code maxPendingDropPolicy}</li>
 * </ul>
 *
 * <p>All lifecycle and processing methods are {@code synchronized} on this instance.
 */
public class OnapMrSink implements Sink {
    private static final Logger log = LoggerFactory.getLogger(OnapMrSink.class);

    private final OnapMsgRouterBatchPublisher fPub;

    /**
     * Builds the sink and its batch publisher from the given configuration.
     * The publisher is not started here; see {@link #init()}.
     *
     * @param configLoadContext supplies the service container from which the expression
     *                          evaluator is obtained
     * @param jSONObject        the sink configuration (see the class description for keys)
     * @throws Builder.BuildFailure if {@code hosts} is neither an array nor a string, or if
     *                              a {@link JSONException} is raised while reading the
     *                              configuration (e.g. a required key is missing)
     */
    public OnapMrSink(ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        try {
            final ExpressionEvaluator exprEval = configLoadContext.getServiceContainer().getExprEval(jSONObject);
            final OnapMsgRouterPublisher.Builder builder = OnapMsgRouterPublisher.builder();

            addHosts(builder, exprEval, jSONObject.get("hosts"));

            builder
                .onTopic(exprEval.evaluateText(jSONObject.getString("topic")))
                .usingProxy(exprEval.evaluateText(jSONObject.optString("proxy", null)));

            // "user" takes precedence; "username" is accepted as an alternative key.
            String userSetting = jSONObject.optString("user", null);
            if (userSetting == null) {
                userSetting = jSONObject.optString("username", null);
            }
            final String user = exprEval.evaluateText(userSetting);
            if (user != null && !user.isEmpty()) {
                builder.asUser(user, exprEval.evaluateText(jSONObject.optString("password", null)));
            }

            final String apiKey = exprEval.evaluateText(jSONObject.optString("apiKey", null));
            if (apiKey != null && !apiKey.isEmpty()) {
                builder.withApiKey(apiKey, exprEval.evaluateText(jSONObject.optString("apiSecret", null)));
            }

            this.fPub = new OnapMsgRouterBatchPublisher.Builder()
                .usingPublisher(builder.build())
                .batchAtMost(jSONObject.optInt("batchSizeAtMost", 1000))
                .batchMaxAgeMs(jSONObject.optInt("batchMaxAgeMs", 1000))
                .withMaxPendingCount(
                    jSONObject.optInt("maxPendingCount", 100000),
                    OnapMsgRouterBatchPublisher.DropPolicy.fromSettingString(jSONObject.optString("maxPendingDropPolicy")))
                .build();
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    /**
     * Registers the configured Message Router hosts on the publisher builder.
     *
     * @param builder      the publisher builder receiving the hosts
     * @param exprEval     evaluator applied to each host setting
     * @param hostsSetting the raw {@code hosts} value: a {@link JSONArray} of strings or a
     *                     comma-separated {@link String}
     * @throws Builder.BuildFailure if {@code hostsSetting} is neither an array nor a string
     * @throws JSONException        if raised while visiting the array elements
     */
    private static void addHosts(final OnapMsgRouterPublisher.Builder builder, final ExpressionEvaluator exprEval,
            Object hostsSetting) throws Builder.BuildFailure, JSONException {
        if (hostsSetting instanceof JSONArray) {
            log.debug("MR hosts: {}", hostsSetting);
            JsonVisitor.forEachElement((JSONArray) hostsSetting, new JsonVisitor.ArrayVisitor<String, JSONException>() {
                public boolean visit(String str) throws JSONException {
                    builder.withHost(exprEval.evaluateText(str));
                    return true; // keep visiting the remaining elements
                }
            });
            return;
        }

        if (!(hostsSetting instanceof String)) {
            throw new Builder.BuildFailure("hosts must be an array or a string");
        }

        log.debug("MR hosts (read): {}", hostsSetting);
        final String evaluated = exprEval.evaluateText((String) hostsSetting);
        log.debug("MR hosts (eval): {}", evaluated);
        for (String candidate : evaluated.split(",")) {
            // Tolerate "a, b" style lists: surrounding whitespace is never part of a host name.
            final String host = candidate.trim();
            if (!host.isEmpty()) {
                log.debug("adding MR host: {}", host);
                builder.withHost(host);
            }
        }
    }

    /** Starts the underlying batch publisher. */
    public synchronized void init() {
        this.fPub.start();
    }

    /** No-op: this sink performs no explicit flush; batching is left to the publisher. */
    public synchronized void flush() {
    }

    /** Logs the shutdown and closes the underlying batch publisher. */
    public synchronized void close() {
        log.warn("OnapMrSink closing...");
        this.fPub.close();
    }

    /**
     * Sends the context's current message to the batch publisher.
     *
     * <p>The message is built from the evaluation of {@code ${eventStreamName}} against the
     * context and the JSON text of the context's message.
     *
     * @param messageProcessingContext the context carrying the message to publish
     */
    public synchronized void process(MessageProcessingContext messageProcessingContext) {
        final String streamName = messageProcessingContext.evalExpression("${eventStreamName}");
        final String payload = messageProcessingContext.getMessage().toJson().toString();
        this.fPub.send(new OnapMsgRouterPublisher.Message(streamName, payload));
    }
}
