foreachRDD(func)

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
Example:
    // Assumes `spark` is an active SparkSession with Hive support and that
    // tweets_collection is a DStream[String] of tweet text.
    import spark.implicits._ // needed for rdd.toDF

    val focus_tweets_collection = tweets_collection.filter { text =>
      val lower = text.toLowerCase
      lower.contains("happy") || lower.contains("money")
    }

    // Iterate over each RDD in the DStream, display it, and store the
    // streamed tweets in a Hive table. This is an example ETL step that
    // harvests and stores streaming data from Twitter.
    focus_tweets_collection.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val tweetsDataFrame = rdd.toDF("newTweet")
        tweetsDataFrame.show(false)
        // tweetsDataFrame.printSchema()
        // println(tweetsDataFrame.count())
        // Create the in-memory temp view newTweets
        tweetsDataFrame.createOrReplaceTempView("newTweets")
        // Insert the newly received tweets into the Hive table tweets,
        // along with the current timestamp
        spark.sql("insert into tweets select from_unixtime(unix_timestamp()), * from newTweets")
      }
    }
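
Because func itself runs in the driver, a connection opened directly inside foreachRDD would have to be serialized and shipped to the executors, which usually fails. A common way around this is to open the connection inside rdd.foreachPartition, so it is created on the executor and reused for every record in that partition. The following is a minimal sketch of that pattern, not a definitive implementation. ConsoleSink is a hypothetical stand-in for a real client (it only prints), ForeachRDDSketch and isFocusTweet are illustrative names, and the input comes from a local queueStream rather than Twitter so the example is self-contained.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical stand-in for a real client (JDBC, HTTP, message queue, ...).
class ConsoleSink {
  def send(record: String): Unit = println(s"sent: $record")
  def close(): Unit = ()
}

object ForeachRDDSketch {
  // Same keyword filter as the example above, pulled out as a plain function.
  def isFocusTweet(text: String): Boolean = {
    val lower = text.toLowerCase
    lower.contains("happy") || lower.contains("money")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream turns a queue of pre-built RDDs into a DStream, one RDD per batch.
    val queue = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("Happy Friday", "rainy day", "Money talks"))
    )
    val tweets = ssc.queueStream(queue)

    tweets.filter(isFocusTweet _).foreachRDD { rdd =>
      // This outer block runs on the driver, once per batch.
      rdd.foreachPartition { records =>
        // This inner block runs on an executor. The sink is created here,
        // so it never has to be serialized, and it is shared by all
        // records in the partition.
        val sink = new ConsoleSink
        try records.foreach(sink.send) finally sink.close()
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // run for about five seconds, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

With a real sink, creating one connection per partition rather than per record keeps connection overhead low. A connection pool shared across batches is a further refinement that this sketch does not show.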