transform(func)

transform(func)

Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to perform arbitrary RDD operations on the DStream.
Example:
1
// Demonstrates DStream.transform: combining each micro-batch RDD of a
// file-based stream with a static (non-streaming) RDD via union.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Silence Spark's verbose INFO logging so batch output stays readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local two-core session; one 1-second micro-batch interval.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))

// Watch a directory: each new file dropped here becomes a batch of lines.
val dataDirectory = "/tmp/filestream/"
val lines = ssc.textFileStream(dataDirectory)

// Split every line into whitespace-separated tokens.
val nums = lines.flatMap(_.split(" "))

// commonRdd is a plain RDD, not a DStream; transform is what lets us
// mix it into every batch RDD of the stream.
val commonRdd = ssc.sparkContext.parallelize(Array("x"))
val transformed = nums
  .filter(_.nonEmpty)
  .transform(batchRdd => batchRdd.union(commonRdd))

transformed.print(50)
ssc.start()
ssc.awaitTermination()

/*
Input file:

cat y.txt
a a b c d
d h i j k

output:

-------------------------------------------
Time: 1583563145000 ms
-------------------------------------------
a
a
b
c
d
d
h
i
j
k
x <== it appends "x" at the end of RDD

*/
60
​
Copied!
Last modified 1yr ago
Copy link