saveAsHadoopFiles(prefix, [suffix])

Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

Note (Python API): this operation is not available in the Python API.

Example:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

// Quiet Spark's verbose console logging so the streaming output is readable.
Logger.getLogger("org").setLevel(Level.ERROR)

// Local SparkSession with two threads: one receives stream data,
// the other processes the micro-batches.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

// Checkpoint directory (required by stateful streaming operations).
spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext
// Micro-batch interval of one second.
val ssc = new StreamingContext(sc, Seconds(1))

// Watch /tmp/filestream1/ for new text files; drop empty lines and turn
// each line into a (key, value) pair so the pair-DStream API
// (saveAsHadoopFiles) becomes available via implicit conversion.
val messages1 = ssc.textFileStream("/tmp/filestream1/").filter(_.nonEmpty).map(x => (x, x))

messages1.print()
// BUG FIX: the opening parenthesis must stay on the same line as the
// method name. With the '(' starting the next line, Scala's semicolon
// inference splits this into two statements -- a bare method reference
// and a stray tuple ("hdfs://...", "txt") -- so the save is never set up.
// One HDFS directory per batch is written, named "<prefix>-<TIME_IN_MS>.<suffix>".
// NOTE(review): the Hadoop imports above (TextOutputFormat, Text, IntWritable)
// suggest the typed idiom saveAsHadoopFiles[TextOutputFormat[Text, Text]](...)
// was intended -- confirm which overload resolves in your Spark version.
messages1.saveAsHadoopFiles(
  "hdfs://10.0.0.202:9000/tmp/", "txt")

ssc.start()
// Block until the streaming context is stopped (e.g. Ctrl-C).
ssc.awaitTermination()

/*

on 10.0.0.202, run below

hdfs dfs -ls /tmp

output:

...
drwxr-xr-x   - dv6 supergroup          0 2020-03-08 00:28 /tmp/-1583656122000.txt
drwxr-xr-x   - dv6 supergroup          0 2020-03-08 00:28 /tmp/-1583656123000.txt
drwxr-xr-x   - dv6 supergroup          0 2020-03-08 00:28 /tmp/-1583656124000.txt
drwxr-xr-x   - dv6 supergroup          0 2020-03-08 00:28 /tmp/-1583656125000.txt
...

*/

Last updated