saveAsHadoopFiles(prefix, [suffix])

Save this DStream's contents as Hadoop files. The file name for each batch interval is generated from the prefix and suffix as "prefix-TIME_IN_MS[.suffix]". For example, with the prefix hdfs://10.0.0.202:9000/tmp/ and the suffix txt, the batch at time 1583656122000 ms is written under /tmp/-1583656122000.txt, as the listing below shows.
Python API: This operation is not available in the Python API.
Example:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

import org.apache.hadoop.mapred.TextOutputFormat

// Keep Spark's own logging out of the console output
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/")

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))

// saveAsHadoopFiles is defined on pair DStreams only, so turn each
// non-empty line into a (key, value) pair
val messages1 = ssc.textFileStream("/tmp/filestream1/").filter(_.nonEmpty).map(x => (x, x))

messages1.print()
// The output format is supplied as a type parameter; TextOutputFormat
// writes one key<TAB>value line per record
messages1.saveAsHadoopFiles[TextOutputFormat[String, String]](
  "hdfs://10.0.0.202:9000/tmp/", "txt")

ssc.start()
ssc.awaitTermination()

/*
on 10.0.0.202, run below:

hdfs dfs -ls /tmp

output:

...
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656122000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656123000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656124000.txt
drwxr-xr-x - dv6 supergroup 0 2020-03-08 00:28 /tmp/-1583656125000.txt
...
*/
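For finer control, PairDStreamFunctions also offers an overload that takes the key class, value class, and output-format class explicitly instead of a type parameter. Below is a minimal sketch of that variant, assuming a hypothetical word-count stream derived from messages1 above; the counts prefix and the per-word pairing are illustrative, not part of the example above:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// Hypothetical word-count stream: one (word, 1) pair per line, summed per batch
val wordCounts = messages1
  .map { case (line, _) => (new Text(line), new IntWritable(1)) }
  .reduceByKey((a, b) => new IntWritable(a.get + b.get))

// Same naming scheme as above: files land under /tmp/counts-TIME_IN_MS.txt
wordCounts.saveAsHadoopFiles(
  "hdfs://10.0.0.202:9000/tmp/counts", "txt",
  classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])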
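Note from the drwxr-xr-x entries in the listing that each -TIME_IN_MS.txt path is a directory of Hadoop part files, not a single file. A minimal sketch of reading the saved batches back, assuming the same HDFS address (the glob pattern is illustrative):

// Each timestamped ".txt" entry is a directory; a glob over them reads
// all part files as plain key<TAB>value text lines
val saved = sc.textFile("hdfs://10.0.0.202:9000/tmp/-*.txt")
saved.take(5).foreach(println)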