package io.mantisrx.mantis.examples.sinefunction;

import io.mantisrx.mantis.examples.sinefunction.core.Point;
import io.mantisrx.mantis.examples.sinefunction.stages.SinePointGeneratorStage;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.type.BooleanParameter;
import io.mantisrx.runtime.parameter.type.DoubleParameter;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/mantis/examples/sinefunction/SineFunctionJob.class */
public class SineFunctionJob extends MantisJobProvider<Point> {
    public static final String INTERVAL_SEC = "intervalSec";
    public static final String RANGE_MAX = "max";
    public static final String RANGE_MIN = "min";
    public static final String AMPLITUDE = "amplitude";
    public static final String FREQUENCY = "frequency";
    public static final String PHASE = "phase";
    public static final String RANDOM_RATE = "randomRate";
    public static final String USE_RANDOM_FLAG = "useRandom";
    public static final SelfDocumentingSink<Point> sseSink = new ServerSentEventsSink.Builder().withEncoder(point -> {
        return String.format("{\"x\": %f, \"y\": %f}", Double.valueOf(point.getX()), Double.valueOf(point.getY()));
    }).withPredicate(new Predicate("filter=even, returns even x parameters; filter=odd, returns odd x parameters.", map -> {
        Func1 func1 = point2 -> {
            return true;
        };
        if (map != null && map.containsKey("filter")) {
            String str = (String) ((List) map.get("filter")).get(0);
            func1 = point3 -> {
                if ("even".equalsIgnoreCase(str)) {
                    return Boolean.valueOf(point3.getX() % 2.0d == 0.0d);
                }
                if ("odd".equalsIgnoreCase(str)) {
                    return Boolean.valueOf(point3.getX() % 2.0d != 0.0d);
                }
                return true;
            };
        }
        return func1;
    })).build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/mantis/examples/sinefunction/SineFunctionJob$TimerSource.class */
    public static class TimerSource implements Source<Integer> {
        public Observable<Observable<Integer>> call(Context context, Index index) {
            index.getTotalNumWorkersObservable().subscribeOn(Schedulers.io()).subscribe(num -> {
                System.out.println("Total worker count changed to -> " + num);
            });
            int intValue = ((Integer) context.getParameters().get(SineFunctionJob.INTERVAL_SEC)).intValue();
            int intValue2 = ((Integer) context.getParameters().get(SineFunctionJob.RANGE_MAX)).intValue();
            int intValue3 = ((Integer) context.getParameters().get(SineFunctionJob.RANGE_MIN)).intValue();
            double doubleValue = ((Double) context.getParameters().get(SineFunctionJob.RANDOM_RATE)).doubleValue();
            boolean booleanValue = ((Boolean) context.getParameters().get(SineFunctionJob.USE_RANDOM_FLAG)).booleanValue();
            Random random = new Random();
            Random random2 = new Random();
            return Observable.just(Observable.interval(0L, intValue, TimeUnit.SECONDS).map(l -> {
                return booleanValue ? Integer.valueOf(random.nextInt((intValue2 - intValue3) + 1) + intValue3) : Integer.valueOf((int) l.longValue());
            }).filter(num2 -> {
                return Boolean.valueOf(random2.nextDouble() <= doubleValue);
            }));
        }

        public void close() throws IOException {
        }
    }

    static ScalarToScalar.Config<Integer, Point> stageConfig() {
        return new ScalarToScalar.Config().codec(JacksonCodecs.pojo(Point.class));
    }

    public static void main(String[] strArr) {
        LocalJobExecutorNetworked.execute(new SineFunctionJob().getJobInstance(), new Parameter[]{new Parameter(USE_RANDOM_FLAG, "false")});
    }

    public Job<Point> getJobInstance() {
        return MantisJob.source(new TimerSource()).stage(new SinePointGeneratorStage(), stageConfig()).sink(sseSink).parameterDefinition(new BooleanParameter().name(USE_RANDOM_FLAG).required().description("If true, produce a random sequence of integers.  If false, produce a sequence of integers starting at 0 and increasing by 1.").build()).parameterDefinition(new DoubleParameter().name(RANDOM_RATE).defaultValue(Double.valueOf(1.0d)).description("The chance a random integer is generated, for the given period").validator(Validators.range(0, 1)).build()).parameterDefinition(new IntParameter().name(INTERVAL_SEC).defaultValue(1).description("Period at which to generate a random integer value to send to sine function").validator(Validators.range(1, 60)).build()).parameterDefinition(new IntParameter().name(RANGE_MIN).defaultValue(0).description("Minimun of random integer value").validator(Validators.range(0, 100)).build()).parameterDefinition(new IntParameter().name(RANGE_MAX).defaultValue(100).description("Maximum of random integer value").validator(Validators.range(1, 100)).build()).parameterDefinition(new DoubleParameter().name(AMPLITUDE).defaultValue(Double.valueOf(10.0d)).description("Amplitude for sine function").validator(Validators.range(1, 100)).build()).parameterDefinition(new DoubleParameter().name(FREQUENCY).defaultValue(Double.valueOf(1.0d)).description("Frequency for sine function").validator(Validators.range(1, 100)).build()).parameterDefinition(new DoubleParameter().name(PHASE).defaultValue(Double.valueOf(0.0d)).description("Phase for sine function").validator(Validators.range(0, 100)).build()).metadata(new Metadata.Builder().name("Sine function").description("Produces an infinite stream of points, along the sine function, using the following function definition: f(x) = amplitude * sin(frequency * x + phase). The input to the function is either random between [min, max], or an integer sequence starting  at 0.  The output is served via HTTP server using SSE protocol.").build()).create();
    }
}
