Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
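The reflection-based route is usually shown with a case class, whose field names become the column names. A minimal sketch, assuming the same spark-shell session and `people.txt` file used below:

```scala
// A case class lets Spark infer the schema by reflection
case class Person(name: String, age: Long)

// Requires import spark.implicits._ for the toDF conversion
val personDF = sc
  .textFile("file:///home/dv6/spark/spark/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
  .toDF() // columns "name" and "age" come from the case class fields

personDF.show()
```

The example in this section takes a slightly different path: it maps each line to a tuple instead of a case class, so the inferred column names are `_1` and `_2` and must be renamed afterwards.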
```scala
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of (name, age) tuples from a text file and convert it to a DataFrame
val people = sc
  .textFile("file:///home/dv6/spark/spark/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => (attributes(0), attributes(1).trim))
  .toDS()
  .withColumnRenamed("_1", "name")
  .withColumnRenamed("_2", "age")

people.show()

/*
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
*/
```
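The two `withColumnRenamed` calls can typically be collapsed by passing the column names straight to `toDF`. A minimal equivalent sketch, assuming `import spark.implicits._` is in scope:

```scala
// Name the tuple columns in one call instead of renaming _1 and _2
val people = sc
  .textFile("file:///home/dv6/spark/spark/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => (attributes(0), attributes(1).trim))
  .toDF("name", "age")
```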
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Create an RDD
val peopleRDD = spark
  .sparkContext
  .textFile("file:///home/dv6/spark/spark/examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Create a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(attributes => "Name: " + attributes(0)).show()

/*
+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+
*/
```
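Because every field generated from `schemaString` is declared as `StringType`, the `age` column ends up as a string. When the runtime types are known, the programmatically built schema can mix types. A hedged sketch of that variant (names chosen for illustration, reusing `peopleRDD` from above):

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Build a schema with an integer age column instead of all strings
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// The Row values must match the declared types, hence the toInt
val typedRowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim.toInt))

val typedPeopleDF = spark.createDataFrame(typedRowRDD, typedSchema)
typedPeopleDF.printSchema() // age should now print as integer, not string
```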