package io.mantisrx.runtime.core.functions;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.computation.GroupToScalarComputation;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/runtime/core/functions/FunctionCombinator.class */
public class FunctionCombinator<T, R> {
    private final boolean isKeyed;
    private final List<MantisFunction> functions;

    public FunctionCombinator(boolean z) {
        this(z, ImmutableList.of());
    }

    public FunctionCombinator(boolean z, List<MantisFunction> list) {
        this.isKeyed = z;
        this.functions = list;
    }

    public <IN, OUT> FunctionCombinator<T, OUT> add(MantisFunction mantisFunction) {
        return new FunctionCombinator<>(this.isKeyed, ImmutableList.builder().addAll(this.functions).add(mantisFunction).build());
    }

    public int size() {
        return this.functions.size();
    }

    @VisibleForTesting
    <U, V> ScalarComputation<U, V> makeScalarStage() {
        return new ScalarComputation<U, V>() { // from class: io.mantisrx.runtime.core.functions.FunctionCombinator.1
            @Override // io.mantisrx.runtime.computation.Computation
            public void init(Context context) {
                FunctionCombinator.this.functions.forEach((v0) -> {
                    v0.init();
                });
            }

            public Observable<V> call(Context context, Observable<U> observable) {
                Observable<U> observable2 = observable;
                for (MantisFunction mantisFunction : FunctionCombinator.this.functions) {
                    if (mantisFunction instanceof FilterFunction) {
                        FilterFunction filterFunction = (FilterFunction) mantisFunction;
                        filterFunction.getClass();
                        observable2 = observable2.filter(filterFunction::apply);
                    } else if (mantisFunction instanceof MapFunction) {
                        observable2 = observable2.map(obj -> {
                            return ((MapFunction) mantisFunction).apply(obj);
                        });
                    } else if (mantisFunction instanceof FlatMapFunction) {
                        observable2 = observable2.flatMap(obj2 -> {
                            return Observable.from(((FlatMapFunction) mantisFunction).apply(obj2));
                        });
                    }
                }
                return (Observable<V>) observable2;
            }
        };
    }

    @VisibleForTesting
    <K, U, V> GroupToScalarComputation<K, U, V> makeGroupToScalarStage() {
        return new GroupToScalarComputation<K, U, V>() { // from class: io.mantisrx.runtime.core.functions.FunctionCombinator.2
            @Override // io.mantisrx.runtime.computation.Computation
            public void init(Context context) {
                FunctionCombinator.this.functions.forEach((v0) -> {
                    v0.init();
                });
            }

            public Observable<V> call(Context context, Observable<MantisGroup<K, U>> observable) {
                return observable.groupBy((v0) -> {
                    return v0.getKeyValue();
                }).flatMap(groupedObservable -> {
                    Observable map = groupedObservable.map((v0) -> {
                        return v0.getValue();
                    });
                    groupedObservable.getKey();
                    for (MantisFunction mantisFunction : FunctionCombinator.this.functions) {
                        if (mantisFunction instanceof FilterFunction) {
                            FilterFunction filterFunction = (FilterFunction) mantisFunction;
                            filterFunction.getClass();
                            map = map.filter(filterFunction::apply);
                        } else if (mantisFunction instanceof MapFunction) {
                            map = map.map(obj -> {
                                return ((MapFunction) mantisFunction).apply(obj);
                            });
                        } else if (mantisFunction instanceof FlatMapFunction) {
                            map = map.flatMap(obj2 -> {
                                return Observable.from(((FlatMapFunction) mantisFunction).apply(obj2));
                            });
                        } else if (mantisFunction instanceof WindowFunction) {
                            map = FunctionCombinator.this.handleWindows(map, (WindowFunction) mantisFunction);
                        } else if (mantisFunction instanceof ReduceFunction) {
                            ReduceFunction reduceFunction = (ReduceFunction) mantisFunction;
                            map = map.map(observable2 -> {
                                return observable2.reduce(reduceFunction.initialValue(), (obj3, obj4) -> {
                                    return reduceFunction.reduce(obj3, obj4);
                                });
                            }).flatMap(observable3 -> {
                                return observable3;
                            }).filter(obj3 -> {
                                return Boolean.valueOf(obj3 != ReduceFunctionImpl.EMPTY);
                            });
                        }
                    }
                    return map;
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<? extends Observable<?>> handleWindows(Observable<?> observable, WindowFunction<?> windowFunction) {
        WindowSpec spec = windowFunction.getSpec();
        switch (spec.getType()) {
            case ELEMENT:
            case ELEMENT_SLIDING:
                return observable.window(spec.getNumElements(), spec.getElementOffset());
            case TUMBLING:
            case SLIDING:
                return observable.window(spec.getWindowLength().toMillis(), spec.getWindowOffset().toMillis(), TimeUnit.MILLISECONDS);
            default:
                throw new UnsupportedOperationException("Unknown WindowSpec must be one of " + Arrays.toString(WindowSpec.WindowType.values()));
        }
    }

    public Computation makeStage() {
        if (size() == 0) {
            return null;
        }
        return this.isKeyed ? makeGroupToScalarStage() : makeScalarStage();
    }

    public boolean isKeyed() {
        return this.isKeyed;
    }
}
