transform(func)

transform(func)

Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.

Example:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql
  .{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming
  .{Seconds, StreamingContext}

Logger.getLogger("org").setLevel(Level.ERROR)  
val spark = SparkSession
          .builder()
          .config("spark.master", "local[2]")
          .appName("streaming for book")
          .getOrCreate()
import spark.implicits._
val sc=spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))

val dataDirectory="/tmp/filestream/"
val lines=ssc.textFileStream(dataDirectory)

val nums = lines.flatMap(_.split(" "))

//commonRdd is an RDD, not a DSTREAM
val commonRdd = ssc.sparkContext
   .parallelize(Array("x"))
val transformed = nums.filter(_.nonEmpty)
   .transform(x=>x.union(commonRdd))

transformed.print(50)
ssc.start()
ssc.awaitTermination()

/*
Input file:

cat y.txt
a a b c d
d h i j k

output:

-------------------------------------------
Time: 1583563145000 ms
-------------------------------------------
a
a
b
c
d
d
h
i
j
k
x  <== it appends "x" at the end of RDD

*/

Last updated