updateStateByKey(func)

updateStateByKey(func)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
To record statistics continuously and cumulatively over the lifetime of a stream (for example, a word count from the beginning of the stream), the state computed from earlier batches of the DStream must be kept and combined with each new batch. That is what updateStateByKey(func) is for. Note that updateStateByKey requires a checkpoint directory to be configured on the StreamingContext.
Example:
Background:
This example uses textFileStream; each line in an input file contains words separated by blank spaces.
1
import org.apache.spark._
2
import org.apache.spark.SparkContext._
3
import org.apache.spark.streaming._
4
import org.apache.spark.streaming.StreamingContext._
5
import org.apache.log4j.{Level, Logger}
6
​
7
/**
 * Word-count pipeline stages over a DStream of text lines.
 *
 * Each method wraps a single DStream transformation so the stages can be
 * composed one step at a time:
 * lines -> words -> (word, 1) pairs -> per-batch (word, count) totals.
 */
class wordCountClass extends Serializable {

  /**
   * Splits every line into words.
   *
   * Lines are split on runs of whitespace, and empty tokens are discarded,
   * so blank lines and leading, trailing or repeated spaces do not yield an
   * empty-string "word".
   *
   * @param lines stream of raw text lines
   * @return stream of the non-empty words found in each line
   */
  def flatMap(lines: org.apache.spark.streaming.dstream.DStream[String])
      : org.apache.spark.streaming.dstream.DStream[String] =
    lines.flatMap(line => line.split("\\s+").filter(_.nonEmpty))

  /**
   * Pairs every word with an initial count of 1.
   *
   * @param words stream of individual words
   * @return stream of `(word, 1)` pairs
   */
  def map(words: org.apache.spark.streaming.dstream.DStream[String])
      : org.apache.spark.streaming.dstream.DStream[(String, Int)] =
    words.map(word => (word, 1))

  /**
   * Sums the counts of identical words within each batch.
   *
   * @param wordPair stream of `(word, count)` pairs
   * @return stream with one `(word, total)` pair per distinct word per batch
   */
  def reduceByKey(wordPair: org.apache.spark.streaming.dstream.DStream[(String, Int)])
      : org.apache.spark.streaming.dstream.DStream[(String, Int)] =
    wordPair.reduceByKey(_ + _)
}
31
​
32
/**
 * State-update function for `updateStateByKey`.
 *
 * Adds the values seen for a key in the current batch to that key's previous
 * running total, treating a key with no state yet as 0. It always returns
 * `Some`, so a key's state is never dropped.
 */
val updateFunc =
  (values: Seq[Int], state: Option[Int]) => {
    val runningTotal = state.getOrElse(0) + values.sum
    Some(runningTotal)
  }
38
​
39
// Streaming driver, written for spark-shell: `sc` is the shell's pre-built
// SparkContext. It is stopped here because the StreamingContext below is
// built from `sparkConf` and therefore creates its own SparkContext.
// NOTE(review): sparkConf sets no master; presumably the shell's configuration
// supplies it. Confirm before running this outside spark-shell.
val sparkConf = new SparkConf().setAppName("textFileStreamWordCount")
sc.stop()

// 2-second micro-batches. updateStateByKey needs a checkpoint directory.
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("/tmp/")

// Input: text files that appear in /tmp/filestream, read line by line.
val lines = ssc.textFileStream("/tmp/filestream")

// Per-batch word count, assembled from the wordCountClass stages.
val wordCountObject = new wordCountClass()
val words = wordCountObject.flatMap(lines)
val wordMap = wordCountObject.map(words)
val wordCounts = wordCountObject.reduceByKey(wordMap)

// Fold each batch's counts into the running per-word totals.
val updateWordCounts = wordCounts.updateStateByKey(updateFunc)

// Only the cumulative totals are printed. To see per-batch counts instead,
// enable: wordCounts.print()
updateWordCounts.print()

// Start processing, then block until the streaming context terminates.
ssc.start()
ssc.awaitTermination()
57
​
58
/*
59
input file:
60
cat x.txt
61
a a b c d
62
d h i j k
63
​
64
Output:
65
​
66
-------------------------------------------
67
Time: 1583602836000 ms
68
-------------------------------------------
69
(d,2)
70
(b,1)
71
(h,1)
72
(,1)
73
(j,1)
74
(a,2)
75
(i,1)
76
(k,1)
77
(c,1)
78
​
79
-------------------------------------------
80
Time: 1583602838000 ms
81
-------------------------------------------
82
(d,2)
83
(b,1)
84
(h,1)
85
(,1)
86
(j,1)
87
(a,2)
88
(i,1)
89
(k,1)
90
(c,1)
91
​
92
-------------------------------------------
93
Time: 1583602840000 ms
94
-------------------------------------------
95
(d,2)
96
(b,1)
97
(h,1)
98
(,1)
99
(j,1)
100
(a,2)
101
(i,1)
102
(k,1)
103
(c,1)
104
...
105
*/
Copied!
Last modified 1yr ago
Copy link