package io.continual.services.processor.engine.library.sources;

import io.continual.builder.Builder;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.library.services.bucketing.BucketingService;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.JsonUtil;
import io.continual.util.time.Clock;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:io/continual/services/processor/engine/library/sources/MessageGenerator.class */
/**
 * A source that periodically emits copies of a configured template message.
 *
 * <p>Configuration keys read by this class:
 * <ul>
 *   <li>{@code skipEvals} (boolean, default {@code false}) - when true, the message is emitted
 *       without running it through the expression evaluator.</li>
 *   <li>{@code message} (object, default empty object) - the template that is cloned for each
 *       emitted message.</li>
 *   <li>{@link BucketingService#kCount} (long, default {@code -1}) - the number of messages to
 *       emit before signalling end of stream; a negative value means no limit.</li>
 *   <li>{@code everyMs} (long, default {@code 1000}) - the interval between messages; must be at
 *       least 1.</li>
 *   <li>{@code initialPauseMs} (long) - delay before the first message; defaults to 0 when
 *       exactly one message is requested, otherwise to {@code everyMs}.</li>
 * </ul>
 *
 * <p>Each emitted message carries a {@code serialNumber} field that starts at 1 and increases
 * by one per message.
 */
public class MessageGenerator extends QueuingSource {
    /** Evaluator applied to each outgoing message, or null when evaluation is skipped. */
    private final ExpressionEvaluator fExprEval;
    /** Template cloned for every emitted message. */
    private final JSONObject fMessage;
    /** Messages still to be emitted; negative means unlimited, zero means the stream is done. */
    private long fCount;
    /** Interval between messages, in milliseconds (always at least 1). */
    private final long fPauseMs;
    /** Clock time at or after which the next message is due. */
    private long fNextMs;
    /** Serial number of the most recently emitted message. */
    private long fSerialNumber;

    /**
     * Builds a generator without a configuration load context. No expression evaluator is
     * available in this case, so messages are emitted without expression evaluation.
     *
     * @param jSONObject the source configuration
     * @throws Builder.BuildFailure if the configuration is invalid
     */
    public MessageGenerator(JSONObject jSONObject) throws Builder.BuildFailure {
        this(null, jSONObject);
    }

    /**
     * Builds a generator from its configuration.
     *
     * @param configLoadContext the load context supplying the expression evaluator; may be null,
     *     in which case messages are emitted without expression evaluation
     * @param jSONObject the source configuration
     * @throws Builder.BuildFailure if {@code everyMs} is below 1 or the configuration cannot be read
     */
    public MessageGenerator(ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        super(jSONObject);
        this.fSerialNumber = 0L;
        try {
            // The one-argument constructor passes a null context; dereferencing it would throw a
            // NullPointerException, so fall back to emitting messages unevaluated.
            final boolean skipEvals = jSONObject.optBoolean("skipEvals", false) || configLoadContext == null;
            this.fExprEval = skipEvals ? null : configLoadContext.getServiceContainer().getExprEval();

            final JSONObject template = jSONObject.optJSONObject("message");
            this.fMessage = template == null ? new JSONObject() : template;

            this.fCount = jSONObject.optLong(BucketingService.kCount, -1L);
            this.fPauseMs = jSONObject.optLong("everyMs", 1000L);
            if (this.fPauseMs < 1) {
                throw new Builder.BuildFailure("'everyMs' interval must be at least 1 ms");
            }

            // A single-shot generator fires immediately unless told otherwise.
            final long defaultInitialPauseMs = this.fCount == 1 ? 0L : this.fPauseMs;
            this.fNextMs = Clock.now() + jSONObject.optLong("initialPauseMs", defaultInitialPauseMs);
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    /**
     * Produces at most one message per call, once the scheduled time has been reached, and
     * signals end of stream after the configured number of messages has been emitted.
     *
     * @return a list holding the newly generated message, or an empty list if none is due
     */
    @Override // io.continual.services.processor.engine.library.sources.QueuingSource
    protected List<MessageAndRouting> reload() {
        final List<MessageAndRouting> pending = new ArrayList<>();

        if (this.fCount != 0 && Clock.now() >= this.fNextMs) {
            JSONObject payload = JsonUtil.clone(this.fMessage).put("serialNumber", ++this.fSerialNumber);
            if (this.fExprEval != null) {
                payload = this.fExprEval.evaluateJsonObject(payload);
            }
            pending.add(makeDefRoutingMessage(Message.adoptJsonAsMessage(payload)));

            this.fNextMs += this.fPauseMs;
            if (this.fCount > 0) {
                // Only a positive count is a limit; negative counts run indefinitely.
                this.fCount--;
            }
        }

        if (this.fCount == 0) {
            // The configured number of messages has been sent (or none were requested).
            noteEndOfStream();
            this.fNextMs = Long.MAX_VALUE;
        }
        return pending;
    }
}
