reduceByWindow(func, windowLength, slideInterval)

Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
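
A minimal sketch of the call shape first (assuming an existing StreamingContext ssc with a 1-second batch interval and a hypothetical socket source emitting one integer per line; window and slide durations must be multiples of the batch interval):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical source: one integer per line from a local socket.
val nums = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum everything seen in the last 10 seconds, recomputed every 5
// seconds. _ + _ is associative and commutative, so partial sums can
// be combined in any order and across partitions.
val windowSums = nums.reduceByWindow(_ + _, Seconds(10), Seconds(5))
windowSums.print()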

Example:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))  // 1-second batch interval

// Merge two file sources into one stream of non-empty lines,
// pairing each line with an initial count of 1.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

val msgs: DStream[(String, Int)] = messages1
  .union(messages2)
  .filter(_.nonEmpty)
  .map(x => (x, 1))

type T = (String, Int)

// Keep the right-hand key and sum the counts; the println traces
// each pairwise reduction (see the output below).
val reduceFn: (T, T) => T = {
  case x @ ((k1, v1), (k2, v2)) =>
    println(s">>> input: $x")
    (k2, v1 + v2)
}

// 10-second window sliding every 5 seconds; both are multiples of
// the 1-second batch interval, as Spark requires.
val windowedMsgs: DStream[(String, Int)] =
  msgs.reduceByWindow(reduceFn, Seconds(10), Seconds(5))

windowedMsgs.print()
ssc.start()
ssc.awaitTermination()

/*
Input files:

cat ccc.txt
1
2
3
4
5

cat ddd.txt
6
7
8
9
10

Output:

-------------------------------------------
Time: 1583622436000 ms
-------------------------------------------

>>> input: ((6,1),(7,1))
>>> input: ((7,2),(8,1))
>>> input: ((8,3),(9,1))
>>> input: ((9,4),(10,1))
>>> input: ((1,1),(2,1))
>>> input: ((2,2),(3,1))
>>> input: ((3,3),(4,1))
>>> input: ((4,4),(5,1))
>>> input: ((5,5),(10,5))
-------------------------------------------
Time: 1583622441000 ms
-------------------------------------------
(10,10)

reduceFn keeps the right-hand key and sums the counts, so folding all
10 elements in the window yields (10,10): "10" happens to be the key
of the last pair reduced, and 10 is the total number of lines seen in
the window.
*/
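
There is also an incremental overload, reduceByWindow(reduceFunc, invReduceFunc, windowLength, slideInterval), which updates the previous window's result by folding in the batch that enters and "inverse-reducing" the batch that leaves, instead of recomputing the whole window from scratch. A minimal sketch on the hypothetical nums stream from the first example (the checkpoint path is illustrative); subtraction is an exact inverse of addition here:

// Incremental window reduction needs checkpointing so Spark can
// carry the previous window's result between batches.
ssc.checkpoint("/tmp/checkpoint")  // hypothetical path

val incrementalSums = nums.reduceByWindow(
  _ + _,                  // fold in elements entering the window
  _ - _,                  // remove elements sliding out of the window
  Seconds(10), Seconds(5))
incrementalSums.print()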
