window(windowLength, slideInterval)

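Returns a new DStream computed from windowed batches of the source DStream: every slideInterval, the batches that arrived during the last windowLength are combined into one batch of the new DStream. Both windowLength and slideInterval must be multiples of the source DStream's batch interval.
Example: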
// Example: DStream.window(windowLength, slideInterval)
//
// Two directories are watched as text-file streams, merged, stripped of empty
// lines, and then printed through a 10-second window that slides every
// 5 seconds (the batch interval is 1 second).

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

// Only show ERROR-level messages from loggers under "org" so the printed
// batches are easy to read.
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext

// Batch interval: 1 second.
val ssc = new StreamingContext(sc, Seconds(1))

// One text stream per watched directory.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

// Merge both streams, then drop empty lines.
val messages3 = messages1.union(messages2)
val messages = messages3.filter(_.nonEmpty)

// Window length 10 s, slide interval 5 s. Both are multiples of the 1 s batch
// interval. Because the window is twice as long as the slide, each line shows
// up in two consecutive outputs (see the transcript below).
val windowMsg = messages.window(Seconds(10), Seconds(5))

windowMsg.print()

ssc.start()
ssc.awaitTermination()

/* Input:

cat cccc.txt
1
2
3
4
5

cat dddd.txt
1
2
3
4
5

cat eeee.txt
1
2
3
4
5

cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2

wait a while

cp eeee.txt /tmp/filestream1

Output:

-------------------------------------------
Time: 1583635894000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635899000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635904000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635909000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635914000 ms
-------------------------------------------
1
2
3
4
5
1
2
3
4
5

-------------------------------------------
Time: 1583635919000 ms
-------------------------------------------
1
2
3
4
5
1
2
3
4
5

-------------------------------------------
Time: 1583635924000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635929000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635934000 ms
-------------------------------------------
1
2
3
4
5

-------------------------------------------
Time: 1583635939000 ms
-------------------------------------------
1
2
3
4
5

*/
Copied!
Last modified 1yr ago
Copy link