reduceByWindow(func, windowLength, slideInterval)
Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
Example:
// --- Spark / Spark Streaming setup -----------------------------------------
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

// Silence Spark's verbose INFO logging so the example's trace output stays readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local SparkSession with two threads ("local[2]"): streaming needs at least
// one thread for receiving data and one for processing it.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
// Micro-batch interval of 1 second: each batch covers 1s of arriving data.
val ssc = new StreamingContext(sc, Seconds(1))
// Watch two directories: every new text file dropped into them is read as
// part of the next micro-batch.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

// Merge both streams, drop empty lines, and pair each line with a count of 1.
val messages3 = messages1.union(messages2)
val messages = messages3.filter(_.nonEmpty).map(x => (x, 1))

// Explicit annotation documents the element type flowing through the stream.
val msgs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = messages
// Element type flowing through the DStream: (key, count).
type T = (String, Int)

// Pairwise reduction used by reduceByWindow: keeps the RIGHT key (k2) and
// sums the counts. NOTE(review): reduceByWindow requires an associative and
// commutative function for correct parallel evaluation, but this one is not
// commutative (the result key depends on argument order) — the example
// deliberately uses that, plus the println trace, to expose evaluation order.
val reduceFn: (T, T) => T = {
  case x @ ((k1, v1), (k2, v2)) =>
    println(s">>> input: $x") // trace each pairwise reduction step
    //(k2, s"$v1 + $v2")
    (k2, v1 + v2)
}
// Reduce over a 10-second window that slides every 5 seconds.
// Both durations must be multiples of the 1-second batch interval.
val windowedMsgs: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
  msgs.reduceByWindow(reduceFn, Seconds(10), Seconds(5))

windowedMsgs.print()
ssc.start()
ssc.awaitTermination() // block the driver until the streaming context stops
48
/*
49
Input files:
50
​
51
cat ccc.txt
52
1
53
2
54
3
55
4
56
5
57
​
58
cat ddd.txt
59
6
60
7
61
8
62
9
63
10
64
​
65
Output:
66
​
67
-------------------------------------------
68
Time: 1583622436000 ms
69
-------------------------------------------
70
​
71
>>> input: ((6,1),(7,1))
72
>>> input: ((7,2),(8,1))
73
>>> input: ((8,3),(9,1))
74
>>> input: ((9,4),(10,1))
75
>>> input: ((1,1),(2,1))
76
>>> input: ((2,2),(3,1))
77
>>> input: ((3,3),(4,1))
78
>>> input: ((4,4),(5,1))
79
>>> input: ((5,5),(10,5))
80
-------------------------------------------
81
Time: 1583622441000 ms
82
-------------------------------------------
83
(10,10)
84
​
85
The final result (10,10) means: the key is "10" (the last key folded in by
reduceFn, which always keeps the right-hand key) and the value is 10 (the
sum of the ten counts of 1, one per input line).
87
​
88
​
89
​
90
​
91
​
92
*/
93
​
94
​
Copied!
Last modified 1yr ago
Copy link