countByWindow(windowLength, slideInterval)

countByWindow(windowLength, slideInterval)

Return a sliding window count of elements in the stream.

Example:

// Example: merge two text-file streams and print a sliding-window element
// count — a 10-second window recomputed every 5 seconds.
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Silence Spark's verbose INFO logging so only the window counts show up.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local session with two cores so batch jobs can run alongside the stream.
val spark = SparkSession.builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

// Window operations with an inverse reduce (countByWindow) require a
// checkpoint directory to be set before the context starts.
spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext
// Micro-batch interval of 1 second.
val ssc = new StreamingContext(sc, Seconds(1))

// Watch two directories; each new file dropped there becomes a batch of lines.
val stream1 = ssc.textFileStream("/tmp/filestream1/")
val stream2 = ssc.textFileStream("/tmp/filestream2/")

// Merge both inputs, then count elements (lines) over a 10 s window,
// sliding forward every 5 s.
val merged = stream1.union(stream2)
val windowedCount = merged.countByWindow(Seconds(10), Seconds(5))

windowedCount.print()

ssc.start()
ssc.awaitTermination()

/*
input files:

copy one file into /tmp/filestream1/ and another into /tmp/filestream2/

cat xx.txt
a a b c d
d h i j k

cat yy.txt
a a b c d
d h i j k

cp xx.txt /tmp/filestream1;cp yy.txt /tmp/filestream2

output: it counts each line in the files, including the
trailing empty line, which is also counted as 1

-------------------------------------------
Time: 1583610236000 ms
-------------------------------------------
6

-------------------------------------------
Time: 1583610241000 ms
-------------------------------------------
6



*/

Last updated