package stream.scotty.demo.spark;

import java.lang.invoke.SerializedLambda;
import java.sql.Timestamp;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;
import scala.Tuple3;
import stream.scotty.core.windowType.SlidingWindow;
import stream.scotty.core.windowType.TumblingWindow;
import stream.scotty.core.windowType.WindowMeasure;
import stream.scotty.demo.spark.windowFunctions.SumWindowFunction;
import stream.scotty.sparkconnector.KeyedScottyWindowOperator;
import stream.scotty.sparkconnector.demo.DemoEvent;

/* loaded from: input_file:stream/scotty/demo/spark/SparkSumDemo.class */
/**
 * Demo entry point that wires a Kafka source, a {@link KeyedScottyWindowOperator} using a
 * {@link SumWindowFunction}, and a console sink together with Spark Structured Streaming.
 *
 * <p>No {@code $deserializeLambda$} method is declared here on purpose: javac synthesizes it
 * automatically for every serializable lambda in the class, so a hand-written copy is both
 * redundant and (in its decompiled form) not valid Java.
 */
public class SparkSumDemo {
    /**
     * Builds and runs the streaming query, blocking until it terminates.
     *
     * <p>Steps, in order: quiet the {@code org} and {@code akka} loggers, subscribe to the
     * Kafka topic {@code testInput}, cast key and value to integers, map each record to a
     * {@link DemoEvent}, pass the events through the window operator, print the schema of
     * every intermediate dataset, start a {@code DemoKafkaProducer} for the same topic, and
     * finally write the operator output to the console.
     *
     * @param args command-line arguments; not read
     * @throws Exception if the streaming query fails or waiting for its termination is
     *     interrupted
     */
    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.ERROR);

        SparkSession spark = SparkSession.builder()
                .appName("SparkStructuredSumDemo")
                .config("spark.master", "local")
                .getOrCreate();

        // NOTE(review): 2181 is conventionally the ZooKeeper port, not a Kafka broker port;
        // the value is kept unchanged here - confirm whether it should be listed at all.
        Dataset<Tuple3<String, String, Timestamp>> rawRecords = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:2181,localhost:9092")
                .option("subscribe", "testInput")
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value as STRING)", "timestamp")
                .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.TIMESTAMP()));

        // The explicit MapFunction target type keeps overload resolution away from the
        // Scala Function1 variant of Dataset.map and gives the lambda parameter its
        // Tuple3 type, so no per-field casts are needed.
        Dataset<DemoEvent> events = rawRecords
                .selectExpr("CAST(key AS INTEGER)", "CAST(value AS INTEGER)", "timestamp")
                .as(Encoders.tuple(Encoders.INT(), Encoders.INT(), Encoders.TIMESTAMP()))
                .map(
                        (MapFunction<Tuple3<Integer, Integer, Timestamp>, DemoEvent>) record ->
                                new DemoEvent(record._1(), record._2(), record._3().getTime()),
                        Encoders.bean(DemoEvent.class));

        // The operator is left as a raw type because its generic signature is not visible
        // from this file. The second constructor argument (100) is presumably the allowed
        // lateness - TODO confirm against KeyedScottyWindowOperator.
        @SuppressWarnings("rawtypes")
        KeyedScottyWindowOperator windowOperator =
                new KeyedScottyWindowOperator(new SumWindowFunction(), 100L);
        // One tumbling window (size 5000) and one sliding window (size 5000, slide 1000),
        // both measured in WindowMeasure.Time units (presumably milliseconds - confirm).
        windowOperator
                .addWindow(new TumblingWindow(WindowMeasure.Time, 5000L))
                .addWindow(new SlidingWindow(WindowMeasure.Time, 5000L, 1000L));

        @SuppressWarnings("unchecked")
        Dataset<?> windowResults = events.flatMap(windowOperator, Encoders.INT());

        rawRecords.printSchema();
        events.printSchema();
        windowResults.printSchema();
        System.out.println("Streaming : " + rawRecords.isStreaming());

        // Started before the query so the topic receives input while the query runs
        // (presumably the producer publishes test records - see DemoKafkaProducer).
        new DemoKafkaProducer("testInput").start();

        windowResults.writeStream()
                .outputMode("update")
                .format("console")
                // Continuous processing trigger with a 999 ms checkpoint interval.
                .trigger(Trigger.Continuous(999L))
                .start()
                .awaitTermination();
    }
}
