countByValueAndWindow(windowLength, slideInterval, [numTasks])

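When called on a DStream of elements of type T, returns a new DStream of (T, Long) pairs in which each distinct element is paired with the number of times it occurs within a sliding window. windowLength is the duration of the window and slideInterval is how often the windowed count is computed; both must be multiples of the batch interval. As with reduceByKeyAndWindow, the number of reduce tasks can be set through the optional numTasks argument.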
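The window semantics described above can be modelled without Spark. The following is a minimal sketch in plain Scala, not Spark's implementation: the object WindowedCountSketch and its countByValueAndWindow helper are hypothetical names, the stream is represented as a Vector with one entry per batch interval, and windowLength and slideInterval are given as numbers of batches rather than as Durations.

```scala
// Plain-Scala model of a windowed count by value; no Spark required.
object WindowedCountSketch {

  // Hypothetical helper: batches(i) holds the elements that arrived in batch i.
  // windowLength and slideInterval are expressed in numbers of batches.
  def countByValueAndWindow[T](
      batches: Vector[Seq[T]],
      windowLength: Int,
      slideInterval: Int): Vector[Map[T, Long]] = {
    // One result is produced every slideInterval batches; it covers the
    // last windowLength batches (fewer at the very start of the stream).
    (slideInterval to batches.length by slideInterval).toVector.map { end =>
      val start = math.max(0, end - windowLength)
      batches.slice(start, end).flatten
        .groupBy(identity)
        .map { case (value, occurrences) => value -> occurrences.size.toLong }
    }
  }

  def main(args: Array[String]): Unit = {
    // Twenty 1-second batches; two files containing the lines 1..5 land in batch 7.
    val lines = (1 to 5).map(_.toString)
    val batches = Vector.fill(20)(Seq.empty[String]).updated(7, lines ++ lines)

    // 10-batch window, recomputed every 5 batches.
    countByValueAndWindow(batches, windowLength = 10, slideInterval = 5)
      .zipWithIndex
      .foreach { case (counts, i) => println(s"window $i: $counts") }
  }
}
```

In this model the lines that land in a single batch show up with a count of 2 in exactly two consecutive windows, because a 10-batch window that slides by 5 batches covers every batch twice.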

Example:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

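// Silence Spark's INFO/WARN logging so the printed batches are easy to read
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

// Windowed counts keep state between batches, so a checkpoint directory is required
spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext
// Batch interval of 1 second; window and slide durations must be multiples of it
val ssc = new StreamingContext(sc, Seconds(1))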


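// Watch two directories for newly created files and merge the two streams
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
val messages3 = messages1.union(messages2)
// Drop blank lines so they are not counted as a value
val messages = messages3.filter(_.nonEmpty)

// Count each distinct line over the last 10 seconds, recomputed every 5 seconds
val countMsg = messages
  .countByValueAndWindow(Seconds(10), Seconds(5))

countMsg.print()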

ssc.start()
ssc.awaitTermination()

/*
Input:

cat cccc.txt
1
2
3
4
5

cat dddd.txt
1
2
3
4
5

cat eeee.txt
1
2
3
4
5

cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2
cp eeee.txt /tmp/filestream1

Output:

-------------------------------------------
Time: 1583635409000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635414000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635419000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635424000 ms
-------------------------------------------
(4,2)
(2,2)
(5,2)
(3,2)
(1,2)

-------------------------------------------
Time: 1583635429000 ms
-------------------------------------------
(4,2)
(2,2)
(5,2)
(3,2)
(1,2)

-------------------------------------------
Time: 1583635434000 ms
-------------------------------------------

-------------------------------------------
Time: 1583635439000 ms
-------------------------------------------
(4,1)
(2,1)
(5,1)
(3,1)
(1,1)

-------------------------------------------
Time: 1583635444000 ms
-------------------------------------------
(4,1)
(2,1)
(5,1)
(3,1)
(1,1)

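Each distinct line is counted once per occurrence within the window, much as if
every line were mapped to (line, 1) and the pairs were combined with
reduceByKeyAndWindow. cccc.txt and dddd.txt are copied together, so each value is
counted twice in the two consecutive windows that contain them; eeee.txt is copied
later and is counted once in each of the two windows that contain it.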
*/
