reduceByKey(func, [numTasks])

When called on a DStream of (K, V) pairs, reduceByKey returns a new DStream of (K, V) pairs in which the values for each key are aggregated using the given reduce function. The optional numTasks argument sets the number of parallel tasks used for the grouping. The example below counts word occurrences in text files that arrive in a monitored directory.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Silence Spark's verbose INFO logging so the per-batch output stays readable
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")  // run locally with two worker threads
  .appName("streaming for book")
  .getOrCreate()

val sc = spark.sparkContext
// Micro-batch interval of 1 second
val ssc = new StreamingContext(sc, Seconds(1))

// Monitor this directory; only files created there after the stream starts are processed
val dataDirectory = "/tmp/filestream/"
val lines = ssc.textFileStream(dataDirectory)

// Split each line into words, drop empty tokens, and pair each word with a count of 1
val keyValues = lines.flatMap(_.split(" ")).filter(_.nonEmpty).map(x => (x, 1))

// Sum the counts per key (word) within each batch
val keyCount = keyValues.reduceByKey((x, y) => x + y)
keyCount.print()

ssc.start()
ssc.awaitTermination()
/*
input file:
cat y.txt
a a b c d
d h i j k
Output:
-------------------------------------------
Time: 1583561350000 ms
-------------------------------------------
(d,2)
(b,1)
(h,1)
(j,1)
(a,2)
(i,1)
(k,1)
(c,1)
*/
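The listing above uses the single-argument form, so Spark picks the level of parallelism for the shuffle itself. To exercise the optional numTasks parameter named in the signature, the reduction can state the task count explicitly. A minimal sketch follows; the value 4 and the name keyCountPartitioned are illustrative, not part of the original example:

// Sketch: same per-batch word count, but with an explicit number of
// parallel tasks for the reduce; 4 is an arbitrary illustrative value.
val keyCountPartitioned = keyValues.reduceByKey((x, y) => x + y, 4)
keyCountPartitioned.print()

Raising numTasks can help when a batch contains many distinct keys and the default parallelism leaves cores idle.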