Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to apply arbitrary RDD operations on the DStream that are not directly exposed in the DStream API.
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.config("spark.master", "local[2]")
.appName("streaming for book")
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// 1-second batch interval
val ssc = new StreamingContext(sc, Seconds(1))
val dataDirectory = "/tmp/filestream/"
// Each new file dropped into the directory becomes a batch of lines
val lines = ssc.textFileStream(dataDirectory)
val words = lines.flatMap(_.split(" "))
// commonRdd is an RDD, not a DStream
val commonRdd = ssc.sparkContext.parallelize(Array("x"))
// transform applies the RDD-to-RDD function to every batch RDD
val transformed = words.filter(_.nonEmpty)
  .transform(rdd => rdd.union(commonRdd))
transformed.print(50)
ssc.start()
ssc.awaitTermination()
/*
Input file:
cat y.txt
a a b c d
d h i j k
output:
-------------------------------------------
Time: 1583563145000 ms
-------------------------------------------
a
a
b
c
d
d
h
i
j
k
x <== the union appends "x" to each batch RDD
*/
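Because transform exposes the underlying RDD of each batch, it also enables RDD operations such as joining every batch against a static RDD. The following sketch shows that pattern on the same words DStream; it is only an illustration, the blacklistRdd name and its contents are hypothetical, and the code would be declared before ssc.start():
// Hypothetical static RDD of words to drop from every batch
val blacklistRdd = ssc.sparkContext.parallelize(Seq(("d", true)))
val cleaned = words.filter(_.nonEmpty)
  .map(w => (w, 1))
  // join each batch RDD with the static blacklist RDD
  .transform(rdd => rdd.leftOuterJoin(blacklistRdd))
  // keep only words that did not match the blacklist
  .filter { case (_, (_, flagged)) => flagged.isEmpty }
  .map { case (w, (count, _)) => (w, count) }
cleaned.print()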