reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated with the given reduce function `func` over the batches in a sliding window. `windowLength` is the duration of the window, and `slideInterval` is how often the windowed result is computed; both must be multiples of the batch interval of the source DStream. Note: by default, this uses Spark's default number of parallel tasks to do the grouping (2 in local mode; in cluster mode the number is determined by the config property `spark.default.parallelism`). You can pass the optional `numTasks` argument to set a different number of tasks.
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

// Example: count how often each integer appears across two monitored
// directories, over a sliding window of recent batches.

// Silence Spark's INFO/WARN logging so the windowed output is readable.
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
// Batch interval: one micro-batch per second.
val ssc = new StreamingContext(sc, Seconds(1))

// Each stream emits the lines of text files found in its directory.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

// Merge both directories into a single stream of lines.
val messages3 = messages1.union(messages2)

// Parse each line as an Int key paired with a count of 1. Lines that are
// blank or are not valid integers are dropped, rather than throwing
// NumberFormatException and failing the whole batch.
val msgs: DStream[(Int, Int)] = messages3
  .flatMap(line => scala.util.Try(line.trim.toInt).toOption)
  .map(key => (key, 1))

// Associative reduce function: sums the counts for a key.
val reduceFn: (Int, Int) => Int = (a, b) => a + b

// Aggregate over the last 10 s of data, producing a result every 5 s.
// Both durations must be multiples of the 1 s batch interval. Because the
// window is longer than the slide, every batch is included in two
// consecutive windowed results.
val windowLength = Seconds(10)
val slideInterval = Seconds(5)

val windowedMsgs: DStream[(Int, Int)] =
  msgs.reduceByKeyAndWindow(reduceFn, windowLength, slideInterval)

windowedMsgs.print()
ssc.start()
ssc.awaitTermination()

/*
input:
cat cccc.txt
1
2
3
4
5

cat dddd.txt
1
2
3
4
5

cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2

output:
(Each key has count 2 because it appears once in each of the two files.
The same counts are printed in two consecutive windows because the 10 s
window slides by 5 s, so the batch containing the files falls in both.)

-------------------------------------------
Time: 1583633096000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633101000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633106000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633111000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633116000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633121000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633126000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633131000 ms
-------------------------------------------

-------------------------------------------
Time: 1583633136000 ms
-------------------------------------------
(4,2)
(2,2)
(1,2)
(3,2)
(5,2)

-------------------------------------------
Time: 1583633141000 ms
-------------------------------------------
(4,2)
(2,2)
(1,2)
(3,2)
(5,2)
*/
Copied!
Last modified 1yr ago
Copy link