countByValue()

countByValue()

When called on a DStream of elements of type K, countByValue() returns a new DStream of (K, Long) pairs in which the value for each key is that key's frequency within each RDD of the source DStream.
​
Example:
1
// countByValue() example: count the frequency of each word in every
// micro-batch (RDD) of a text-file DStream.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Silence Spark's verbose INFO logging so the streaming output stays readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// local[2]: a streaming app needs at least two threads — one to receive
// data and one to process it.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

// 1-second micro-batch interval.
val ssc = new StreamingContext(sc, Seconds(1))

// Monitor a directory: every new file dropped into it becomes a batch of lines.
val dataDirectory = "/tmp/filestream/"
val lines = ssc.textFileStream(dataDirectory)

// Split each line into words, dropping empty tokens produced by repeated spaces.
val keys = lines.flatMap(_.split(" "))
  .filter(_.nonEmpty)

// countByValue(): DStream[K] => DStream[(K, Long)], where the Long is the
// number of occurrences of each key within each RDD of this DStream.
val countKey = keys.countByValue()

countKey.print()

ssc.start()
ssc.awaitTermination()

/*
input file:

cat x.txt
a a b c d
d h i j k


output:

-------------------------------------------
Time: 1583560444000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

*/
Copied!
Last modified 1yr ago
Copy link