package org.voltdb.stream.execution;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.VoltEnvironment;
import org.voltdb.stream.api.extension.Operator;
import org.voltdb.stream.api.extension.VoltStream;
import org.voltdb.stream.api.extension.VoltStreamFunctionConfigurator;
import org.voltdb.stream.api.extension.VoltStreamSinkConfigurator;
import org.voltdb.stream.api.extension.VoltStreamSourceConfigurator;
import org.voltdb.stream.api.pipeline.ExceptionHandler;
import org.voltdb.stream.api.pipeline.ExceptionHandlerBuilder;
import org.voltdb.stream.api.pipeline.VoltOpenStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.api.pipeline.VoltStreamSource;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/voltdb/stream/execution/CollectingStream.class */
public class CollectingStream implements VoltStreamBuilder, VoltStream, ExceptionHandlerBuilder {
    protected static final Logger LOG = LoggerFactory.getLogger("STREAM");
    private static final ExceptionHandler DEFAULT_EXCEPTION_HANDLER = (list, executionContext, th) -> {
        LOG.warn("Failed while processing {} records, cause is: {}. Enable DEBUG log level or override default pipeline's exception handler to process failed records.", Integer.valueOf(list.size()), th.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Detailed stacktrace", th);
            if (list.isEmpty()) {
                return;
            }
            LOG.debug("Failed records {}", list);
        }
    };
    private final VoltEnvironmentAdapter environment;
    private VoltStreamSourceConfigurator<?> source;
    private VoltStreamSinkConfigurator<?> sink;
    private final Map<String, VoltStreamSinkConfigurator<?>> additionalSinkConfigurators = new HashMap();
    private String name = "Volt Stream";
    private final List<VoltStreamFunctionConfigurator<?, ?>> steps = new ArrayList();
    private ExceptionHandler exceptionHandler = DEFAULT_EXCEPTION_HANDLER;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CollectingStream(VoltEnvironmentAdapter voltEnvironmentAdapter) {
        this.environment = voltEnvironmentAdapter;
    }

    public VoltStreamBuilder withName(String str) {
        this.name = str;
        return this;
    }

    public <T> VoltOpenStreamBuilder<T> consumeFromSource(final VoltStreamSource<T> voltStreamSource) {
        return consumeFromSource(new VoltStreamSourceConfigurator<T>() { // from class: org.voltdb.stream.execution.CollectingStream.1
            public List<Operator> configure(VoltEnvironment voltEnvironment) {
                return List.of(voltStreamSource);
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T> VoltOpenStreamBuilder<T> consumeFromSource(VoltStreamSourceConfigurator<T> voltStreamSourceConfigurator) {
        this.steps.clear();
        this.source = voltStreamSourceConfigurator;
        return new OpenedDataStream(this, this.steps, this.environment);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <O> void terminateWithSink(final VoltStreamSink<O> voltStreamSink) {
        terminateWithSink(new VoltStreamSinkConfigurator<O>() { // from class: org.voltdb.stream.execution.CollectingStream.2
            public List<Operator> configure(VoltEnvironment voltEnvironment) {
                return List.of(voltStreamSink);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public <O> void terminateWithSink(VoltStreamSinkConfigurator<O> voltStreamSinkConfigurator) {
        this.sink = voltStreamSinkConfigurator;
    }

    public ExceptionHandlerBuilder onError() {
        return this;
    }

    public ExecutionContext getExecutionContext() {
        return this.environment.getExecutionContext();
    }

    public void validate() {
        try {
            Objects.requireNonNull(this.source, "Source is required");
            Objects.requireNonNull(this.sink, "Sink is required");
        } catch (Exception e) {
            throw new VoltEnvironment.VoltStreamException("Could not instantiate a stream: '" + this.name + "'", e);
        }
    }

    public VoltStreamSourceConfigurator<?> getSource() {
        return this.source;
    }

    public VoltStreamSinkConfigurator<?> getSink() {
        return this.sink;
    }

    public List<VoltStreamFunctionConfigurator<?, ?>> getSteps() {
        return this.steps;
    }

    public Map<String, VoltStreamSinkConfigurator<?>> getAdditionalSinks() {
        return Collections.unmodifiableMap(this.additionalSinkConfigurators);
    }

    public String getName() {
        return this.name;
    }

    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandler;
    }

    public ExceptionHandlerBuilder setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (this.exceptionHandler == DEFAULT_EXCEPTION_HANDLER) {
            this.exceptionHandler = exceptionHandler;
        } else {
            LOG.warn("ExceptionHandler is already defined, there can be only one handler defined");
        }
        return this;
    }

    public <T> ExceptionHandlerBuilder addNamedSink(String str, final VoltStreamSink<T> voltStreamSink) {
        return addNamedSink(str, new VoltStreamSinkConfigurator<T>() { // from class: org.voltdb.stream.execution.CollectingStream.3
            public List<Operator> configure(VoltEnvironment voltEnvironment) {
                return List.of(voltStreamSink);
            }
        });
    }

    public <T> ExceptionHandlerBuilder addNamedSink(String str, VoltStreamSinkConfigurator<T> voltStreamSinkConfigurator) {
        this.additionalSinkConfigurators.put(str, voltStreamSinkConfigurator);
        return this;
    }
}
