Spark SQL Implementation Example in Scala
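
The following standalone Scala program reads a CSV file of stock ticker symbols into a DataFrame, casts its columns to the proper types, writes the result out in Parquet format (overwriting any existing output), reads the Parquet data back, and queries it with Spark SQL.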

package com.jentekco.spark

// George Jen, Jen Tek LLC

// Only the logging and SQL imports are actually used in this example.
import org.apache.log4j._
import org.apache.spark.sql._

object ConvertCSV2Parquet {

  def main(args: Array[String]): Unit = {
    // Show only ERROR-level log messages to keep the console output readable.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("csv2parquet")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///d:/tmp")
      .getOrCreate()
    

    // Read the CSV file; it has a header row, and quoted fields use double quotes.
    // Note that spark.read already returns a DataFrame, so no toDF() call is needed.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("quote", "\"")
      .load("D:/teaching/scala/ticker_symbol.csv")

    df.show(3, false)

    // When the CSV file is read into a DataFrame, every field is a String.
    // Cast each column to the type the data should have, for example
    // CategoryNumber to Int.
    val df_with_datatype = df.selectExpr(
      "Ticker",
      "Name",
      "Exchange",
      "CategoryName",
      "cast(CategoryNumber as int) CategoryNumber")

    df_with_datatype.show(3, false)

    // Save the DataFrame in Parquet format, overwriting any existing output.
    // Parquet is a columnar format, well suited to analytic queries.
    df_with_datatype.write
      .mode(SaveMode.Overwrite)
      .parquet("D:/teaching/scala/ticker_symbol.parquet")

    // Read the Parquet data back and run a SQL query against it.
    val read_parquet_df = spark.read.parquet("D:/teaching/scala/ticker_symbol.parquet")

    read_parquet_df.show(3, false)

    // read_parquet_df is already a DataFrame; register it as a temporary view
    // so that it can be queried with SQL.
    val TickerSymbol = read_parquet_df

    TickerSymbol.printSchema()

    TickerSymbol.createOrReplaceTempView("TickerSymbol")

    spark.sql("SELECT * FROM TickerSymbol WHERE Ticker IN ('IBM','MSFT','HPQ','GE')").show(20, false)

  }
}
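
As a side note, instead of reading every column as a String and casting afterward with selectExpr, the CSV could be read with an explicit schema so that CategoryNumber arrives as an Int directly. Below is a minimal sketch that assumes the same spark session and file path as in the listing above; the nullable flags are assumptions about the data.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare the column types up front instead of casting after the read.
val tickerSchema = StructType(Seq(
  StructField("Ticker", StringType, nullable = true),
  StructField("Name", StringType, nullable = true),
  StructField("Exchange", StringType, nullable = true),
  StructField("CategoryName", StringType, nullable = true),
  StructField("CategoryNumber", IntegerType, nullable = true)))

val typedDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("quote", "\"")
  .schema(tickerSchema)
  .load("D:/teaching/scala/ticker_symbol.csv")

typedDf.printSchema()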
