saveAsHadoopFiles(prefix, [suffix])
Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API: This is not available in the Python API.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
// Example: persist every batch of a pair DStream to HDFS with saveAsHadoopFiles.

// Only show ERROR-level (and above) messages from loggers under "org".
Logger.getLogger("org").setLevel(Level.ERROR)

// Build (or reuse) a local SparkSession for the example.
val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/")
import spark.implicits._
val sc = spark.sparkContext

// Streaming context with a one-second batch interval.
val ssc = new StreamingContext(sc, Seconds(1))

// Watch /tmp/filestream1/ for new text files, drop empty lines, and turn each
// line into a (line, line) key/value pair so the pair-DStream save methods apply.
val messages1 = ssc
  .textFileStream("/tmp/filestream1/")
  .filter(_.nonEmpty)
  .map(line => (line, line))
messages1.print()

// Write each batch to HDFS as "prefix-TIME_IN_MS.suffix".
// - The OutputFormat type argument must be given explicitly; otherwise it is
//   inferred as Nothing and no usable Hadoop output format is configured.
// - The opening parenthesis must stay on the same line as the method name:
//   Scala treats a newline before "(" as a statement separator, which would
//   turn the argument list into a separate, unused tuple expression.
// - Because the prefix ends with "/", the generated names start with "-"
//   (e.g. /tmp/-1583656122000.txt).
messages1.saveAsHadoopFiles[TextOutputFormat[String, String]](
  "hdfs://10.0.0.202:9000/tmp/",
  "txt"
)

// Start the streaming computation and block until it is stopped.
ssc.start()
ssc.awaitTermination()
/*
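On 10.0.0.202, run the command below:
hdfs dfs -ls /tmp
Output (one directory per batch interval; the names start with "-" because the prefix ends with "/"):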
...
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656122000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656123000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656124000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656125000.txt
...
*/