saveAsTextFiles(prefix, [suffix])

saveAsTextFiles(prefix, [suffix])

Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Example:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext

// Only show ERROR-level messages from loggers under "org" so the
// per-batch print() output stays readable.
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession
  .builder()
  .config("spark.master", "local[2]")
  .appName("streaming for book")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/")

import spark.implicits._
val sc = spark.sparkContext
// Streaming context with a 1-second batch interval.
val ssc = new StreamingContext(sc, Seconds(1))

// Two text-file streams, one per input directory.
val messages1 = ssc.textFileStream("/tmp/filestream1/")
val messages2 = ssc.textFileStream("/tmp/filestream2/")

// Drop empty lines and key every line by its own content, so the join
// below pairs up lines that occur in both streams within the same batch.
val messages11 = messages1.filter(_.nonEmpty).map(x => (x, x))
val messages22 = messages2.filter(_.nonEmpty).map(x => (x, x))
val messages4 = messages11.join(messages22)

messages4.print()

// Pass prefix and suffix as separate arguments: Spark builds the name
// "prefix-TIME_IN_MS[.suffix]" itself. TIME_IN_MS is a placeholder in the
// documentation, not text to put into the prefix; embedding it produced
// outputs named "stream-TIME_IN_MS.txt-<ms>".
messages4.saveAsTextFiles("/tmp/stream", "txt")

ssc.start()
ssc.awaitTermination()

/*

Expected output in /tmp/ : one entry per batch interval, named
"stream-<batch time in ms>.txt", e.g.

stream-1583650696000.txt
stream-1583650697000.txt
stream-1583650698000.txt
stream-1583650699000.txt
stream-1583650700000.txt
stream-1583650701000.txt
...

*/
Copied!
Last modified 1yr ago
Copy link