package io.snappydata.examples;

import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.JSparkJobValid;
import org.apache.spark.sql.JSparkJobValidation;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.JavaSnappyStreamingJob;
import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext;

/* loaded from: input_file:io/snappydata/examples/JavaTwitterPopularTagsJob.class */
public class JavaTwitterPopularTagsJob extends JavaSnappyStreamingJob {
    public Object runJavaJob(JavaSnappyStreamingContext javaSnappyStreamingContext, Config config) {
        PrintWriter printWriter = null;
        String str = null;
        String str2 = null;
        try {
            str = new File(".").getCanonicalPath();
            str2 = String.format("JavaTwitterPopularTagsJob-%d.out", Long.valueOf(System.currentTimeMillis()));
            printWriter = new PrintWriter(str2);
            StructType structType = new StructType(new StructField[]{DataTypes.createStructField("hashtag", DataTypes.StringType, false)});
            javaSnappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS topktable");
            javaSnappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS hashtagtable");
            javaSnappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS retweettable");
            if (config.hasPath("consumerKey") && config.hasPath("consumerKey") && config.hasPath("accessToken") && config.hasPath("accessTokenSecret")) {
                printWriter.println("##### Running example with live twitter stream #####");
                String string = config.getString("consumerKey");
                String string2 = config.getString("consumerSecret");
                String string3 = config.getString("accessToken");
                String string4 = config.getString("accessTokenSecret");
                javaSnappyStreamingContext.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING twitter_stream OPTIONS (" + String.format("consumerKey '%s',", string) + String.format("consumerSecret '%s',", string2) + String.format("accessToken '%s',", string3) + String.format("accessTokenSecret '%s',", string4) + String.format("rowConverter '%s'", "org.apache.spark.sql.streaming.TweetToHashtagRow") + ")");
                javaSnappyStreamingContext.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING twitter_stream OPTIONS (" + String.format("consumerKey '%s',", string) + String.format("consumerSecret '%s',", string2) + String.format("accessToken '%s',", string3) + String.format("accessTokenSecret '%s',", string4) + String.format("rowConverter '%s'", "org.apache.spark.sql.streaming.TweetToRetweetRow") + ")");
            } else {
                printWriter.println("##### Running example with stored tweet data #####");
                javaSnappyStreamingContext.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow',directory '/tmp/copiedtwitterdata')");
                javaSnappyStreamingContext.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow',directory '/tmp/copiedtwitterdata')");
            }
            SchemaDStream registerCQ = javaSnappyStreamingContext.registerCQ("SELECT * FROM retweettable WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)");
            HashMap hashMap = new HashMap();
            hashMap.put("epoch", new Long(System.currentTimeMillis()).toString());
            hashMap.put("timeInterval", "2000ms");
            hashMap.put("size", "10");
            hashMap.put("basetable", "hashtagtable");
            javaSnappyStreamingContext.snappyContext().createApproxTSTopK("topktable", "hashtag", structType, hashMap, false);
            javaSnappyStreamingContext.snappyContext().dropTable("retweetStore", true);
            javaSnappyStreamingContext.snappyContext().sql(String.format("CREATE TABLE %s (retweetId BIGINT PRIMARY KEY, retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()", "retweetStore"));
            registerCQ.foreachDataFrame(new VoidFunction<DataFrame>() { // from class: io.snappydata.examples.JavaTwitterPopularTagsJob.1
                public void call(DataFrame dataFrame) {
                    dataFrame.write().mode(SaveMode.Append).saveAsTable("retweetStore");
                }
            });
            javaSnappyStreamingContext.start();
            long currentTimeMillis = System.currentTimeMillis() + (config.hasPath("streamRunTime") ? Integer.parseInt(config.getString("streamRunTime")) * 1000 : 120000);
            while (currentTimeMillis > System.currentTimeMillis()) {
                Thread.sleep(2000L);
                printWriter.println("\n******** Top 10 hash tags of last two seconds *******\n");
                printResult(javaSnappyStreamingContext.snappyContext().queryApproxTSTopK("topktable", System.currentTimeMillis() - 2000, System.currentTimeMillis()).collect(), printWriter);
            }
            printResult(javaSnappyStreamingContext.sql("SELECT * FROM topktable").collect(), printWriter);
            printWriter.println("\n####### Top 10 popular tweets - Query Row table #######\n");
            printResult(javaSnappyStreamingContext.snappyContext().sql("SELECT retweetId AS RetweetId, retweetCnt AS RetweetsCount, retweetTxt AS Text FROM retweetStore ORDER BY RetweetsCount DESC LIMIT 10").collect(), printWriter);
            printWriter.println("\n#######################################################");
            printWriter.close();
            javaSnappyStreamingContext.stop(false, true);
        } catch (IOException e) {
            printWriter.close();
            javaSnappyStreamingContext.stop(false, true);
        } catch (InterruptedException e2) {
            printWriter.close();
            javaSnappyStreamingContext.stop(false, true);
        } catch (Throwable th) {
            printWriter.close();
            javaSnappyStreamingContext.stop(false, true);
            throw th;
        }
        return "See " + str + "/" + str2;
    }

    private void printResult(Row[] rowArr, PrintWriter printWriter) {
        for (Row row : rowArr) {
            printWriter.println(row.toString());
        }
    }

    public JSparkJobValidation isValidJob(JavaSnappyStreamingContext javaSnappyStreamingContext, Config config) {
        return new JSparkJobValid();
    }
}
