reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

This method overloads the same name previous reduceByKeyAndWindow with one additional invFunc, or invert function. It is needed to subtract the statistics accumulated from previous window.
A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
Note: similar to updateStateByKey, to main state for such as running total
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
​
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.config("spark.master", "local[2]")
.appName("streaming for book")
.getOrCreate()
import spark.implicits._
val sc=spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))
​
spark.sparkContext.setCheckpointDir("/tmp/")
​
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
​
val messages3 = messages1.union(messages2)
val messages=messages3.filter(_.nonEmpty).map(x=>(x.toInt,1))
val msgs: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = messages
​
val reduceFn: (Int, Int) => Int = (a,b)=> {
a+b
}
​
val InvFn: (Int, Int) => Int = (a,b)=> {
a-b
}
​
val windowedMsgs:
org.apache.spark.streaming.dstream.DStream[(Int, Int)] =
msgs.reduceByKeyAndWindow(reduceFn, InvFn, Seconds(10), Seconds(5))
​
windowedMsgs.print()
ssc.start()
ssc.awaitTermination()
​
/*
input:
cat cccc.txt
1
2
3
4
5
​
cat dddd.txt
1
2
3
4
5
​
cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2
​
Wait a while
​
cat eeee.txt
1
2
3
4
5
​
cp eeee.txt /tmp/filestream1
​
​
​
output:
​
​
-------------------------------------------
Time: 1583633728000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633733000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633738000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633743000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633748000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633753000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633758000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633763000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633768000 ms
-------------------------------------------
​
-------------------------------------------
Time: 1583633773000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
​
-------------------------------------------
Time: 1583633778000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
​
-------------------------------------------
Time: 1583633783000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
​
-------------------------------------------
Time: 1583633788000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
​
-------------------------------------------
Time: 1583633793000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
​
-------------------------------------------
Time: 1583633798000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
​
-------------------------------------------
Time: 1583633803000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
​
-------------------------------------------
Time: 1583633808000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
​
​
​
*/