package org.voltdb.stream.execution;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.extension.Operator;
import org.voltdb.stream.api.pipeline.ExceptionHandler;
import org.voltdb.stream.api.pipeline.VoltStreamFunction;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.api.pipeline.VoltStreamSource;

/* loaded from: input_file:org/voltdb/stream/execution/InTestSingleRunStream.class */
/**
 * An {@link ExecutableStream} that drives a linear chain of operators through exactly one
 * batch inside a {@link TestVoltEnvironment}.
 *
 * <p>The operator list is expected to start with a source (index 0) and to end with a sink;
 * operators in between that report {@code Operator.Type.FUNCTION} are chained between them.
 * A single call to {@link #run()} plans the chain, configures every operator, processes one
 * batch, commits it and, only when the commit is reported as successful, destroys the
 * operators again.
 */
public final class InTestSingleRunStream implements ExecutableStream {
    private final TestVoltEnvironment environment;
    private final List<Operator> operators;
    private final ExceptionHandler exceptionHandler;
    private final CompoundSink compoundSink;
    /** Assigned at the beginning of {@link #run()}; {@code null} until then. */
    private ExecutionContext context;

    /**
     * Creates a stream over the given operators.
     *
     * @param testVoltEnvironment environment that supplies the execution context and emitted values
     * @param list operators in pipeline order; the reference is stored as is (no copy is made)
     * @param exceptionHandler handler handed to every consumer and to the committer
     * @param compoundSink sink that is configured/destroyed together with the operators
     * @throws NullPointerException if {@code list} is {@code null}
     * @throws IllegalArgumentException if {@code list} holds fewer than two operators
     */
    public InTestSingleRunStream(TestVoltEnvironment testVoltEnvironment, List<Operator> list, ExceptionHandler exceptionHandler, CompoundSink compoundSink) {
        this.environment = testVoltEnvironment;
        this.exceptionHandler = exceptionHandler;
        this.operators = Objects.requireNonNull(list);
        if (list.size() < 2) {
            throw new IllegalArgumentException("Cannot form stream from less than 2 operators");
        }
        this.compoundSink = compoundSink;
    }

    /**
     * Executes a single batch: stubs the environment's execution context, plans the chain,
     * configures the operators, runs the planned steps and commits. The operators are destroyed
     * via {@link #leave()} only when the commit is reported as done.
     *
     * @throws IllegalStateException if the operator list does not form a valid chain
     * @throws NullPointerException if no source operator is present
     */
    @Override
    public void run() {
        this.environment.getExecutionContextStubbing()
                .recordTraceTo(this.environment)
                .onErrorEmitTo(this.compoundSink);
        this.context = this.environment.getExecutionContext();

        long batchId = this.context.nextBatchId();
        // Planning validates the chain, so it deliberately happens before anything is configured.
        Runnable steps = planSteps(batchId);
        join();
        runSteps(steps);
        if (commit(batchId)) {
            leave();
        }
    }

    /** Configures every operator, then the compound sink, with the current context. */
    private void join() {
        this.operators.forEach(operator -> operator.configure(this.context));
        this.compoundSink.configure(this.context);
    }

    /** Runs the previously planned batch steps on the calling thread. */
    private void runSteps(Runnable runnable) {
        runnable.run();
    }

    /**
     * Commits the batch through a {@code BatchAsyncCommitter} built from the stream's source
     * and sink.
     *
     * @param j id of the batch to commit
     * @return whatever the committer reports from {@code hasCommitted()} after the commit call
     * @throws NullPointerException if the operators contain no source or no sink
     */
    // Raw operator types are kept on purpose: the generic signatures of the collaborators
    // are declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private boolean commit(long j) {
        VoltStreamSink sink = null;
        VoltStreamSource source = null;
        // Walk from the tail so that, should several operators report the same type,
        // the one closest to the head of the list is the one that is finally kept.
        for (int index = this.operators.size() - 1; index >= 0; index--) {
            Operator operator = this.operators.get(index);
            if (operator.getType() == Operator.Type.SINK) {
                sink = (VoltStreamSink) operator;
            } else if (operator.getType() == Operator.Type.SOURCE) {
                source = (VoltStreamSource) operator;
            }
        }
        Objects.requireNonNull(source);
        Objects.requireNonNull(sink);

        BatchAsyncCommitter committer = new BatchAsyncCommitter(this.context, this.environment, this.exceptionHandler, source, sink, this.compoundSink);
        committer.commit(j, this.environment.getEmittedValuesFor(TestVoltEnvironment.SINK));
        return committer.hasCommitted();
    }

    /**
     * Destroys every operator, then the compound sink, with the current context.
     * Note that the context is only assigned by {@link #run()}.
     */
    public void leave() {
        this.operators.forEach(operator -> operator.destroy(this.context));
        this.compoundSink.destroy(this.context);
    }

    /**
     * Builds the consumer chain back to front (sink first) and returns the steps that push one
     * batch from the source through that chain.
     *
     * @param j id of the batch the returned steps will process
     * @return a runnable that starts the batch, lets the source process it and marks it processed
     * @throws IllegalStateException if the last operator is not a sink, or a source is found
     *     anywhere but at index 0
     * @throws NullPointerException if no source operator is found
     */
    // Raw operator types are kept on purpose: the generic signatures of the collaborators
    // are declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Runnable planSteps(long j) {
        int lastIndex = this.operators.size() - 1;
        Operator last = this.operators.get(lastIndex);
        if (last.getType() != Operator.Type.SINK) {
            throw new IllegalStateException("Cannot form stream when last operator is not a sink");
        }

        InTestBindingConsumer downstream = getSinkConsumer((VoltStreamSink) last);
        VoltStreamSource source = null;
        for (int index = lastIndex - 1; index >= 0; index--) {
            Operator operator = this.operators.get(index);
            if (operator.getType() == Operator.Type.FUNCTION) {
                // The function at index 1 is registered under the SOURCE name; every other
                // function gets "<type><index>".
                String name = index == 1
                        ? TestVoltEnvironment.SOURCE
                        : String.valueOf(operator.getType()) + index;
                downstream = getFunctionConsumer((VoltStreamFunction) operator, downstream, name);
            } else if (operator.getType() == Operator.Type.SOURCE) {
                if (index != 0) {
                    throw new IllegalStateException("A stream cannot have multiple sources and sinks when run from test environment");
                }
                source = (VoltStreamSource) operator;
            }
            // Operators of any other type are skipped.
        }

        InTestBindingConsumer head = downstream;
        VoltStreamSource resolvedSource = Objects.requireNonNull(source, "a source has been not found");
        return () -> {
            head.nextBatchStarts(j, this.context);
            resolvedSource.process(j, head, this.context);
            head.batchProcessed(j);
        };
    }

    /** Wraps the terminal sink into a consumer bound to this stream's handler and environment. */
    private InTestBindingConsumer getSinkConsumer(VoltStreamSink<Object> voltStreamSink) {
        return new InTestSinkConsumer(voltStreamSink, this.exceptionHandler, this.environment);
    }

    /** Wraps a function into a consumer named {@code str} that forwards to the given downstream consumer. */
    private InTestBindingConsumer getFunctionConsumer(VoltStreamFunction<Object, Object> voltStreamFunction, InTestBindingConsumer inTestBindingConsumer, String str) {
        return new InTestFunctionConsumer(voltStreamFunction, inTestBindingConsumer, str, this.exceptionHandler, this.environment);
    }
}
