package org.apache.spark.examples.snappydata;

import java.io.File;
import org.apache.spark.jdbc.ConnectionConf;
import org.apache.spark.jdbc.ConnectionConfBuilder;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.SnappyStreamingContext;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Random$;

/* compiled from: StreamingExample.scala */
/* loaded from: input_file:org/apache/spark/examples/snappydata/StreamingExample$.class */
public final class StreamingExample$ {
    public static final StreamingExample$ MODULE$ = null;

    static {
        new StreamingExample$();
    }

    public void main(String[] strArr) {
        String createAndGetDataDir = createAndGetDataDir();
        Predef$.MODULE$.println("Initializing a SnappyStreamingContext");
        SnappyStreamingContext snappyStreamingContext = new SnappyStreamingContext(SparkSession$.MODULE$.builder().appName(getClass().getSimpleName()).master("local[*]").config("snappydata.store.sys-disk-dir", createAndGetDataDir).config("snappydata.store.log-file", new StringBuilder().append(createAndGetDataDir).append("/SnappyDataExample.log").toString()).getOrCreate().sparkContext(), Seconds$.MODULE$.apply(1L));
        Predef$.MODULE$.println();
        Predef$.MODULE$.println("Initializing embedded Kafka");
        EmbeddedKafkaUtils embeddedKafkaUtils = new EmbeddedKafkaUtils();
        embeddedKafkaUtils.setup();
        embeddedKafkaUtils.createTopic("kafka_topic");
        String brokerAddress = embeddedKafkaUtils.brokerAddress();
        String stringBuilder = new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test-consumer-"})).s(Nil$.MODULE$)).append(BoxesRunTime.boxToInteger(Random$.MODULE$.nextInt(10000))).toString();
        Predef$.MODULE$.println();
        Predef$.MODULE$.println("Creating a stream table to read data from Kafka");
        snappyStreamingContext.sql("drop table if exists adImpressionStream");
        snappyStreamingContext.sql("drop table if exists publisher_bid_counts");
        snappyStreamingContext.sql(new StringBuilder().append("create stream table adImpressionStream ( time_stamp timestamp, publisher string, advertiser string, website string, geo string, bid double, cookie string)  using kafka_stream options( rowConverter 'org.apache.spark.examples.snappydata.RowsConverter',").append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{" kafkaParams 'bootstrap.servers->", ";"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{brokerAddress}))).append("key.deserializer->org.apache.kafka.common.serialization.StringDeserializer;").append("value.deserializer->org.apache.kafka.common.serialization.StringDeserializer;").append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"group.id->", ";auto.offset.reset->earliest',"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{stringBuilder}))).append(" startingOffsets '{\"").append("kafka_topic").append("\":{\"0\":0}}', ").append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{" subscribe '", "')"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{"kafka_topic"}))).toString());
        snappyStreamingContext.sql("create table publisher_bid_counts(publisher string, bidCount int) using row");
        snappyStreamingContext.sql("insert into publisher_bid_counts values('publisher1', 0)");
        snappyStreamingContext.sql("insert into publisher_bid_counts values('publisher2', 0)");
        snappyStreamingContext.sql("insert into publisher_bid_counts values('publisher3', 0)");
        snappyStreamingContext.sql("insert into publisher_bid_counts values('publisher4', 0)");
        Predef$.MODULE$.println();
        Predef$.MODULE$.println("Registering a continuous query to to be executed every second on the stream table");
        SchemaDStream registerCQ = snappyStreamingContext.registerCQ("select publisher, count(bid) as bidCount from adImpressionStream window (duration 1 seconds, slide 1 seconds) group by publisher");
        ConnectionConf build = new ConnectionConfBuilder(snappyStreamingContext.snappySession()).build();
        Predef$.MODULE$.println();
        registerCQ.foreachDataFrame(new StreamingExample$$anonfun$main$1(build));
        snappyStreamingContext.start();
        Predef$.MODULE$.println("Publishing messages on Kafka");
        publishKafkaMessages(embeddedKafkaUtils, "kafka_topic");
        Thread.sleep(3000L);
        Predef$.MODULE$.println("***Total no of bids per publisher are***");
        snappyStreamingContext.snappySession().sql("select publisher, bidCount from publisher_bid_counts").show();
        Predef$.MODULE$.println("Exiting");
        snappyStreamingContext.stop(false);
        embeddedKafkaUtils.teardown();
        System.exit(0);
    }

    public String createAndGetDataDir() {
        new File("./snappydata_examples_data").mkdir();
        return new File("./snappydata_examples_data").getAbsolutePath();
    }

    public void publishKafkaMessages(EmbeddedKafkaUtils embeddedKafkaUtils, String str) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(new StreamingExample$$anonfun$publishKafkaMessages$1(embeddedKafkaUtils, str));
        Predef$.MODULE$.println("Done publishing all messages");
    }

    private StreamingExample$() {
        MODULE$ = this;
    }
}
