// updateStateByKey(func)
// Example: keeping a running word count across batches of a file stream.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
/**
 * The three stages of a streaming word count, each exposed as a method that
 * takes one DStream and returns the transformed DStream.
 *
 * NOTE(review): the class extends Serializable, presumably so an instance can
 * be captured by closures that Spark ships to executors - confirm.
 */
class wordCountClass extends Serializable {
  import org.apache.spark.streaming.dstream.DStream

  /**
   * Breaks every line into tokens and flattens them into a single stream.
   *
   * The separator is exactly one space character and empty tokens are not
   * filtered out, so an empty input line contributes one empty-string word.
   *
   * @param lines stream of text lines
   * @return stream of the individual tokens
   */
  def flatMap(lines: DStream[String]): DStream[String] =
    lines.flatMap(line => line.split(" "))

  /**
   * Pairs every word with an initial count of 1.
   *
   * @param words stream of words
   * @return stream of (word, 1) tuples
   */
  def map(words: DStream[String]): DStream[(String, Int)] =
    words.map(word => (word, 1))

  /**
   * Adds up the counts that share the same word.
   *
   * @param wordPair stream of (word, count) tuples
   * @return stream with one (word, summed count) tuple per distinct word
   */
  def reduceByKey(wordPair: DStream[(String, Int)]): DStream[(String, Int)] =
    wordPair.reduceByKey(_ + _)
}
/**
 * State-update function handed to updateStateByKey.
 *
 * Takes the new counts collected for a key (`values`) and the total stored
 * so far (`state`, treated as 0 when absent) and returns the new total,
 * always wrapped in Some.
 */
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val storedTotal = state.getOrElse(0)
  val incomingTotal = values.sum
  Some(incomingTotal + storedTotal)
}
// Driver script: builds a streaming word count over files appearing in
// /tmp/filestream and prints the running totals for every batch.

// Configuration for the streaming application; only the app name is set here.
val sparkConf = new SparkConf()
.setAppName("textFileStreamWordCount")
// NOTE(review): `sc` is not defined in this file - presumably the SparkContext
// pre-created by spark-shell, stopped here so that the StreamingContext below
// can create its own context from sparkConf. Confirm against how this is run.
sc.stop()
// Streaming context with a batch interval of 2 seconds.
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Checkpoint directory; Spark Streaming requires one for stateful
// operations such as updateStateByKey.
ssc.checkpoint("/tmp/")
// Input stream of text lines read from files in the given directory.
val lines = ssc.textFileStream("/tmp/filestream")
// Pipeline: lines -> words -> (word, 1) pairs -> per-batch counts.
val wordCountObject = new wordCountClass()
val words=wordCountObject.flatMap(lines)
val wordMap=wordCountObject.map(words)
val wordCounts = wordCountObject
.reduceByKey(wordMap)
// Fold each batch's counts into the running total kept per word.
val updateWordCounts=wordCounts
.updateStateByKey(updateFunc)
// Alternative output: per-batch counts only, without the accumulated state.
//wordCounts.print()
// Print the accumulated counts each batch.
updateWordCounts.print()
// Start the computation, then block until the streaming context terminates.
ssc.start()
ssc.awaitTermination()
/*
input file:
cat x.txt
a a b c d
d h i j k
Output:
-------------------------------------------
Time: 1583602836000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
-------------------------------------------
Time: 1583602838000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
-------------------------------------------
Time: 1583602840000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
...
*/
// Last updated