package io.mantisrx.runtime.core;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Config;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.computation.ToGroupComputation;
import io.mantisrx.runtime.core.functions.FilterFunction;
import io.mantisrx.runtime.core.functions.FlatMapFunction;
import io.mantisrx.runtime.core.functions.FunctionCombinator;
import io.mantisrx.runtime.core.functions.KeyByFunction;
import io.mantisrx.runtime.core.functions.MantisFunction;
import io.mantisrx.runtime.core.functions.MapFunction;
import io.mantisrx.runtime.core.functions.WindowFunction;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sinks.SinkFunction;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import io.mantisrx.shaded.com.google.common.graph.ImmutableValueGraph;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/runtime/core/MantisStreamImpl.class */
/**
 * Graph-backed implementation of {@link MantisStream}.
 *
 * <p>Every operator call records an edge in a shared {@link MantisGraph}: the edge runs from the
 * node this stream is positioned on to a freshly created node and carries the user function as
 * its value. {@link #sink(SinkFunction)} snapshots the graph, orders its nodes topologically and
 * walks the edges to assemble a job through {@link MantisJobBuilder}.
 *
 * @param <T> element type of the stream at the current node
 */
public class MantisStreamImpl<T> implements MantisStream<T> {
    private static final Logger log = LoggerFactory.getLogger(MantisStreamImpl.class);

    /** Node this stream is currently positioned on; new edges start here. */
    final OperandNode<T> currNode;
    /** Graph shared by every stream derived from the same {@link #init()} call. */
    final MantisGraph graph;
    /** Parameter definitions supplied at construction time. */
    final Iterable<ParameterDefinition<?>> params;

    /**
     * Creates a stream positioned on {@code operandNode} with no parameter definitions.
     *
     * @param operandNode node the stream is positioned on
     * @param mantisGraph graph that records the operators
     */
    public MantisStreamImpl(OperandNode<T> operandNode, MantisGraph mantisGraph) {
        this(operandNode, mantisGraph, ImmutableList.of());
    }

    /**
     * Creates a stream positioned on {@code operandNode}.
     *
     * @param operandNode node the stream is positioned on
     * @param mantisGraph graph that records the operators
     * @param iterable parameter definitions to keep with this stream
     */
    MantisStreamImpl(OperandNode<T> operandNode, MantisGraph mantisGraph, Iterable<ParameterDefinition<?>> iterable) {
        this.currNode = operandNode;
        this.graph = mantisGraph;
        this.params = iterable;
    }

    /**
     * Starts a new stream definition: a fresh graph containing a single node with index
     * {@code 0} and description {@code "init"}.
     *
     * @param <T> element type of the returned stream
     * @return a stream positioned on the initial node
     */
    public static <T> MantisStream<T> init() {
        OperandNode<T> initNode = new OperandNode<>(0, "init");
        return new MantisStreamImpl<>(initNode, new MantisGraph().addNode(initNode));
    }

    /**
     * Records {@code sourceFunction} as an edge out of the current node.
     *
     * @return a stream positioned on the node the new edge points to
     */
    @Override // io.mantisrx.runtime.core.MantisStream
    public <OUT> MantisStream<OUT> source(SourceFunction<OUT> sourceFunction) {
        return updateGraph(sourceFunction);
    }

    /**
     * Records {@code sinkFunction} as the final edge, then converts the recorded graph into a
     * job configuration.
     *
     * @return the configuration produced by {@link MantisJobBuilder#buildJobConfig()}
     */
    @Override // io.mantisrx.runtime.core.MantisStream
    public Config<T> sink(SinkFunction<T> sinkFunction) {
        ImmutableValueGraph<OperandNode<?>, MantisFunction> snapshot = updateGraph(sinkFunction).graph.immutable();
        Iterable<OperandNode<?>> orderedNodes = topSortTraversal(snapshot);
        // The builder does not carry T in its type, so the element type is asserted here.
        @SuppressWarnings("unchecked")
        Config<T> jobConfig = (Config<T>) makeMantisJob(snapshot, orderedNodes).buildJobConfig();
        return jobConfig;
    }

    /**
     * Creates a new node, connects the current node to it with {@code mantisFunction} as the
     * edge value, and returns a stream positioned on the new node.
     *
     * <p>The new node's description is the function's class name followed by {@code "OUT"}.
     * The returned stream is built with the two-argument constructor, so it starts with an
     * empty parameter list.
     */
    <OUT> MantisStreamImpl<OUT> updateGraph(MantisFunction mantisFunction) {
        OperandNode<OUT> nextNode = OperandNode.create(this.graph, mantisFunction.getClass().getName() + "OUT");
        this.graph.putEdge(this.currNode, nextNode, mantisFunction);
        return new MantisStreamImpl<>(nextNode, this.graph);
    }

