reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
// Keep only ERROR-level log4j output from the "org" logger hierarchy (which
// includes Spark's own loggers), so the printed windows are easy to spot.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local session with two worker threads, named for this example.
val spark = SparkSession.builder()
  .appName("streaming for book")
  .config("spark.master", "local[2]")
  .getOrCreate()
import spark.implicits._

// Streaming context built on the session's SparkContext, 1-second batches.
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))

// reduceByKeyAndWindow with an inverse function requires checkpointing, so a
// checkpoint directory is configured before the stream is started.
sc.setCheckpointDir("/tmp/")
// Two file-based sources: files newly placed in either directory are picked
// up by the next batch of ssc.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
// Merge both sources into one DStream of text lines.
val messages3 = messages1.union(messages2)
// Turn every line into (intKey, 1). Lines are trimmed first so trailing spaces
// or Windows "\r" line endings still parse. Lines that are still not valid
// integers (blank, whitespace-only, text) are dropped. Previously they threw a
// NumberFormatException, which failed the batch and stopped the stream.
val messages = messages3.flatMap { line =>
  scala.util.Try(line.trim.toInt).toOption.toList.map(n => (n, 1))
}
val msgs: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = messages
// Forward reduce for the window: adds the count of a batch entering the window.
val reduceFn: (Int, Int) => Int = _ + _
// Inverse reduce for the window: removes the count of a batch leaving the
// window from the running total (the exact inverse of reduceFn).
val InvFn: (Int, Int) => Int = (running, leaving) => running - leaving
// Sliding window: 10 seconds long, recomputed every 5 seconds. Both durations
// are multiples of ssc's 1-second batch interval, as Spark requires.
val windowLength = Seconds(10)
val slideInterval = Seconds(5)

// Incremental per-key count over the window. reduceFn adds the batches entering
// the window, and InvFn subtracts the batches sliding out of it.
// NOTE: no filterFunc is passed, so a key whose count drops to 0 keeps being
// emitted as (key, 0) on later slides instead of disappearing.
val windowedMsgs: org.apache.spark.streaming.dstream.DStream[(Int, Int)] =
  msgs.reduceByKeyAndWindow(reduceFn, InvFn, windowLength, slideInterval)

// Print each windowed result, then run the stream until it is stopped.
windowedMsgs.print()
ssc.start()
ssc.awaitTermination()
/*
input:
cat cccc.txt
1
2
3
4
5
cat dddd.txt
1
2
3
4
5
cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2
Wait a while (long enough for these counts to slide out of the 10-second window), then:
cat eeee.txt
1
2
3
4
5
cp eeee.txt /tmp/filestream1
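output (keys whose windowed count falls to 0 keep being printed as (key,0), because no filterFunc is passed to reduceByKeyAndWindow):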
-------------------------------------------
Time: 1583633728000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633733000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633738000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633743000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633748000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633753000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633758000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633763000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633768000 ms
-------------------------------------------
-------------------------------------------
Time: 1583633773000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
-------------------------------------------
Time: 1583633778000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
-------------------------------------------
Time: 1583633783000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
-------------------------------------------
Time: 1583633788000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
-------------------------------------------
Time: 1583633793000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
-------------------------------------------
Time: 1583633798000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
-------------------------------------------
Time: 1583633803000 ms
-------------------------------------------
(4,1)
(2,1)
(1,1)
(3,1)
(5,1)
-------------------------------------------
Time: 1583633808000 ms
-------------------------------------------
(4,0)
(2,0)
(1,0)
(3,0)
(5,0)
*/
Previous: reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
Next: countByValueAndWindow(windowLength, slideInterval, [numTasks])
Last updated