# updateStateByKey(func)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

To keep cumulative statistics over the lifetime of a stream, for example a word count since the beginning of the stream, the state computed from earlier batches of the DStream must be maintained and combined with each new batch. That is what updateStateByKey(func) is for.
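The per-key update can be illustrated without a cluster. The following is a minimal sketch in plain Scala, not Spark code: `step` is a hypothetical helper that mimics what updateStateByKey does once per batch, and the three batches are made-up data. It assumes, as Spark does, that the update function is applied to every key that already has state, even when the batch brings no new values for it.

```scala
// Same shape as the function passed to updateStateByKey:
// (new values for one key in this batch, previous state) => new state
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (values, state) => Some(values.sum + state.getOrElse(0))

// Hypothetical helper (not part of Spark): applies updateFunc to every key
// that has either existing state or new values in the batch.
def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
  val newValues: Map[String, Seq[Int]] =
    batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  val keys = state.keySet ++ newValues.keySet
  keys.toSeq.flatMap { key =>
    updateFunc(newValues.getOrElse(key, Seq.empty), state.get(key))
      .map(count => key -> count)
      .toList
  }.toMap
}

// Made-up batches of (word, 1) pairs.
val batch1 = Seq("a" -> 1, "a" -> 1, "b" -> 1)
val batch2 = Seq("a" -> 1, "c" -> 1)
val batch3 = Seq.empty[(String, Int)]

val s1 = step(Map.empty[String, Int], batch1) // a -> 2, b -> 1
val s2 = step(s1, batch2)                     // a -> 3, b -> 1, c -> 1
val s3 = step(s2, batch3)                     // same as s2: state is carried forward
```

An empty batch leaves the totals unchanged, which is why a running count keeps being reported in intervals where no new data arrives.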

Example:

Background:

This example reads its input with textFileStream. Each line in an input file contains words delimited by a blank space.

```
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.log4j.{Level, Logger}

// Wraps the three word-count steps; Serializable so it can be shipped to executors.
class wordCountClass extends Serializable {
  // Split each line into words.
  def flatMap(lines: DStream[String]): DStream[String] =
    lines.flatMap(_.split(" "))

  // Pair each word with an initial count of 1.
  def map(words: DStream[String]): DStream[(String, Int)] =
    words.map(x => (x, 1))

  // Sum the counts per word within a single batch.
  def reduceByKey(wordPair: DStream[(String, Int)]): DStream[(String, Int)] =
    wordPair.reduceByKey((a, b) => a + b)
}

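// values: counts for this key in the current batch.
// state: running total from earlier batches (None the first time a key is seen).
val updateFunc =
  (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }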

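val sparkConf = new SparkConf()
  .setAppName("textFileStreamWordCount")
// `sc` is assumed to be the SparkContext pre-created by spark-shell or a notebook;
// stop it so the StreamingContext below can create its own.
sc.stop()
val ssc = new StreamingContext(sparkConf, Seconds(2)) // 2-second batches
// updateStateByKey requires a checkpoint directory.
ssc.checkpoint("/tmp/")
val lines = ssc.textFileStream("/tmp/filestream")
val wordCountObject = new wordCountClass()
val words = wordCountObject.flatMap(lines)
val wordMap = wordCountObject.map(words)
val wordCounts = wordCountObject.reduceByKey(wordMap)
// Fold each batch's counts into the running total per word.
val updateWordCounts = wordCounts.updateStateByKey(updateFunc)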

//wordCounts.print()
updateWordCounts.print()
ssc.start()
ssc.awaitTermination()

/*
input file:
 cat x.txt
a a b c d
d h i j k

Output:

-------------------------------------------
Time: 1583602836000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

-------------------------------------------
Time: 1583602838000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)

-------------------------------------------
Time: 1583602840000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
...
*/
```
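The `(,1)` row in the output above is a count for the empty string. A likely cause, assuming the input file ends with a blank line, is that splitting an empty line on a space yields one empty token. The sketch below reproduces this in plain Scala with the two input lines plus an assumed trailing blank line; the `filter(_.nonEmpty)` step is a suggested fix, not part of the original example.

```scala
// The two lines from x.txt plus an assumed trailing blank line.
val fileLines = Seq("a a b c d", "d h i j k", "")

// Same split as wordCountClass.flatMap: "".split(" ") returns Array("").
val naiveTokens = fileLines.flatMap(_.split(" "))

// Dropping empty tokens removes the spurious key.
val cleanedTokens = fileLines.flatMap(_.split(" ")).filter(_.nonEmpty)

// Count occurrences per token.
val naiveCounts = naiveTokens.groupBy(identity).map { case (w, ws) => w -> ws.size }
val cleanedCounts = cleanedTokens.groupBy(identity).map { case (w, ws) => w -> ws.size }

println(naiveCounts.get(""))   // Some(1)
println(cleanedCounts.get("")) // None
```

The same `filter(_.nonEmpty)` call is available on a DStream, so it could be chained after `lines.flatMap(_.split(" "))` in the streaming job.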


