SPARK Streaming

Spark Streaming can read data from HDFS, Flume, Kafka, Twitter and ZeroMQ. You can also define your own custom data sources.
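
To give a feel for the custom data source option, below is a minimal sketch of a custom receiver. It is not part of the original example: the class name, host, and port are made up, and it simply reads lines from a TCP socket and hands them to Spark Streaming.

//a custom receiver sketch (illustrative only):
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a background thread so onStart() returns immediately
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up; the receiving thread exits when isStopped is true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each line to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

// Used in place of the built-in socket source:
//   val lines = ssc.receiverStream(new LineReceiver("10.0.0.46", 29999))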

Spark Streaming runs on Spark's standalone cluster mode or other supported cluster resource managers.

It also includes a local run mode for development. In production, Spark Streaming uses ZooKeeper and HDFS for high availability.
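
Much of that fault tolerance comes from checkpointing streaming metadata and state to a reliable filesystem such as HDFS. The sketch below shows the usual recovery pattern with StreamingContext.getOrCreate; the checkpoint path and host/port are placeholders, not values from this article.

//a checkpoint/recovery sketch (paths are placeholders):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWordCount {
  val checkpointDir = "hdfs://namenode:8020/spark/checkpoints/wordcount"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointedWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir) // metadata and state are written to HDFS
    val lines = ssc.socketTextStream("localhost", 29999)
    lines.count().print() // placeholder processing
    ssc
  }

  def main(args: Array[String]): Unit = {
    // A clean start builds a new context; after a driver restart the
    // context is rebuilt from the checkpoint data instead.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}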

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
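
Because the stream is handled as a sequence of small batches, window operations can combine several consecutive batches into one result. Below is a small sketch of a windowed word count; the 30-second window and 10-second slide are arbitrary values chosen for illustration, and both must be multiples of the batch interval.

//a windowed word count sketch:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCount")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batches

    val lines = ssc.socketTextStream("localhost", 29999)
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // Combine the last 30 one-second batches, recomputed every 10 seconds
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}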

A simple streaming test:

On a Linux or macOS machine that has netcat (nc) installed:

Run the following at the OS command line:

nc -lk 29999

Then type a few lines of text, for example:

test test again here I am
test test test again again

Assume that, in parallel, you have the following code ready to run:

//following is the Scala code:
package com.jentekco.spark

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._

object streaming1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a local StreamingContext with two working threads
    // and a batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario:
    // one core runs the receiver, the other processes the batches.
    val conf = new SparkConf().setMaster("local[2]")
      .setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port,
    // here 10.0.0.46:29999 (the host running nc)
    val lines = ssc.socketTextStream("10.0.0.46", 29999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey((a, b) => a + b)

    // Print the first ten elements of each RDD generated
    // in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }
}
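
As mentioned earlier, processed results can also be pushed out to a filesystem rather than only printed. The lines below are a sketch of what you could add to the example above, just before ssc.start(); the HDFS output prefix is a placeholder.

//optional output to a filesystem (placeholder path), added before ssc.start():
// Each batch is written as <prefix>-<timestamp>.<suffix>
wordCounts.saveAsTextFiles("hdfs://namenode:8020/user/spark/wordcounts", "txt")

// foreachRDD allows arbitrary per-batch output, e.g. writing to a database
wordCounts.foreachRDD { rdd =>
  println(s"Batch contains ${rdd.count()} distinct words")
}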

or the equivalent Python code, already running:

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads
# and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port,
# here 10.0.0.46:29999 (the host running nc)
lines = ssc.socketTextStream("10.0.0.46", 29999)

# Split each line into words and count each word in each batch
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()              # Start the computation
ssc.awaitTermination()   # Wait for the computation to terminate
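
One more note that applies to both versions: awaitTermination() blocks until the job is stopped or fails, so the examples above run until you kill them. If you want a bounded test run, the Scala API offers the pattern sketched below (the 60-second timeout is arbitrary); PySpark's StreamingContext has equivalent stop and awaitTerminationOrTimeout methods.

//a bounded run and graceful shutdown sketch (Scala, 60 seconds is arbitrary):
ssc.start()
val finished = ssc.awaitTerminationOrTimeout(60 * 1000) // false if the timeout elapsed
if (!finished) {
  // Stop the streaming context and the underlying SparkContext,
  // letting in-flight batches finish first
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}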

I have created a detailed video presentation on Spark Streaming; the links to the video presentation are in the Appendix.
