package io.flinkspector.datastream;

import io.flinkspector.core.input.Input;
import io.flinkspector.core.runtime.OutputVerifier;
import io.flinkspector.core.runtime.Runner;
import io.flinkspector.core.trigger.DefaultTestTrigger;
import io.flinkspector.core.trigger.VerifyFinishedTrigger;
import io.flinkspector.datastream.functions.ParallelFromStreamRecordsFunction;
import io.flinkspector.datastream.functions.TestSink;
import io.flinkspector.datastream.input.EventTimeInput;
import io.flinkspector.shade.com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestStreamEnvironment;

/* loaded from: input_file:io/flinkspector/datastream/DataStreamTestEnvironment.class */
public class DataStreamTestEnvironment extends TestStreamEnvironment {
    private final Runner runner;

    public DataStreamTestEnvironment(final MiniCluster miniCluster, final int i) {
        super(miniCluster, i);
        this.runner = new Runner(miniCluster) { // from class: io.flinkspector.datastream.DataStreamTestEnvironment.1
            protected void executeEnvironment() throws Throwable {
                TestStreamEnvironment.setAsContext(miniCluster, i);
                try {
                    DataStreamTestEnvironment.this.execute();
                } finally {
                    TestStreamEnvironment.unsetAsContext();
                }
            }
        };
    }

    public static DataStreamTestEnvironment createTestEnvironment(int i) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }
        return new DataStreamTestEnvironment(new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, availableProcessors)).build()), i);
    }

    public void executeTest() throws Throwable {
        this.runner.executeTest();
    }

    public <IN> TestSink<IN> createTestSink(OutputVerifier<IN> outputVerifier) {
        return new TestSink<>(this.runner.registerListener(outputVerifier, new DefaultTestTrigger()), this.runner.getRingBuffer());
    }

    public <IN> TestSink<IN> createTestSink(OutputVerifier<IN> outputVerifier, VerifyFinishedTrigger verifyFinishedTrigger) {
        return new TestSink<>(this.runner.registerListener(outputVerifier, verifyFinishedTrigger), this.runner.getRingBuffer());
    }

    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElementsWithTimeStamp(StreamRecord<OUT>... streamRecordArr) {
        return fromCollectionWithTimestamp(Arrays.asList(streamRecordArr), false);
    }

    public <OUT> DataStreamSource<OUT> fromInput(EventTimeInput<OUT> eventTimeInput) {
        return fromCollectionWithTimestamp(eventTimeInput.getInput(), eventTimeInput.getFlushWindowsSetting());
    }

    public <OUT> DataStreamSource<OUT> fromInput(Input<OUT> input) {
        return fromCollection(input.getInput());
    }

    public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> collection, Boolean bool) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        StreamRecord<OUT> next = collection.iterator().next();
        if (next == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        try {
            return fromCollectionWithTimestamp(collection, TypeExtractor.getForObject(next.getValue()), bool);
        } catch (Exception e) {
            throw new RuntimeException("Could not startWith TypeInformation for type " + next.getClass() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
        }
    }

    public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> collection, TypeInformation<OUT> typeInformation, Boolean bool) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        StreamRecord<OUT> next = collection.iterator().next();
        try {
            TypeInformation forObject = TypeExtractor.getForObject(next);
            FromElementsFunction.checkCollection(collection, forObject.getTypeClass());
            try {
                return addSource(new ParallelFromStreamRecordsFunction(forObject.createSerializer(getConfig()), collection, bool), "Collection Source", typeInformation);
            } catch (IOException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        } catch (Exception e2) {
            throw new RuntimeException("Could not startWith TypeInformation for type " + next.getClass() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
        }
    }

    public <OUT> DataStreamSource<OUT> fromInput(Collection<OUT> collection) {
        return super.fromCollection(collection);
    }

    public Boolean hasBeenStopped() {
        return this.runner.hasBeenStopped();
    }

    public Long getTimeoutInterval() {
        return this.runner.getTimeoutInterval();
    }

    public void setTimeoutInterval(long j) {
        this.runner.setTimeoutInterval(j);
    }
}
