countByValue()

When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

Example:

// Word-frequency streaming example: watch a directory for new text files,
// split each line into words, and count word occurrences per micro-batch
// using DStream.countByValue().
import org.apache.spark._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming
  .{Seconds, StreamingContext}

// Silence Spark's verbose INFO logging so the per-batch output is readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local mode with two threads: streaming needs at least one thread for
// receiving data and one for processing it.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
val sc = spark.sparkContext

// One-second batch interval: each RDD in the DStream covers one second of input.
val ssc = new StreamingContext(sc, Seconds(1))

// Any file newly placed in this directory becomes part of the next batch.
val dataDirectory = "/tmp/filestream/"
val lines = ssc.textFileStream(dataDirectory)

// Tokenize on single spaces and drop empty strings (e.g. from repeated spaces).
val keys = lines.flatMap(_.split(" "))
  .filter(_.nonEmpty)

// countByValue(): DStream[String] => DStream[(String, Long)] —
// the frequency of each distinct word within each batch's RDD.
val countKey = keys.countByValue()

// Print the first elements of each batch's result RDD to stdout.
countKey.print()

// Start the streaming computation and block until it is stopped or fails.
ssc.start()
ssc.awaitTermination()

/*
input file:

 cat x.txt
a a b c d
d h i j k


output:

-------------------------------------------
Time: 1583560444000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

*/

Last updated