package spark.streaming.util;

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import scala.Console$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Range$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.AnyValManifest;
import scala.reflect.ClassManifest;
import scala.reflect.ClassManifest$;
import scala.reflect.Manifest$;
import scala.reflect.OptManifest;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt;
import scala.runtime.RichLong;
import spark.Logging;
import spark.streaming.DStream;
import spark.streaming.Duration;
import spark.streaming.Milliseconds$;
import spark.streaming.Seconds$;
import spark.streaming.StreamingContext;

/* compiled from: MasterFailureTest.scala */
/* loaded from: input_file:spark/streaming/util/MasterFailureTest$.class */
/**
 * Decompiled singleton (Scala {@code object}) behind {@code MasterFailureTest}.
 *
 * <p>It drives two streaming scenarios, {@link #testMap} and
 * {@link #testUpdateStateByKey}. Each one, via {@link #testOperation}:
 * <ol>
 *   <li>creates a unique working directory with {@code checkpoint} and
 *       {@code test} sub-directories,</li>
 *   <li>starts a {@code FileGeneratingThread} on the {@code test} directory,</li>
 *   <li>runs the streaming context through {@code runStreams},</li>
 *   <li>deletes both directories and returns the collected output.</li>
 * </ol>
 *
 * <p>NOTE(review): this file is decompiler output. {@code Logging.class.foo(this, ...)}
 * presumably stands for the Scala trait's static implementation class, and
 * {@code runStreams} could not be decompiled at all (it throws unconditionally).
 * Confirm against the original {@code MasterFailureTest.scala} before relying on it.
 */
public final class MasterFailureTest$ implements Logging {
    /** Singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static final MasterFailureTest$ MODULE$ = null;

    // Both flags are cleared by reset(). Their readers/writers live in code not
    // visible here (presumably runStreams and its helper thread; TODO confirm).
    private volatile boolean killed;
    private volatile int killCount;

    /** Backing field for the {@code Logging} trait's logger. */
    private transient Logger spark$Logging$$log_;

    static {
        new MasterFailureTest$();
    }

    // ---------------------------------------------------------------------
    // Logging trait plumbing: accessors for the logger field plus forwarders
    // to the trait implementation.
    // ---------------------------------------------------------------------

    public final Logger spark$Logging$$log_() {
        return this.spark$Logging$$log_;
    }

