reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

This method overloads the earlier reduceByKeyAndWindow of the same name and takes one additional parameter, invFunc, the inverse function. The inverse function is needed to subtract the values accumulated by data that belonged to the previous window but has since left it.
A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and "inverse reducing" the old data that leaves the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
Note: like updateStateByKey, this operation maintains state across batches, for example a running total.
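The incremental update described above can be modelled without Spark. The following is only a sketch in plain Scala, not Spark's implementation: the names IncrementalWindow, combine, recompute, and slide are hypothetical, and each batch is assumed to be already reduced to a Map of per-key counts.

```scala
object IncrementalWindow {
  type Counts = Map[Int, Int]

  // Merge two per-key count maps with a binary function; a missing key counts as 0.
  def combine(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).map { k =>
      k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0))
    }.toMap

  val reduceFn: (Int, Int) => Int = _ + _ // plays the role of func
  val invFn: (Int, Int) => Int = _ - _    // plays the role of invFunc

  // Full recomputation: reduce every batch currently inside the window.
  def recompute(window: Seq[Counts]): Counts =
    window.foldLeft(Map.empty[Int, Int])((acc, b) => combine(acc, b)(reduceFn))

  // Incremental update: start from the previous window's result, reduce in the
  // batches that enter the window, then inverse-reduce the batches that leave it.
  def slide(previous: Counts, entering: Seq[Counts], leaving: Seq[Counts]): Counts = {
    val added = entering.foldLeft(previous)((acc, b) => combine(acc, b)(reduceFn))
    leaving.foldLeft(added)((acc, b) => combine(acc, b)(invFn))
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Map(1 -> 1, 2 -> 1), Map(1 -> 1), Map(3 -> 1), Map(2 -> 1))
    // First window covers batches 0 and 1.
    val first = recompute(batches.slice(0, 2))
    // Second window covers batches 2 and 3: batches 2-3 enter, batches 0-1 leave.
    val second = slide(first, batches.slice(2, 4), batches.slice(0, 2))
    println(first)
    println(second) // key 1 is still present here, with a value of 0
  }
}
```

In this model the incremental result matches a full recomputation except for keys whose value has dropped to 0, which stay in the map until they are filtered out explicitly.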
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// 1-second batch interval
val ssc = new StreamingContext(sc, Seconds(1))

// Checkpointing must be enabled for the inverse-function variant.
spark.sparkContext.setCheckpointDir("/tmp/")

// Two file-based input streams, merged into one
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

val messages3 = messages1.union(messages2)
// Each non-empty line becomes a (number, 1) pair
val messages = messages3.filter(_.nonEmpty).map(x => (x.toInt, 1))
val msgs: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = messages

// Reduce function: applied to data entering the window
val reduceFn: (Int, Int) => Int = (a, b) => {
  a + b
}

// Inverse function: applied to data leaving the window
val InvFn: (Int, Int) => Int = (a, b) => {
  a - b
}

// 10-second window, sliding every 5 seconds
val windowedMsgs:
  org.apache.spark.streaming.dstream.DStream[(Int, Int)] =
  msgs.reduceByKeyAndWindow(reduceFn, InvFn, Seconds(10), Seconds(5))

windowedMsgs.print()
ssc.start()
ssc.awaitTermination()

/*
input:
cat cccc.txt
1
2
3
4
5

cat dddd.txt
1
2
3
4
5

cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2

Wait a while

cat eeee.txt
1
2
3
4
5

cp eeee.txt /tmp/filestream1

output:

-------------------------------------------
Time: 1583633728000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633733000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633738000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633743000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633748000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633753000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633758000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633763000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633768000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633773000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)

-------------------------------------------
Time: 1583633778000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)

-------------------------------------------
Time: 1583633783000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)

-------------------------------------------
Time: 1583633788000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)

-------------------------------------------
Time: 1583633793000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)

-------------------------------------------
Time: 1583633798000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)

-------------------------------------------
Time: 1583633803000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)

-------------------------------------------
Time: 1583633808000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)

*/
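In the output above, keys keep appearing with a count of 0 after their data has left the window. The Scala overload of reduceByKeyAndWindow that takes an inverse function also accepts an optional filterFunc parameter, which can be used to drop such entries. The following is a hedged sketch rather than a tested program: the object name WindowedCountsWithFilter and the partition count of 2 are arbitrary choices for illustration, and it reads from only one of the two directories used in the example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCountsWithFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("windowed counts with filter")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The inverse-function variant requires a checkpoint directory.
    ssc.checkpoint("/tmp/")

    // Each non-empty line becomes a (number, 1) pair, as in the example above.
    val counts = ssc.textFileStream("/tmp/filestream1/")
      .filter(_.nonEmpty)
      .map(x => (x.toInt, 1))

    val windowed = counts.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,        // reduce: data entering the window
      (a: Int, b: Int) => a - b,        // inverse reduce: data leaving the window
      Seconds(10),                      // window length
      Seconds(5),                       // slide interval
      2,                                // number of partitions (arbitrary here)
      (kv: (Int, Int)) => kv._2 > 0     // keep only keys with a positive count
    )

    windowed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this filter, a key should disappear from the printed batches once its count returns to 0 instead of lingering as (key,0).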