countByWindow(windowLength, slideInterval)

Return a sliding window count of elements in the stream.
Example:
```scala
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

// countByWindow maintains window state, so a checkpoint directory is required
spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))

// Two file streams, merged into a single DStream
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
val messages3 = messages1.union(messages2)

// Count all elements (lines) seen in the last 10 seconds, emitted every 5 seconds
val countMsg = messages3.countByWindow(Seconds(10), Seconds(5))

countMsg.print()

ssc.start()
ssc.awaitTermination()

/*
input files:

send 2 files to /tmp/filestream1/ /tmp/filestream2/

cat xx.txt
a a b c d
d h i j k

cat yy.txt
a a b c d
d h i j k

cp xx.txt /tmp/filestream1;cp yy.txt /tmp/filestream2

output: every line in the files is counted, including the
trailing empty line, which also counts as 1

-------------------------------------------
Time: 1583610236000 ms
-------------------------------------------
6

-------------------------------------------
Time: 1583610241000 ms
-------------------------------------------
6
*/
```
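
Under the hood, `countByWindow` is essentially `map(_ => 1L)` followed by `reduceByWindow` with an inverse reduce function, which is why the checkpoint directory above is mandatory. A minimal sketch of that equivalent formulation, reusing `messages3` and the same window settings as the example (the name `countMsgEquivalent` is just for illustration):

```scala
// Roughly equivalent to countByWindow(Seconds(10), Seconds(5)):
// map every element to 1L, then reduce over the window.
// The inverse function (_ - _) lets Spark subtract the counts of the
// batches sliding out of the window instead of recounting everything;
// it is this incremental state that needs checkpointing.
val countMsgEquivalent = messages3
  .map(_ => 1L)
  .reduceByWindow(_ + _, _ - _, Seconds(10), Seconds(5))

countMsgEquivalent.print()
```

Note that both the window length (10 s) and the slide interval (5 s) must be multiples of the batch interval of the StreamingContext (1 second here).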