package io.snappydata.examples;

import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SnappyJobValid;
import org.apache.spark.sql.SnappyJobValidation;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.JavaSnappyStreamingJob;
import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext;

/* loaded from: input_file:io/snappydata/examples/JavaTwitterPopularTagsJob.class */
/**
 * Streaming job that tracks popular hashtags and retweets.
 *
 * <p>The job creates two stream tables ({@code hashtagtable} and {@code retweettable}),
 * maintains an approximate top-K structure over the hashtags, copies every retweet
 * window into the {@code retweetStore} row table, and writes periodic query results
 * to an output file named {@code JavaTwitterPopularTagsJob-<millis>.out}.
 */
public class JavaTwitterPopularTagsJob extends JavaSnappyStreamingJob {
    /** Name of the row table that receives the windowed retweet data. */
    private static final String RETWEET_STORE = "retweetStore";

    /** Interval, in milliseconds, between two top-K reports (matches the 2 second window). */
    private static final long REPORT_INTERVAL_MILLIS = 2000L;

    /** Run time, in milliseconds, used when the config has no {@code streamRunTime} entry. */
    private static final long DEFAULT_RUN_TIME_MILLIS = 120000L;

    /**
     * Runs the example: sets up the stream tables, starts the streaming context, reports the
     * top hashtags every two seconds until the run time elapses, then prints final results.
     *
     * @param javaSnappyStreamingContext streaming context used for all SQL and stream operations;
     *                                   it is always stopped before this method returns or throws
     * @param config job configuration; optional keys read here are {@code consumerKey},
     *               {@code consumerSecret}, {@code accessToken}, {@code accessTokenSecret}
     *               and {@code streamRunTime} (seconds)
     * @return a message pointing at the output file on success, or an error report including the
     *         stack trace when an {@link IOException} or {@link InterruptedException} occurred
     */
    public Object runSnappyJob(JavaSnappyStreamingContext javaSnappyStreamingContext, Config config) {
        String outFileName = String.format("JavaTwitterPopularTagsJob-%d.out", System.currentTimeMillis());
        try (PrintWriter out = new PrintWriter(outFileName)) {
            String currentDirectory = new File(".").getCanonicalPath();
            StructType hashtagSchema = new StructType(new StructField[]{
                DataTypes.createStructField("hashtag", DataTypes.StringType, false)});

            // Start from a clean slate so the job can be re-submitted.
            javaSnappyStreamingContext.snappySession().sql("DROP TABLE IF EXISTS topktable");
            javaSnappyStreamingContext.snappySession().sql("DROP TABLE IF EXISTS hashtagtable");
            javaSnappyStreamingContext.snappySession().sql("DROP TABLE IF EXISTS retweettable");

            createStreamTables(javaSnappyStreamingContext, config, out);

            SchemaDStream retweetStream = javaSnappyStreamingContext.registerCQ(
                "SELECT * FROM retweettable WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)");

            HashMap<String, String> topKOptions = new HashMap<>();
            topKOptions.put("epoch", Long.toString(System.currentTimeMillis()));
            topKOptions.put("timeInterval", "2000ms");
            topKOptions.put("size", "10");
            javaSnappyStreamingContext.snappySession().createApproxTSTopK(
                "topktable", "hashtagtable", "hashtag", hashtagSchema, topKOptions, false);

            javaSnappyStreamingContext.snappySession().dropTable(RETWEET_STORE, true);
            javaSnappyStreamingContext.snappySession().sql(String.format(
                "CREATE TABLE %s (retweetId BIGINT PRIMARY KEY, retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()",
                RETWEET_STORE));

            // Persist every window of retweets into the row table.
            retweetStream.foreachDataFrame(new VoidFunction<Dataset<Row>>() {
                public void call(Dataset<Row> dataset) {
                    dataset.write().insertInto(RETWEET_STORE);
                }
            });

            javaSnappyStreamingContext.start();

            // Computed in long arithmetic so large streamRunTime values cannot overflow.
            long runTimeMillis = config.hasPath("streamRunTime")
                ? Long.parseLong(config.getString("streamRunTime")) * 1000L
                : DEFAULT_RUN_TIME_MILLIS;
            long endTimeMillis = System.currentTimeMillis() + runTimeMillis;
            while (endTimeMillis > System.currentTimeMillis()) {
                Thread.sleep(REPORT_INTERVAL_MILLIS);
                out.println("\n******** Top 10 hash tags of last two seconds *******\n");
                printResult(javaSnappyStreamingContext.snappySession().queryApproxTSTopK(
                    "topktable",
                    System.currentTimeMillis() - REPORT_INTERVAL_MILLIS,
                    System.currentTimeMillis(),
                    -1).collectAsList(), out);
            }

            printResult(javaSnappyStreamingContext.sql("SELECT * FROM topktable").collectAsList(), out);
            out.println("\n####### Top 10 popular tweets - Query Row table #######\n");
            printResult(javaSnappyStreamingContext.snappySession().sql(
                "SELECT retweetId AS RetweetId, retweetCnt AS RetweetsCount, retweetTxt AS Text FROM "
                    + RETWEET_STORE + " ORDER BY RetweetsCount DESC LIMIT 10").collectAsList(), out);
            out.println("\n#######################################################");
            return "See " + currentDirectory + "/" + outFileName;
        } catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            // Keep a handle on the StringWriter: it, not the PrintWriter, holds the text.
            StringWriter errorText = new StringWriter();
            PrintWriter errorWriter = new PrintWriter(errorText);
            errorWriter.println("ERROR: failed with " + e);
            e.printStackTrace(errorWriter);
            errorWriter.flush();
            return errorText.toString();
        } finally {
            // Runs after the output file is closed, on success and on every failure path.
            javaSnappyStreamingContext.stop(false, true);
        }
    }

    /**
     * Creates {@code hashtagtable} and {@code retweettable}, backed by the live twitter stream
     * when all four credentials are configured and by the stored tweet files otherwise.
     */
    private void createStreamTables(JavaSnappyStreamingContext javaSnappyStreamingContext, Config config,
            PrintWriter out) {
        boolean hasTwitterCredentials = config.hasPath("consumerKey")
            && config.hasPath("consumerSecret")
            && config.hasPath("accessToken")
            && config.hasPath("accessTokenSecret");

        if (hasTwitterCredentials) {
            out.println("##### Running example with live twitter stream #####");
            String credentialOptions =
                String.format("consumerKey '%s',", config.getString("consumerKey"))
                    + String.format("consumerSecret '%s',", config.getString("consumerSecret"))
                    + String.format("accessToken '%s',", config.getString("accessToken"))
                    + String.format("accessTokenSecret '%s',", config.getString("accessTokenSecret"));
            javaSnappyStreamingContext.sql(
                "CREATE STREAM TABLE hashtagtable (hashtag STRING) USING twitter_stream OPTIONS ("
                    + credentialOptions
                    + String.format("rowConverter '%s'", "org.apache.spark.sql.streaming.TweetToHashtagRow")
                    + ")");
            javaSnappyStreamingContext.sql(
                "CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING twitter_stream OPTIONS ("
                    + credentialOptions
                    + String.format("rowConverter '%s'", "org.apache.spark.sql.streaming.TweetToRetweetRow")
                    + ")");
        } else {
            out.println("##### Running example with stored tweet data #####");
            javaSnappyStreamingContext.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow',directory '/tmp/copiedtwitterdata')");
            javaSnappyStreamingContext.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow',directory '/tmp/copiedtwitterdata')");
        }
    }

    /**
     * Writes each row of {@code list} on its own line to {@code printWriter}.
     *
     * @param list rows to print, in iteration order
     * @param printWriter destination of the output
     */
    private void printResult(List<Row> list, PrintWriter printWriter) {
        for (Row row : list) {
            printWriter.println(row.toString());
        }
    }

    /**
     * Validates the job before it is run; this example accepts every context and configuration.
     *
     * @param javaSnappyStreamingContext streaming context the job would run with (not inspected)
     * @param config job configuration (not inspected)
     * @return always a {@link SnappyJobValid} instance
     */
    public SnappyJobValidation isValidJob(JavaSnappyStreamingContext javaSnappyStreamingContext, Config config) {
        return new SnappyJobValid();
    }
}