    /** Records {@code filterFunction} as an edge out of the current node. */
    @Override // io.mantisrx.runtime.core.MantisStream
    public MantisStream<T> filter(FilterFunction<T> filterFunction) {
        return updateGraph(filterFunction);
    }

    /** Records {@code mapFunction} as an edge out of the current node. */
    @Override // io.mantisrx.runtime.core.MantisStream
    public <OUT> MantisStream<OUT> map(MapFunction<T, OUT> mapFunction) {
        return updateGraph(mapFunction);
    }

    /** Records {@code flatMapFunction} as an edge out of the current node. */
    @Override // io.mantisrx.runtime.core.MantisStream
    public <OUT> MantisStream<OUT> flatMap(FlatMapFunction<T, OUT> flatMapFunction) {
        return updateGraph(flatMapFunction);
    }

    /**
     * Marks the current node with a self-loop whose value is {@link MantisFunction#empty()}.
     * When the job is built, such a self-loop turns the functions accumulated so far into a
     * stage of their own.
     */
    @Override // io.mantisrx.runtime.core.MantisStream
    public MantisStream<T> materialize() {
        return materializeInternal();
    }

    /** Adds the {@link MantisFunction#empty()} self-loop and returns a stream on the same node. */
    private MantisStreamImpl<T> materializeInternal() {
        this.graph.putEdge(this.currNode, this.currNode, MantisFunction.empty());
        return new MantisStreamImpl<>(this.currNode, this.graph);
    }

    /** Records {@code keyByFunction} as an edge to a new node described as {@code "keyByOut"}. */
    private <K> KeyedMantisStreamImpl<K, T> keyByInternal(KeyByFunction<K, T> keyByFunction) {
        OperandNode<T> keyedNode = OperandNode.create(this.graph, "keyByOut");
        this.graph.putEdge(this.currNode, keyedNode, keyByFunction);
        return new KeyedMantisStreamImpl<>(keyedNode, this.graph);
    }

    /**
     * Materializes the functions recorded so far and then records {@code keyByFunction}.
     *
     * @return a keyed stream positioned on the node after the key-by edge
     */
    @Override // io.mantisrx.runtime.core.MantisStream
    public <K> KeyedMantisStream<K, T> keyBy(KeyByFunction<K, T> keyByFunction) {
        return materializeInternal().keyByInternal(keyByFunction);
    }

    /**
     * Walks the nodes in the given order and translates their edges into job stages.
     *
     * <p>Functions that are neither sources, sinks nor key-by functions are accumulated in a
     * {@link FunctionCombinator}; the combinator is turned into a stage when a materializing
     * self-loop or a sink is reached.
     *
     * @param immutableValueGraph snapshot of the operator graph
     * @param iterable nodes of the graph in the order they should be processed
     * @return the builder holding every stage that was added
     */
    // FunctionCombinator's type parameters are not tracked along the graph, so it is used raw.
    @SuppressWarnings("rawtypes")
    private MantisJobBuilder makeMantisJob(ImmutableValueGraph<OperandNode<?>, MantisFunction> immutableValueGraph, Iterable<OperandNode<?>> iterable) {
        MantisJobBuilder jobBuilder = new MantisJobBuilder();
        AtomicReference<FunctionCombinator> composite = new AtomicReference<>(new FunctionCombinator(false));
        for (OperandNode<?> node : iterable) {
            Set<OperandNode<?>> successors = immutableValueGraph.successors(node);
            if (successors.isEmpty()) {
                continue;
            }

            // The self-loop, if any, is handled before the edges to other nodes.
            Optional<MantisFunction> selfEdge = immutableValueGraph.edgeValue(node, node);
            selfEdge.ifPresent(selfFn -> applySelfEdge(jobBuilder, composite, node, selfFn));

            int selfEdgeCount = selfEdge.isPresent() ? 1 : 0;
            if (successors.size() - selfEdgeCount > 1) {
                log.warn("Found multi-output node {} with nbrs {}. Not supported yet!", node, successors);
            }

            for (OperandNode<?> successor : successors) {
                if (successor == node) {
                    continue;
                }
                immutableValueGraph.edgeValue(node, successor)
                    .ifPresent(edgeFn -> applyOutgoingEdge(jobBuilder, composite, node, edgeFn));
            }
        }
        return jobBuilder;
    }

