SPARK Streaming

Spark Streaming can read data from HDFS, Flume, Kafka, Twitter and ZeroMQ. You can also define your own custom data sources.
Spark Streaming runs on Spark's standalone cluster mode or other supported cluster resource managers.
It also includes a local run mode for development. In production, Spark Streaming uses ZooKeeper and HDFS for high availability.
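
Since defining your own data sources is mentioned above, here is a minimal sketch of one way to do that with Spark Streaming's Receiver API. This is not part of the original example: the RandomNumberReceiver class and the data it emits are illustrative assumptions.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical custom source that emits a random number (as a string) ten times per second.
class RandomNumberReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start a background thread that produces data and hands it to Spark via store().
    new Thread("Random Number Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(scala.util.Random.nextInt(100).toString)
          Thread.sleep(100)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do here: the generating thread checks isStopped() and exits on its own.
  }
}

// Usage inside a StreamingContext:
//   val numbers = ssc.receiverStream(new RandomNumberReceiver())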
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
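
To make the batch model concrete, here is a short sketch (not from the original example) that counts words over a sliding window and pushes each window's result out to the filesystem. The hostname, port, and output path are assumptions for illustration only.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCount")
    // 1-second batches: each batch becomes one RDD in the DStream.
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 29999)  // assumed host:port
    val counts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      // Aggregate over a sliding 30-second window, recomputed every 10 seconds.
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    // Push each window's counts out as text files with the given prefix and suffix.
    counts.saveAsTextFiles("/tmp/wordcounts", "txt")  // assumed output path
    ssc.start()
    ssc.awaitTermination()
  }
}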

A simple streaming test:

On a Linux or macOS machine with netcat (nc) installed, run the following at the OS command line:

nc -lk 29999
Then type some test input, for example:

test test again here I am
test test test again again
Assume that, in parallel, you have the following code ready to run:
// Following is the Scala code:
package com.jentekco.spark

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._

object streaming1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // Create a local StreamingContext with two working threads
    // and a batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario.
    val conf = new SparkConf().setMaster("local[2]")
      .setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create a DStream that will connect to hostname:port (here 10.0.0.46:29999)
    val lines = ssc.socketTextStream("10.0.0.46", 29999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey((a, b) => a + b)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }
}
Or the equivalent Python code, which should already be running:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests

# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port (here 10.0.0.46:29999)
lines = ssc.socketTextStream("10.0.0.46", 29999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
ssc.start()              # Start the computation
ssc.awaitTermination()   # Wait for the computation to terminate
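
As a follow-up to both versions above, the fault tolerance mentioned earlier can also be extended to the driver by recreating the StreamingContext from checkpoint data. Below is a hedged Scala sketch of that pattern; the checkpoint directory and the host/port are placeholders, not values from the original example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWordCount {
  val checkpointDir = "/tmp/streaming-checkpoint"  // assumed path; use an HDFS path in production

  // Builds a fresh StreamingContext the first time the job runs.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointedWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    val lines = ssc.socketTextStream("localhost", 29999)  // assumed host:port
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from checkpoint data if it exists; otherwise create a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}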
I have created a detailed video presentation on Spark Streaming; the links to the video are in the Appendix.