window(windowLength, slideInterval)
window(windowLength, slideInterval)
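Returns a new DStream computed from windowed batches of the source DStream. windowLength is the duration of the window (how much past data each result covers) and slideInterval is the interval at which the window operation is performed (how often a result is produced); both must be multiples of the batch interval of the source DStream.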
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
// Only show errors from "org.*" loggers so the DStream output stays readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// Build (or reuse) a session that runs locally with the "local[2]" master.
val spark = SparkSession.builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("/tmp/")
import spark.implicits._

// Streaming context with a one-second batch interval.
val ssc = new StreamingContext(sc, Seconds(1))

// Read text files from two directories, merge both streams into one,
// and drop empty lines.
val firstDirLines = ssc.textFileStream("/tmp/filestream1/")
val secondDirLines = ssc.textFileStream("/tmp/filestream2/")
val messages = firstDirLines.union(secondDirLines).filter(line => line.nonEmpty)

// Window of 10 seconds, sliding every 5 seconds.
val windowLength = Seconds(10)
val slideInterval = Seconds(5)
val windowMsg = messages.window(windowLength, slideInterval)
windowMsg.print()

// Start the computation and block until it is stopped.
ssc.start()
ssc.awaitTermination()
/* Input:
cat cccc.txt
1
2
3
4
5
cat dddd.txt
1
2
3
4
5
cat eeee.txt
1
2
3
4
5
cp cccc.txt /tmp/filestream1;cp dddd.txt /tmp/filestream2
wait until the first files have slid out of the 10-second window (about 20 seconds in this run)
cp eeee.txt /tmp/filestream1
Output:
-------------------------------------------
Time: 1583635894000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635899000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635904000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635909000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635914000 ms
-------------------------------------------
1
2
3
4
5
1
2
3
4
5
-------------------------------------------
Time: 1583635919000 ms
-------------------------------------------
1
2
3
4
5
1
2
3
4
5
-------------------------------------------
Time: 1583635924000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635929000 ms
-------------------------------------------
-------------------------------------------
Time: 1583635934000 ms
-------------------------------------------
1
2
3
4
5
-------------------------------------------
Time: 1583635939000 ms
-------------------------------------------
1
2
3
4
5
*/
Last updated