Spark SQL Implementation Example in Scala

package com.jentekco.spark

//George Jen, Jen Tek LLC
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._

/**
 * Reads a CSV of ticker symbols, casts CategoryNumber to Int, writes the
 * result out as Parquet, then reads the Parquet back and queries it with SQL.
 *
 * Paths are hard-coded for a local Windows demo environment.
 */
object ConvertCSV2Parquet {

  // Explicit `: Unit =` — procedure syntax (`def main(...) {`) is deprecated.
  def main(args: Array[String]): Unit = {
    // Quiet Spark's verbose console logging down to errors only.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("csv2parquet")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///d:/tmp")
      .getOrCreate()

    try {
      // spark.read.load already returns a DataFrame; the original's extra
      // .toDF() calls were redundant and are dropped.
      val df = spark.read
        .format("csv")
        .option("header", "true")
        .option("quote", "\"")
        .load("D:/teaching/scala/ticker_symbol.csv")

      df.show(3, false)

      // When a CSV is read into a DataFrame all fields are String; cast
      // CategoryNumber to the Int it should be.
      val df_with_datatype = df.selectExpr(
        "Ticker",
        "Name",
        "Exchange",
        "CategoryName",
        "cast(CategoryNumber as int) CategoryNumber")

      df_with_datatype.show(3, false)

      // Save the DataFrame to Parquet format, overwriting if existing.
      // Parquet is columnar — good for analytics queries.
      df_with_datatype.write
        .mode(SaveMode.Overwrite)
        .parquet("D:/teaching/scala/ticker_symbol.parquet")

      // Read the Parquet data back and run a SQL query on it.
      val read_parquet_df = spark.read.parquet("D:/teaching/scala/ticker_symbol.parquet")

      read_parquet_df.show(3, false)
      read_parquet_df.printSchema()

      read_parquet_df.createOrReplaceTempView("TickerSymbol")

      spark
        .sql("SELECT * from TickerSymbol where Ticker in ('IBM','MSFT','HPQ','GE')")
        .show(20, false)
    } finally {
      // Release local Spark resources even if a stage above fails
      // (the original leaked the session).
      spark.stop()
    }
  }
}