saveAsTextFiles(prefix, [suffix])

saveAsTextFiles(prefix, [suffix])

Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

Example:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

Logger.getLogger("org").setLevel(Level.ERROR)  
val spark = SparkSession
          .builder()
          .config("spark.master", "local[2]")
          .appName("streaming for book")
          .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc=spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))


val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")
val messages11=messages1.filter(_.nonEmpty).map(x=>(x,x))
val messages22=messages2.filter(_.nonEmpty).map(x=>(x,x))
val messages4=messages11.join(messages22)

messages4.print()
messages4.saveAsTextFiles("/tmp/stream-TIME_IN_MS.txt")

ssc.start()
ssc.awaitTermination()

/*

in /tmp/

drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650696000
drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650697000
drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650698000
drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650699000
drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650700000
drwxrwxr-x   2 dv6  dv6   4096 Mar  7 22:58 stream-TIME_IN_MS.txt-1583650701000
...

*/

Last updated