    public final void spark$Logging$$log__$eq(Logger logger) {
        this.spark$Logging$$log_ = logger;
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public void initLogging() {
        Logging.class.initLogging(this);
    }

    // ---------------------------------------------------------------------
    // Accessors for the volatile state flags.
    // ---------------------------------------------------------------------

    public boolean killed() {
        return this.killed;
    }

    public void killed_$eq(boolean z) {
        this.killed = z;
    }

    public int killCount() {
        return this.killCount;
    }

    public void killCount_$eq(int i) {
        this.killCount = i;
    }

    /**
     * Command-line entry point.
     *
     * <p>Arguments: {@code <local/HDFS directory> <# batches> [<batch size in milliseconds>]}.
     * With fewer than two arguments the usage line is printed and the JVM exits with
     * status 1. The batch duration defaults to one second when the third argument is
     * absent. Runs the map test, then the update-state-by-key test, then prints
     * {@code SUCCESS}.
     *
     * @param strArr raw command-line arguments
     */
    public void main(String[] strArr) {
        if (strArr.length < 2) {
            Predef$.MODULE$.println("Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]");
            System.exit(1);
        }
        String directory = strArr[0];
        int numBatches = Predef$.MODULE$.augmentString(strArr[1]).toInt();
        Duration batchDuration = strArr.length > 2
                ? Milliseconds$.MODULE$.apply(Predef$.MODULE$.augmentString(strArr[2]).toInt())
                : Seconds$.MODULE$.apply(1L);
        Predef$.MODULE$.println("\n\n========================= MAP TEST =========================\n\n");
        testMap(directory, numBatches, batchDuration);
        Predef$.MODULE$.println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n");
        testUpdateStateByKey(directory, numBatches, batchDuration);
        Predef$.MODULE$.println("\n\nSUCCESS\n\n");
    }

    /**
     * Runs the "map" scenario and asserts that the set of distinct output values
     * equals the set {@code 1..i}.
     *
     * <p>The input lines ({@code $anonfun$2}) and the stream operation
     * ({@code $anonfun$3}) are compiled closures whose bodies are not visible here.
     *
     * @param str      base directory (local or HDFS) under which a working directory is created
     * @param i        number of batches; input and expected output are both derived from {@code 1..i}
     * @param duration batch duration
     */
    public void testMap(String str, int i, Duration duration) {
        Seq input = ((Seq) new RichInt(1).to(i).map(new MasterFailureTest$$anonfun$2(), IndexedSeq$.MODULE$.canBuildFrom())).toSeq();
        Range.Inclusive expectedOutput = new RichInt(1).to(i);
        MasterFailureTest$$anonfun$3 operation = new MasterFailureTest$$anonfun$3();
        AnyValManifest Int = Manifest$.MODULE$.Int();

        // Setup, run, join and cleanup were previously inlined here verbatim;
        // testOperation performs exactly the same sequence.
        scala.collection.Seq output = testOperation(str, duration, input, operation, expectedOutput, Int);

        logInfo(new MasterFailureTest$$anonfun$testMap$1(expectedOutput));
        logInfo(new MasterFailureTest$$anonfun$testMap$2(expectedOutput));
        logInfo(new MasterFailureTest$$anonfun$testMap$3(output));
        logInfo(new MasterFailureTest$$anonfun$testMap$4(output));

        Predef$ predef$ = Predef$.MODULE$;
        Set distinctOutput = ((TraversableOnce) output.distinct()).toSet();
        Set expectedSet = expectedOutput.toSet();
        predef$.assert(distinctOutput != null ? distinctOutput.equals(expectedSet) : expectedSet == null);
    }

    /**
     * Runs the "updateStateByKey" scenario. Every output element is checked by the
     * {@code $anonfun$testUpdateStateByKey$3} closure (body not visible here), and the
     * last output element must equal the last expected element under Scala
     * {@code ==} semantics.
     *
     * <p>The expected output is built from {@code 1L..i} through two compiled closures
     * ({@code $anonfun$1}, {@code $anonfun$5}); the manifest passed along describes
     * {@code Tuple2[String, Long]}.
     *
     * @param str      base directory (local or HDFS) under which a working directory is created
     * @param i        number of batches
     * @param duration batch duration; also captured by the stream operation closure
     */
    public void testUpdateStateByKey(String str, int i, Duration duration) {
        Seq input = ((Seq) new RichInt(1).to(i).map(new MasterFailureTest$$anonfun$4(), IndexedSeq$.MODULE$.canBuildFrom())).toSeq();
        IndexedSeq expectedOutput = (IndexedSeq) ((TraversableLike) new RichLong(1L).to(BoxesRunTime.boxToLong(i)).map(new MasterFailureTest$$anonfun$1(), IndexedSeq$.MODULE$.canBuildFrom())).map(new MasterFailureTest$$anonfun$5(), IndexedSeq$.MODULE$.canBuildFrom());
        MasterFailureTest$$anonfun$6 operation = new MasterFailureTest$$anonfun$6(duration);
        ClassManifest classType = ClassManifest$.MODULE$.classType(Tuple2.class, ClassManifest$.MODULE$.classType(String.class), Predef$.MODULE$.wrapRefArray(new OptManifest[]{Manifest$.MODULE$.Long()}));

        // Same shared sequence as testMap; see testOperation.
        scala.collection.Seq output = testOperation(str, duration, input, operation, expectedOutput, classType);

        logInfo(new MasterFailureTest$$anonfun$testUpdateStateByKey$1(expectedOutput));
        logInfo(new MasterFailureTest$$anonfun$testUpdateStateByKey$2(output));
        output.foreach(new MasterFailureTest$$anonfun$testUpdateStateByKey$3(expectedOutput));

        Predef$.MODULE$.assert(scalaEquals(output.last(), expectedOutput.last()));
    }

    /**
     * Shared driver for one scenario: set up the streams, feed input files, run the
     * streaming context, clean up, and return what was collected.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>assert that the set of distinct expected values equals the set of all
     *       expected values,</li>
     *   <li>{@link #setupStreams} creates the context and the checkpoint/test directories,</li>
     *   <li>a {@code FileGeneratingThread} is started with {@code seq}, the test
     *       directory and the batch interval in milliseconds,</li>
     *   <li>{@code runStreams} is invoked with the last expected element and
     *       {@code seq2.size() * batchMillis * 2} (presumably a time budget in
     *       milliseconds; the callee is not decompiled),</li>
     *   <li>the generator thread is joined and both directories are deleted recursively.</li>
     * </ol>
     *
     * @param str           base directory under which the working directory is created
     * @param duration      batch duration
     * @param seq           input lines handed to the file-generating thread
     * @param function1     operation applied to the text-file input stream
     * @param seq2          expected output
     * @param classManifest manifest for the output element type
     * @return the output sequence produced by {@code runStreams}
     */
    /* JADX WARN: Multi-variable type inference failed */
    public <T> scala.collection.Seq<T> testOperation(String str, Duration duration, scala.collection.Seq<String> seq, Function1<DStream<String>, DStream<T>> function1, scala.collection.Seq<T> seq2, ClassManifest<T> classManifest) {
        Predef$ predef$ = Predef$.MODULE$;
        Set distinctExpected = ((TraversableOnce) seq2.distinct()).toSet();
        Set allExpected = seq2.toSet();
        predef$.assert(distinctExpected != null ? distinctExpected.equals(allExpected) : allExpected == null);

        Tuple3<StreamingContext, Path, Path> streams = setupStreams(str, duration, function1, classManifest);
        if (streams == null) {
            throw new MatchError(streams);
        }
        StreamingContext streamingContext = (StreamingContext) streams._1();
        Path checkpointDir = (Path) streams._2();
        Path testDir = (Path) streams._3();

        FileGeneratingThread fileGeneratingThread = new FileGeneratingThread(seq, testDir, duration.milliseconds());
        fileGeneratingThread.start();
        scala.collection.Seq<T> output = runStreams(streamingContext, seq2.last(), seq2.size() * duration.milliseconds() * 2, classManifest);
        fileGeneratingThread.join();

        FileSystem fileSystem = checkpointDir.getFileSystem(new Configuration());
        fileSystem.delete(checkpointDir, true);
        fileSystem.delete(testDir, true);
        logInfo(new MasterFailureTest$$anonfun$testOperation$1());
        return output;
    }

    /**
     * Resets the kill flags and creates a fresh working directory, then builds a
     * checkpointed {@code StreamingContext} whose output stream wraps
     * {@code function1} applied to a text-file stream over the {@code test} directory.
     *
     * <p>The working directory is {@code <str>/<random UUID>} with {@code checkpoint}
     * and {@code test} children. The {@code spark.driver.port} system property is
     * cleared before the context (master {@code local[4]}) is constructed.
     *
     * @return {@code (streamingContext, checkpointDir, testDir)}
     */
    private <T> Tuple3<StreamingContext, Path, Path> setupStreams(String str, Duration duration, Function1<DStream<String>, DStream<T>> function1, ClassManifest<T> classManifest) {
        reset();
        Path rootDir = new Path(str, UUID.randomUUID().toString());
        FileSystem fileSystem = rootDir.getFileSystem(new Configuration());
        Path checkpointDir = new Path(rootDir, "checkpoint");
        Path testDir = new Path(rootDir, "test");
        fileSystem.mkdirs(checkpointDir);
        fileSystem.mkdirs(testDir);
        System.clearProperty("spark.driver.port");
        StreamingContext streamingContext = new StreamingContext("local[4]", "MasterFailureTest", duration, null, Nil$.MODULE$, Predef$.MODULE$.Map().apply(Nil$.MODULE$));
        streamingContext.checkpoint(checkpointDir.toString());
        streamingContext.registerOutputStream(new TestOutputStream((DStream) function1.apply(streamingContext.textFileStream(testDir.toString())), TestOutputStream$.MODULE$.init$default$2(), classManifest));
        return new Tuple3<>(streamingContext, checkpointDir, testDir);
    }

    /**
     * NOT DECOMPILED: the decompiler could not reconstruct this method, so the body
     * below throws unconditionally. Every scenario in this class reaches it through
     * {@link #testOperation}. Restore the real body from the bytecode or the Scala
     * source before running anything.
     */
    /* JADX WARN: Removed duplicated region for block: B:30:0x028c  */
    /* JADX WARN: Removed duplicated region for block: B:33:0x0290  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private <T> scala.collection.Seq<T> runStreams(spark.streaming.StreamingContext r9, T r10, long r11, scala.reflect.ClassManifest<T> r13) {
        /*
            Method dump skipped, instructions count: 663
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: spark.streaming.util.MasterFailureTest$.runStreams(spark.streaming.StreamingContext, java.lang.Object, long, scala.reflect.ClassManifest):scala.collection.Seq");
    }

    /**
     * Checks the expected sequence for consecutive duplicates, prints both sequences
     * to the console, and applies the {@code $anonfun$verifyOutput$2} closure (body not
     * visible here) to every output element.
     *
     * @param seq           actual output
     * @param seq2          expected output; adjacent elements must differ
     * @param classManifest unused by this method
     * @throws AssertionError if two adjacent expected elements are equal
     */
    private <T> void verifyOutput(scala.collection.Seq<T> seq, scala.collection.Seq<T> seq2, ClassManifest<T> classManifest) {
        // Compare each adjacent pair (i, i + 1). The decompiled while(true) loop never
        // left its final iteration; this bounded loop visits the same indices,
        // 0 .. size-2, and then terminates.
        int expectedSize = seq2.size();
        for (int i = 0; i < expectedSize - 1; i++) {
            if (scalaEquals(seq2.apply(i), seq2.apply(i + 1))) {
                throw new AssertionError("assertion failed: " + "Expected output has consecutive duplicate sequence of values");
            }
        }
        Console$.MODULE$.println(new StringBuilder().append("Expected output, size = ").append(BoxesRunTime.boxToInteger(seq2.size())).toString());
        Console$.MODULE$.println(seq2.mkString("[", ",", "]"));
        Console$.MODULE$.println(new StringBuilder().append("Output, size = ").append(BoxesRunTime.boxToInteger(seq.size())).toString());
        Predef$.MODULE$.println(seq.mkString("[", ",", "]"));
        seq.foreach(new MasterFailureTest$$anonfun$verifyOutput$2(seq2));
    }

    /**
     * Scala {@code ==} on two boxed values, as the compiler expanded it inline:
     * identical references are equal, a {@code null} left side equals nothing else,
     * numbers and characters go through {@code BoxesRunTime}, and everything else
     * uses {@code equals}.
     */
    private static boolean scalaEquals(Object left, Object right) {
        if (left == right) {
            return true;
        }
        if (left == null) {
            return false;
        }
        if (left instanceof Number) {
            return BoxesRunTime.equalsNumObject((Number) left, right);
        }
        if (left instanceof Character) {
            return BoxesRunTime.equalsCharObject((Character) left, right);
        }
        return left.equals(right);
    }

    /** Clears the kill flag and the kill counter before a new scenario starts. */
    private void reset() {
        killed_$eq(false);
        killCount_$eq(0);
    }

    /**
     * Compiler-lifted helper (presumably a local {@code def output} inside
     * {@code runStreams}; TODO confirm): flat-maps the buffer through the
     * {@code $anonfun$output$4$1} closure into a new {@code ArrayBuffer}.
     */
    public final ArrayBuffer output$4(ArrayBuffer arrayBuffer) {
        return (ArrayBuffer) arrayBuffer.flatMap(new MasterFailureTest$$anonfun$output$4$1(), ArrayBuffer$.MODULE$.canBuildFrom());
    }

    /** Publishes the singleton, initializes the logging trait, and zeroes the kill flags. */
    private MasterFailureTest$() {
        MODULE$ = this;
        Logging.class.$init$(this);
        initLogging();
        this.killed = false;
        this.killCount = 0;
    }
}
