foreachRDD(func)
// Keep only tweets that mention "happy" or "money", case-insensitively.
// Each tweet is lower-cased once, using Locale.ROOT so the result does not depend on the
// JVM's default locale. `||` short-circuits, so the second lookup is skipped once the
// first keyword has matched.
val focus_tweets_collection = tweets_collection.filter { text =>
  val lowered = text.toLowerCase(java.util.Locale.ROOT)
  lowered.contains("happy") || lowered.contains("money")
}

// Iterate over the DStream one micro-batch RDD at a time: show each non-empty batch and
// store its tweets in the Hive table `tweets`. This is one example of an ETL step that
// harvests streaming data from Twitter and persists it.
focus_tweets_collection.foreachRDD { rdd =>
  // isEmpty() is a Spark action (it may launch a small job). Skipping empty batches
  // avoids printing empty tables and issuing no-op inserts.
  if (!rdd.isEmpty()) {
    // Single-column DataFrame named "newTweet". toDF on an RDD needs the SparkSession
    // implicits in scope (presumably `import spark.implicits._` elsewhere -- confirm).
    val tweetsDataFrame = rdd.toDF("newTweet")
    // Print the batch without truncating long tweet text.
    tweetsDataFrame.show(false)
    // tweetsDataFrame.printSchema()
    // println(tweetsDataFrame.count())

    // Register the batch as the session-scoped temporary view `newTweets` so SQL can
    // reference it; the view is replaced on every batch.
    tweetsDataFrame.createOrReplaceTempView("newTweets")

    // Append the batch to the Hive table `tweets`, prefixing each row with the current
    // time rendered as a string by from_unixtime(unix_timestamp()). The insert is
    // positional, so this assumes `tweets` has (timestamp, tweet) columns in that order.
    // NOTE(review): confirm against the table DDL.
    spark.sql("insert into tweets select from_unixtime(unix_timestamp()), * from newTweets")
  }
} // Previous: saveAsHadoopFiles(prefix, [suffix]) | Next: Build Twitter Scala API Library for Spark Streaming using sbt
Last updated