    /** Handles the function stored on a node's self-loop; other kinds of self-loop are ignored. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void applySelfEdge(MantisJobBuilder jobBuilder, AtomicReference<FunctionCombinator> composite, OperandNode<?> node, MantisFunction selfFn) {
        if (MantisFunction.empty().equals(selfFn)) {
            // Materialization point: emit what has been accumulated and start over.
            jobBuilder.addStage(composite.get().makeStage(), node.getCodec(), node.getKeyCodec());
            composite.set(new FunctionCombinator(false));
        } else if (selfFn instanceof WindowFunction) {
            composite.set(composite.get().add(selfFn));
        }
    }

    /** Handles the function stored on an edge from {@code node} to a different node. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void applyOutgoingEdge(MantisJobBuilder jobBuilder, AtomicReference<FunctionCombinator> composite, OperandNode<?> node, MantisFunction edgeFn) {
        if (edgeFn instanceof SourceFunction) {
            if (edgeFn instanceof ObservableSourceImpl) {
                jobBuilder.addStage(((ObservableSourceImpl) edgeFn).getSource());
            }
        } else if (edgeFn instanceof KeyByFunction) {
            if (composite.get().size() > 0) {
                log.warn("Unempty composite found for KeyByFunction {}", composite.get());
            }
            jobBuilder.addStage(makeGroupComputation((KeyByFunction) edgeFn), node.getCodec(), node.getKeyCodec());
            composite.set(new FunctionCombinator(true));
        } else if (edgeFn instanceof SinkFunction) {
            // Flush the accumulated functions as a stage before attaching the sink itself.
            jobBuilder.addStage(composite.get().makeStage(), node.getCodec());
            if (edgeFn instanceof ObservableSinkImpl) {
                jobBuilder.addStage(((ObservableSinkImpl) edgeFn).getSink());
            }
        } else {
            composite.set(composite.get().add(edgeFn));
        }
    }

    /**
     * Wraps a key-by function in a computation that pairs every element with its key.
     *
     * @param keyByFunction function that extracts the key from an element
     * @return a computation mapping each element {@code e} to a {@link MantisGroup} built from
     *     {@code keyByFunction.getKey(e)} and {@code e}
     */
    private <A, K> Computation makeGroupComputation(final KeyByFunction<K, A> keyByFunction) {
        return new ToGroupComputation<A, K, A>() { // from class: io.mantisrx.runtime.core.MantisStreamImpl.1
            @Override // io.mantisrx.runtime.computation.Computation
            public void init(Context context) {
                keyByFunction.init();
            }

            @Override
            public Observable<MantisGroup<K, A>> call(Context context, Observable<A> observable) {
                return observable.map(element -> new MantisGroup<>(keyByFunction.getKey(element), element));
            }
        };
    }

    /**
     * Returns the nodes of {@code immutableValueGraph} in topological order, ignoring
     * self-loops.
     *
     * <p>The traversal proceeds level by level: it starts with every node whose in-degree
     * (not counting a self-loop) is zero, lowers the remaining in-degree of each successor, and
     * continues with the successors whose remaining in-degree has reached zero. Nodes that never
     * reach zero, such as those on a cycle longer than a self-loop, are not part of the result.
     *
     * @param immutableValueGraph graph to order
     * @param <V> node type
     * @param <E> edge value type
     * @return the ordered nodes, each at most once
     */
    static <V, E> Iterable<V> topSortTraversal(ImmutableValueGraph<V, E> immutableValueGraph) {
        Map<V, AtomicInteger> remainingInDegree = immutableValueGraph.nodes().stream()
            .collect(Collectors.toMap(
                node -> node,
                node -> new AtomicInteger(
                    immutableValueGraph.inDegree(node) - (immutableValueGraph.hasEdgeConnecting(node, node) ? 1 : 0))));

        List<V> ordered = new ArrayList<>();
        Set<V> visited = new HashSet<>();
        List<V> frontier = remainingInDegree.entrySet().stream()
            .filter(entry -> entry.getValue().get() == 0)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        while (!frontier.isEmpty()) {
            for (V node : frontier) {
                for (V successor : immutableValueGraph.successors(node)) {
                    // The self-loop was already excluded from the initial count.
                    if (successor != node) {
                        remainingInDegree.get(successor).decrementAndGet();
                    }
                }
            }
            visited.addAll(frontier);
            ordered.addAll(frontier);
            frontier = frontier.stream()
                .flatMap(node -> immutableValueGraph.successors(node).stream())
                // A node reached from several frontier members must be scheduled only once.
                .distinct()
                .filter(candidate -> !visited.contains(candidate) && remainingInDegree.get(candidate).get() == 0)
                .collect(Collectors.toList());
        }
        return ordered;
    }
}
