DStream Window Operations
// Reduce the last 30 seconds of data, every 10 seconds:
// val windowedWordCounts = pairs
//   .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
package com.jentekco.spark

import org.apache.spark._
import org.apache.spark.streaming._
// import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's verbose INFO logging
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sparkConf = new SparkConf()
      .setMaster("local[2]").setAppName("HdfsWordCount")

    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Monitor the HDFS directory for new text files
    val lines = ssc.textFileStream("hdfs://10.0.0.46:9000/tmp/spark/")
    val words = lines.flatMap(_.split(" "))

    // Count words over a sliding window: reduce the last 30 seconds of data,
    // every 10 seconds. Both durations must be multiples of the batch interval.
    val wordCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    wordCounts.print(100)

    ssc.start()
    ssc.awaitTermination()
  }
}
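For long windows, Spark Streaming also provides a more efficient form of reduceByKeyAndWindow that incrementally adds counts from the batch entering the window and subtracts counts from the batch leaving it, instead of re-reducing the whole window each slide. This variant requires an inverse reduce function and a checkpoint directory. A minimal sketch, assuming the same words stream as above (the checkpoint path is a placeholder, not from the original program):

// Checkpointing is required for the incremental (inverse-function) variant
ssc.checkpoint("hdfs://10.0.0.46:9000/tmp/spark-checkpoint") // hypothetical path

val windowedCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // add counts from the batch entering the window
    (a: Int, b: Int) => a - b, // subtract counts from the batch leaving the window
    Seconds(30),               // window length
    Seconds(10)                // slide interval
  )

With this form, the work per slide is proportional to the slide interval (10 seconds of new data) rather than the full 30-second window, at the cost of maintaining checkpointed state.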