package io.continual.services.processor.engine.library.util;

import io.continual.builder.Builder;
import io.continual.metrics.MetricsCatalog;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.engine.runtime.SerialNumberGenerator;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonEval;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;

/* loaded from: input_file:io/continual/services/processor/engine/library/util/SimpleMessageProcessingContext.class */
public class SimpleMessageProcessingContext implements MessageProcessingContext {
    private final StreamProcessingContext fSpc;
    private final String fId;
    private final Message fMsg;
    private final ExprDataSource fEvalStack;
    private final Program fProgram;
    private boolean fHaltRequested;

    /* loaded from: input_file:io/continual/services/processor/engine/library/util/SimpleMessageProcessingContext$Builder.class */
    public static class Builder {
        private StreamProcessingContext fStreamProcContext = null;
        private SerialNumberGenerator fSng = new SerialNumberGenerator();
        private ExprDataSource fEvalStack = new ExprDataSourceStack(new ExprDataSource[0]);
        private Program fSrcSinkProg = new Program();

        public SimpleMessageProcessingContext build(Message message) throws Builder.BuildFailure {
            return new SimpleMessageProcessingContext(this, message);
        }

        public Builder usingContext(StreamProcessingContext streamProcessingContext) {
            this.fStreamProcContext = streamProcessingContext;
            return this;
        }

        public Builder serialNumbersFrom(SerialNumberGenerator serialNumberGenerator) {
            this.fSng = serialNumberGenerator;
            return this;
        }

        public Builder evaluatingAgainst(ExprDataSource exprDataSource) {
            this.fEvalStack = exprDataSource;
            return this;
        }

        public Builder sourcesAndSinksFrom(Program program) {
            this.fSrcSinkProg = program;
            return this;
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public StreamProcessingContext getStreamProcessingContext() {
        return this.fSpc;
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public Message getMessage() {
        return this.fMsg;
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public String getId() {
        return this.fId;
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public Source getSource(String str) {
        return this.fProgram.getSources().get(str);
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public Sink getSink(String str) {
        return this.fProgram.getSinks().get(str);
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public boolean shouldContinue() {
        return (this.fHaltRequested || this.fSpc.failed()) ? false : true;
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public void stopProcessing() {
        this.fHaltRequested = true;
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public void stopProcessing(String str) {
        this.fHaltRequested = true;
        warn(str);
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public void warn(String str) {
        this.fSpc.warn("msg #" + this.fId + ": " + str);
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [T, java.lang.String] */
    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public <T> T evalExpression(String str, Class<T> cls, ExprDataSource... exprDataSourceArr) {
        ?? r0 = (T) ExpressionEvaluator.evaluateText(str, new ExprDataSource[]{new ExprDataSourceStack(exprDataSourceArr), new ExprDataSource() { // from class: io.continual.services.processor.engine.library.util.SimpleMessageProcessingContext.1
            public Object eval(String str2) {
                return JsonEval.eval(SimpleMessageProcessingContext.this.fMsg.accessRawJson(), str2);
            }
        }, this.fEvalStack});
        if (cls.equals(String.class)) {
            return r0;
        }
        if (cls.equals(Long.class)) {
            return (T) new Long(Long.parseLong(r0));
        }
        if (cls.equals(Integer.class)) {
            return (T) new Integer(Integer.parseInt(r0));
        }
        if (cls.equals(Double.class)) {
            return (T) new Double(Double.parseDouble(r0));
        }
        if (cls.equals(JSONArray.class)) {
            return (T) new JSONArray(new JSONTokener(r0));
        }
        if (cls.equals(JSONObject.class)) {
            return (T) new JSONObject(new JSONTokener(r0));
        }
        throw new IllegalArgumentException("Can't eval to " + cls.getName());
    }

    @Override // io.continual.services.processor.engine.model.MessageProcessingContext
    public MetricsCatalog getMetrics() {
        return this.fSpc.getMetrics().getSubCatalog("messageProcessing");
    }

    private SimpleMessageProcessingContext(Builder builder, Message message) throws Builder.BuildFailure {
        this.fHaltRequested = false;
        this.fSpc = builder.fStreamProcContext;
        this.fMsg = message;
        this.fId = builder.fSng.getNext();
        this.fEvalStack = builder.fEvalStack;
        this.fProgram = builder.fSrcSinkProg;
        if (this.fSpc == null) {
            throw new Builder.BuildFailure("No stream processing context in message processing context.");
        }
    }
}
