updateStateByKey(func)

updateStateByKey(func)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

To record statistics cumulatively over the lifetime of a stream (a simple example is a word count computed from the beginning of the stream), the previous state of each key in the DStream must be maintained and included in the computation for every new batch. That is what updateStateByKey(func) is for.

Example:

Background:

This example uses textFileStream as its input source; each line in an input file contains words delimited by a blank space.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}

/**
 * Bundles the three stages of a streaming word count as separate methods:
 * tokenising lines, pairing each word with a count of 1, and summing the
 * counts per word within a batch.
 *
 * NOTE(review): presumably `Serializable` so that an instance can be shipped
 * together with closures that reference it — confirm whether it is required.
 */
class wordCountClass extends Serializable {
  import org.apache.spark.streaming.dstream.DStream

  /**
   * Splits every line on a single space character.
   *
   * Tokens are not filtered, so empty lines or consecutive spaces produce
   * empty-string tokens.
   *
   * @param lines stream of raw text lines
   * @return stream of the individual tokens
   */
  def flatMap(lines: DStream[String]): DStream[String] =
    lines.flatMap(line => line.split(" "))

  /**
   * Pairs each word with an initial count of 1.
   *
   * @param words stream of tokens
   * @return stream of `(word, 1)` pairs
   */
  def map(words: DStream[String]): DStream[(String, Int)] =
    words.map(word => (word, 1))

  /**
   * Sums the counts of identical words.
   *
   * @param wordPair stream of `(word, count)` pairs
   * @return stream of `(word, summedCount)` pairs
   */
  def reduceByKey(wordPair: DStream[(String, Int)]): DStream[(String, Int)] =
    wordPair.reduceByKey(_ + _)
}

// State-update function passed to updateStateByKey: adds the counts that
// arrived for a key (`values`) to the total kept so far (`state`). A key with
// no previous state starts from 0. Always returns Some(newTotal).
val updateFunc =
  (values: Seq[Int], state: Option[Int]) => {
    val runningTotal = state.getOrElse(0)
    Some(values.sum + runningTotal)
  }

// Driver script: wires a text-file stream through the word-count stages and
// keeps a cumulative count per word with updateStateByKey.

// Configuration for the job; only the application name is set here.
val sparkConf = new SparkConf()
   .setAppName("textFileStreamWordCount")
// Stop the existing context before a new one is created from sparkConf.
// NOTE(review): `sc` is not defined in this snippet — presumably the
// SparkContext pre-created by spark-shell or a notebook; confirm.
sc.stop()
// Streaming context with a batch interval of 2 seconds.
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Checkpoint directory. The Spark Streaming guide requires checkpointing for
// stateful operations such as updateStateByKey.
ssc.checkpoint("/tmp/")
// Input stream of text lines read from /tmp/filestream.
// NOTE(review): presumably only files that appear in the directory after the
// stream starts are picked up — confirm against the textFileStream docs.
val lines = ssc.textFileStream("/tmp/filestream")
val wordCountObject = new wordCountClass()
// lines -> words -> (word, 1) -> (word, count within the batch)
val words=wordCountObject.flatMap(lines)
val wordMap=wordCountObject.map(words)
val wordCounts = wordCountObject
   .reduceByKey(wordMap)
// Fold each batch's counts into the running total kept per word.
val updateWordCounts=wordCounts
   .updateStateByKey(updateFunc)

// Uncomment to also print the per-batch (non-cumulative) counts.
//wordCounts.print()
// Print the cumulative counts for every batch.
updateWordCounts.print()
// Start the streaming computation, then block until it terminates.
ssc.start()
ssc.awaitTermination()

/*
input file:
 cat x.txt
a a b c d
d h i j k

Output:

-------------------------------------------
Time: 1583602836000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

-------------------------------------------
Time: 1583602838000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

-------------------------------------------
Time: 1583602840000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
...
*/

Last updated