countByValueAndWindow(windowLength, slideInterval, [numTasks])


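When called on a DStream of elements of type T, returns a new DStream of (T, Long) pairs in which each distinct element is paired with the number of times it occurs within the sliding window. The elements do not have to be key-value pairs; the example below applies it to a DStream of text lines. The window length and slide interval must both be multiples of the source DStream's batch interval. As with reduceByKeyAndWindow, the number of reduce tasks can be set through the optional third argument (named numPartitions in the Scala API). Because each window's counts are computed incrementally from the previous window, a checkpoint directory must be set before the streaming context is started.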
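To make the windowing concrete, here is a minimal plain-Scala model of what the operator outputs. It is an illustration, not Spark's implementation, and it rests on a simplifying assumption: each micro-batch is a `Seq[T]`, and the window length and slide are given as whole numbers of batches (the hypothetical parameters `windowBatches` and `slideBatches`) instead of `Duration` values. For every slide it counts each distinct element across the batches currently inside the window.

```scala
// Plain-Scala model of countByValueAndWindow's output (no Spark needed).
// Assumption: window length and slide are whole numbers of batches.
object WindowCountModel {
  def countByValueAndWindow[T](
      batches: Seq[Seq[T]],
      windowBatches: Int,
      slideBatches: Int): Seq[Map[T, Long]] = {
    require(windowBatches > 0 && slideBatches > 0, "window and slide must be positive")
    // One result per slide: windows end after batch slideBatches, 2 * slideBatches, ...
    (slideBatches to batches.length by slideBatches).map { end =>
      // Batches inside the window; missing history at the start counts as empty
      val start = math.max(0, end - windowBatches)
      batches.slice(start, end).flatten
        .groupBy(identity)
        .map { case (value, occurrences) => value -> occurrences.size.toLong }
    }
  }
}
```

With 1-second batches, the example's Seconds(10) window and Seconds(5) slide correspond to windowBatches = 10 and slideBatches = 5. A batch that receives two copies of the lines 1 to 5 then contributes a count of 2 to two consecutive windows, which is the pattern visible in the example's output.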
Example:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Show only errors so the windowed output is not buried in INFO logs
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

// countByValueAndWindow builds each window from the previous one,
// so a checkpoint directory is required
spark.sparkContext.setCheckpointDir("/tmp/")

val sc = spark.sparkContext
// 1-second batches; the window length and slide below must be multiples of this
val ssc = new StreamingContext(sc, Seconds(1))

// Watch two directories for new files and merge their lines into one stream
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
val messages3 = messages1.union(messages2)
// Drop blank lines
val messages = messages3.filter(_.nonEmpty)

// Count each distinct line over the last 10 seconds, emitted every 5 seconds
val countMsg = messages
  .countByValueAndWindow(Seconds(10), Seconds(5))

countMsg.print()

ssc.start()
ssc.awaitTermination()

/*
Input:

cat cccc.txt
1
2
3
4
5

cat dddd.txt
1
2
3
4
5

cat eeee.txt
1
2
3
4
5

cp cccc.txt /tmp/filestream1; cp dddd.txt /tmp/filestream2
cp eeee.txt /tmp/filestream1

Output:

-------------------------------------------
Time: 1583635409000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635414000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635419000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635424000 ms
-------------------------------------------
(4,2)
(2,2)
(5,2)
(3,2)
(1,2)

-------------------------------------------
Time: 1583635429000 ms
-------------------------------------------
(4,2)
(2,2)
(5,2)
(3,2)
(1,2)

-------------------------------------------
Time: 1583635434000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635439000 ms
-------------------------------------------
(4,1)
(2,1)
(5,1)
(3,1)
(1,1)

-------------------------------------------
Time: 1583635444000 ms
-------------------------------------------
(4,1)
(2,1)
(5,1)
(3,1)
(1,1)

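Each distinct line is counted over the current 10-second window, much like map(x => (x, 1L)) followed by reduceByKeyAndWindow. cccc.txt and dddd.txt are copied together, so each of the values 1 to 5 is counted twice, and because the 10-second window slides every 5 seconds, those files show up in two consecutive windows. eeee.txt is copied later and is counted once in the two windows that cover it. Once files leave the window, their counts fall to zero and the keys are dropped instead of being printed as (k,0), which is why the window at 1583635434000 ms prints nothing.
*/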
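Spark does not recount the whole window on every slide. In the Spark source, countByValueAndWindow maps each element to (element, 1L) and calls reduceByKeyAndWindow with an add function, an inverse (subtract) function, and a filter that keeps only keys whose count is not zero. That dependence on the previous window is also why a checkpoint directory is needed. The plain-Scala sketch below imitates this incremental update on an immutable `Map`; `IncrementalWindowCount` and `slideWindow` are hypothetical names, and this is not Spark's code.

```scala
// Plain-Scala imitation of the incremental update behind countByValueAndWindow.
// slideWindow is a hypothetical helper, not part of Spark's API.
object IncrementalWindowCount {
  def slideWindow[T](
      previous: Map[T, Long],
      entering: Seq[T],
      leaving: Seq[T]): Map[T, Long] = {
    // Reduce step (x + y): add 1 for each element of the batches entering the window
    val added = entering.foldLeft(previous) { (acc, x) =>
      acc.updated(x, acc.getOrElse(x, 0L) + 1L)
    }
    // Inverse reduce step (x - y): subtract 1 for each element of the batches leaving it
    val subtracted = leaving.foldLeft(added) { (acc, x) =>
      acc.updated(x, acc.getOrElse(x, 0L) - 1L)
    }
    // Filter step (count != 0): keys that fell to zero are dropped, not reported as (k, 0)
    subtracted.filter { case (_, count) => count != 0L }
  }
}
```

Replaying the example's sequence with this helper (two copies of 1 to 5 entering, the same lines leaving later, then one copy from eeee.txt entering) gives counts of 2, then an empty map once the first files leave, then counts of 1, mirroring the printed windows.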