package io.mantisrx.runtime.executor;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.rx.MonitorOperator;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.Groups;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.SourceHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.markers.MantisMarker;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.mantisrx.runtime.scheduler.MantisRxSingleThreadScheduler;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivx.mantis.operators.GroupedObservableUtils;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func2;
import rx.internal.util.RxThreadFactory;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/runtime/executor/StageExecutors.class */
public class StageExecutors {
    private static final Logger logger = LoggerFactory.getLogger(StageExecutors.class);
    private static Counter groupsExpiredCounter = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().name("StageExecutors").addCounter("groupsExpiredCounter").build()).getCounter("groupsExpiredCounter");
    private static long stageBufferIntervalMs;
    private static int maxItemsInBuffer;

    private StageExecutors() {
    }

    public static void executeSingleStageJob(final SourceHolder sourceHolder, final StageConfig stageConfig, SinkHolder sinkHolder, PortSelector portSelector, RxMetrics rxMetrics, final Context context, Action0 action0, final int i, final Observable<Integer> observable, Action0 action02, Action0 action03, Action0 action04, Action1<Throwable> action1) {
        executeIntermediate(new WorkerConsumer() { // from class: io.mantisrx.runtime.executor.StageExecutors.1
            @Override // io.mantisrx.runtime.executor.WorkerConsumer
            public Observable start(StageConfig stageConfig2) {
                Index index = new Index(i, (Observable<Integer>) observable);
                sourceHolder.getSourceFunction().init(context, index);
                Observable observable2 = (Observable) sourceHolder.getSourceFunction().call(context, index);
                return stageConfig.getInputStrategy() == StageConfig.INPUT_STRATEGY.CONCURRENT ? observable2 : Observable.just(Observable.merge(observable2));
            }

            @Override // io.mantisrx.runtime.executor.WorkerConsumer
            public void stop() {
            }
        }, stageConfig, new SinkPublisher(sinkHolder, portSelector, context, action0, action02, action03, action04, action1), context);
    }

    public static void executeSource(final int i, final SourceHolder sourceHolder, StageConfig stageConfig, WorkerPublisher workerPublisher, final Context context, final Observable<Integer> observable) {
        executeIntermediate(new WorkerConsumer() { // from class: io.mantisrx.runtime.executor.StageExecutors.2
            @Override // io.mantisrx.runtime.executor.WorkerConsumer
            public Observable start(StageConfig stageConfig2) {
                sourceHolder.getSourceFunction().init(context, new Index(i, (Observable<Integer>) observable));
                return MantisMarker.sourceOut((Observable) sourceHolder.getSourceFunction().call(context, new Index(i, (Observable<Integer>) observable)));
            }

            @Override // io.mantisrx.runtime.executor.WorkerConsumer
            public void stop() {
            }
        }, stageConfig, workerPublisher, context);
    }

    private static <K, T, R> Observable<Observable<R>> executeGroupsInParallel(Observable<GroupedObservable<K, T>> observable, Computation computation, Context context, long j) {
        logger.info("initializing {}", computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 func2 = (Func2) computation;
        return observable.lift(new MonitorOperator("worker_stage_outer")).map(groupedObservable -> {
            return ((Observable) func2.call(context, GroupedObservableUtils.createGroupedObservable(groupedObservable.getKey(), groupedObservable.doOnUnsubscribe(() -> {
                if (groupsExpiredCounter != null) {
                    groupsExpiredCounter.increment();
                }
            }).timeout(j, TimeUnit.SECONDS, Observable.empty()).subscribeOn(Schedulers.computation()).lift(new MonitorOperator("worker_stage_inner_input"))))).lift(new MonitorOperator("worker_stage_inner_output"));
        });
    }

    private static <K, T, R> Observable<Observable<R>> executeMantisGroups(Observable<Observable<MantisGroup<K, T>>> observable, Computation computation, Context context, long j) {
        logger.info("initializing {}", computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 func2 = (Func2) computation;
        return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable2 -> {
            return ((Observable) func2.call(context, observable2.lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
        });
    }

    private static <K, T, R> Observable<Observable<R>> executeMantisGroupsInParallel(Observable<Observable<MantisGroup<K, T>>> observable, Computation computation, Context context, boolean z, long j, int i) {
        logger.info("initializing {}", computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 func2 = (Func2) computation;
        if (i == -1) {
            return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable2 -> {
                return ((Observable) func2.call(context, observable2.observeOn(Schedulers.computation()).lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
            });
        }
        MantisRxSingleThreadScheduler[] mantisRxSingleThreadSchedulerArr = new MantisRxSingleThreadScheduler[i];
        RxThreadFactory rxThreadFactory = new RxThreadFactory("MantisRxSingleThreadScheduler-");
        logger.info("creating {} Mantis threads", Integer.valueOf(i));
        for (int i2 = 0; i2 < i; i2++) {
            mantisRxSingleThreadSchedulerArr[i2] = new MantisRxSingleThreadScheduler(rxThreadFactory);
        }
        return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable3 -> {
            return observable3.groupBy(mantisGroup -> {
                return Integer.valueOf(Math.abs(mantisGroup.getKeyValue().hashCode()) % i);
            }).flatMap(groupedObservable -> {
                return ((Observable) func2.call(context, groupedObservable.observeOn(mantisRxSingleThreadSchedulerArr[((Integer) groupedObservable.getKey()).intValue()]).lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
            });
        });
    }

    private static <T, R> Observable<Observable<R>> executeInners(Observable<Observable<T>> observable, Computation computation, Context context, boolean z, long j) {
        logger.info("initializing {}", computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 func2 = (Func2) computation;
        return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable2 -> {
            return ((Observable) func2.call(context, observable2.lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
        });
    }

    private static <T, R> Observable<Observable<R>> executeInnersInParallel(Observable<Observable<T>> observable, Computation computation, Context context, boolean z, long j, int i) {
        logger.info("initializing {}", computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 func2 = (Func2) computation;
        if (i == -1) {
            return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable2 -> {
                return ((Observable) func2.call(context, observable2.observeOn(Schedulers.computation()).lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
            });
        }
        MantisRxSingleThreadScheduler[] mantisRxSingleThreadSchedulerArr = new MantisRxSingleThreadScheduler[i];
        RxThreadFactory rxThreadFactory = new RxThreadFactory("MantisRxSingleThreadScheduler-");
        logger.info("creating {} Mantis threads", Integer.valueOf(i));
        for (int i2 = 0; i2 < i; i2++) {
            mantisRxSingleThreadSchedulerArr[i2] = new MantisRxSingleThreadScheduler(rxThreadFactory);
        }
        return observable.lift(new MonitorOperator("worker_stage_outer")).map(observable3 -> {
            return observable3.groupBy(obj -> {
                return Long.valueOf(System.nanoTime() % i);
            }).flatMap(groupedObservable -> {
                return ((Observable) func2.call(context, groupedObservable.observeOn(mantisRxSingleThreadSchedulerArr[((Long) groupedObservable.getKey()).intValue()]).lift(new MonitorOperator("worker_stage_inner_input")))).lift(new MonitorOperator("worker_stage_inner_output"));
            });
        });
    }

    private static int resolveStageConcurrency(int i) {
        if (i == -1) {
            String str = "JOB_PARAM_" + ParameterUtils.STAGE_CONCURRENCY;
            String str2 = System.getenv(str);
            logger.info("Job param: " + str + " value: " + str2);
            if (str2 != null && !str2.isEmpty()) {
                logger.info("Job param: " + str + " value: " + str2);
                try {
                    int parseInt = Integer.parseInt(str2);
                    return parseInt <= 0 ? i : parseInt;
                } catch (NumberFormatException e) {
                }
            }
        }
        return i;
    }

    private static <T, R> Observable<Observable<R>> setupScalarToScalarStage(ScalarToScalar<T, R> scalarToScalar, Observable<Observable<T>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = scalarToScalar.getInputStrategy();
        logger.info("Setting up ScalarToScalar stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            return executeInnersInParallel(observable, scalarToScalar.getComputation(), context, false, 2147483647L, resolveStageConcurrency(scalarToScalar.getConcurrency()));
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeInners(Observable.just(Observable.merge(observable)), scalarToScalar.getComputation(), context, false, 2147483647L);
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    private static <K, T, R> Observable<Observable<GroupedObservable<String, R>>> setupScalarToKeyStage(ScalarToKey<K, T, R> scalarToKey, Observable<Observable<T>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = scalarToKey.getInputStrategy();
        logger.info("Setting up ScalarToKey stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            return executeInnersInParallel(observable, scalarToKey.getComputation(), context, true, scalarToKey.getKeyExpireTimeSeconds(), resolveStageConcurrency(scalarToKey.getConcurrency()));
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeInners(Observable.just(Observable.merge(observable)), scalarToKey.getComputation(), context, true, scalarToKey.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    private static <K, T, R> Observable<Observable<MantisGroup<String, R>>> setupScalarToGroupStage(ScalarToGroup<K, T, R> scalarToGroup, Observable<Observable<T>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = scalarToGroup.getInputStrategy();
        logger.info("Setting up ScalarToGroup stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            return executeInnersInParallel(observable, scalarToGroup.getComputation(), context, true, scalarToGroup.getKeyExpireTimeSeconds(), resolveStageConcurrency(scalarToGroup.getConcurrency()));
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeInners(Observable.just(Observable.merge(observable)), scalarToGroup.getComputation(), context, true, scalarToGroup.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    private static <K1, T, K2, R> Observable<Observable<GroupedObservable<String, R>>> setupKeyToKeyStage(KeyToKey<K1, T, K2, R> keyToKey, Observable<Observable<GroupedObservable<String, T>>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = keyToKey.getInputStrategy();
        logger.info("Setting up KeyToKey stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            throw new RuntimeException("Concurrency is not a supported input strategy for KeyComputation");
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeGroupsInParallel(Groups.flatten(observable), keyToKey.getComputation(), context, keyToKey.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    private static <K1, T, K2, R> Observable<Observable<MantisGroup<String, R>>> setupGroupToGroupStage(GroupToGroup<K1, T, K2, R> groupToGroup, Observable<Observable<MantisGroup<String, T>>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = groupToGroup.getInputStrategy();
        logger.info("Setting up GroupToGroup stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            throw new RuntimeException("Concurrency is not a supported input strategy for KeyComputation");
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeMantisGroups(Observable.just(Observable.merge(observable)), groupToGroup.getComputation(), context, groupToGroup.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    private static <K, T, R> Observable<Observable<R>> setupKeyToScalarStage(KeyToScalar<K, T, R> keyToScalar, Observable<Observable<MantisGroup<String, T>>> observable, Context context) {
        logger.info("Setting up KeyToScalar stage with input type: " + keyToScalar.getInputStrategy());
        return executeGroupsInParallel(Groups.flattenMantisGroupsToGroupedObservables(observable), keyToScalar.getComputation(), context, keyToScalar.getKeyExpireTimeSeconds());
    }

    private static <K, T, R> Observable<Observable<R>> setupGroupToScalarStage(GroupToScalar<K, T, R> groupToScalar, Observable<Observable<MantisGroup<K, T>>> observable, Context context) {
        StageConfig.INPUT_STRATEGY inputStrategy = groupToScalar.getInputStrategy();
        logger.info("Setting up GroupToScalar stage with input type: " + inputStrategy);
        if (inputStrategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            logger.info("Execute Groups in PARALLEL!!!!");
            return executeMantisGroupsInParallel(observable, groupToScalar.getComputation(), context, true, groupToScalar.getKeyExpireTimeSeconds(), resolveStageConcurrency(groupToScalar.getConcurrency()));
        }
        if (inputStrategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            return executeMantisGroups(Observable.just(Observable.merge(observable)), groupToScalar.getComputation(), context, groupToScalar.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + inputStrategy.name());
    }

    public static <T, R> void executeIntermediate(WorkerConsumer workerConsumer, StageConfig<T, R> stageConfig, WorkerPublisher workerPublisher, Context context) {
        if (workerConsumer == null) {
            throw new IllegalArgumentException("consumer cannot be null");
        }
        if (stageConfig == null) {
            throw new IllegalArgumentException("stage cannot be null");
        }
        if (workerPublisher == null) {
            throw new IllegalArgumentException("producer cannot be null");
        }
        Observable<Observable<R>> observable = null;
        if (stageConfig instanceof ScalarToScalar) {
            ScalarToScalar scalarToScalar = (ScalarToScalar) stageConfig;
            observable = setupScalarToScalarStage(scalarToScalar, workerConsumer.start(scalarToScalar), context);
        } else if (stageConfig instanceof ScalarToKey) {
            ScalarToKey scalarToKey = (ScalarToKey) stageConfig;
            observable = setupScalarToKeyStage(scalarToKey, workerConsumer.start(scalarToKey), context);
        } else if (stageConfig instanceof ScalarToGroup) {
            ScalarToGroup scalarToGroup = (ScalarToGroup) stageConfig;
            observable = setupScalarToGroupStage(scalarToGroup, workerConsumer.start(scalarToGroup), context);
        } else if (stageConfig instanceof KeyToKey) {
            KeyToKey keyToKey = (KeyToKey) stageConfig;
            observable = setupKeyToKeyStage(keyToKey, workerConsumer.start(keyToKey), context);
        } else if (stageConfig instanceof GroupToGroup) {
            GroupToGroup groupToGroup = (GroupToGroup) stageConfig;
            observable = setupGroupToGroupStage(groupToGroup, workerConsumer.start(groupToGroup), context);
        } else if (stageConfig instanceof KeyToScalar) {
            KeyToScalar keyToScalar = (KeyToScalar) stageConfig;
            observable = setupKeyToScalarStage(keyToScalar, workerConsumer.start(keyToScalar), context);
        } else if (stageConfig instanceof GroupToScalar) {
            GroupToScalar groupToScalar = (GroupToScalar) stageConfig;
            observable = setupGroupToScalarStage(groupToScalar, workerConsumer.start(groupToScalar), context);
        }
        workerPublisher.start(stageConfig, observable);
    }

    public static void executeSink(WorkerConsumer workerConsumer, StageConfig stageConfig, SinkHolder sinkHolder, PortSelector portSelector, RxMetrics rxMetrics, Context context, Action0 action0, Action0 action02, Action0 action03, Action0 action04, Action1<Throwable> action1) {
        executeIntermediate(workerConsumer, stageConfig, new SinkPublisher(sinkHolder, portSelector, context, action0, action02, action03, action04, action1), context);
    }

    static {
        stageBufferIntervalMs = 100L;
        maxItemsInBuffer = 100;
        stageBufferIntervalMs = Integer.parseInt(ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.stage.buffer.intervalMs", "100"));
        maxItemsInBuffer = Integer.parseInt(ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.stage.buffer.maxSize", "100"));
    }
}
