Hive Integration: run SQL or HiveQL queries on existing warehouses

Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses. The following self-contained Scala program shows this end to end: it connects to an existing Hive metastore, creates and loads a Hive table, queries it into a Dataset, and writes Parquet-backed and partitioned Hive tables. A short SerDe/UDF sketch follows the listing.
package com.jentekco.spark

import java.io.File
import org.apache.log4j._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

object SparkHive {

  case class Record(key: Int, value: String)

  def main(args: Array[String]): Unit = {

    // Keep the console quiet apart from errors.
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Location on the local filesystem for tables Spark manages itself.
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath

    // No hive-site.xml is needed: the metastore thrift URI is passed directly.
    val spark = SparkSession
      .builder()
      .config("spark.master", "local")
      .appName("interfacing spark sql to hive metastore with no configuration file")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("hive.metastore.uris", "thrift://10.0.0.46:9083") // replace with your Hive metastore service's thrift URL
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    // Create a Hive table and load a sample key/value file into it.
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'D:/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
    sql("SELECT * FROM src").show()
    sql("SELECT COUNT(*) FROM src").show()

    // Queries return DataFrames, which support normal Dataset transformations.
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()

    // A DataFrame built from case classes can be joined against a Hive table.
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

    // Write a DataFrame into a Parquet-format Hive table.
    sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")
    val df = spark.table("src")
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    sql("SELECT * FROM hive_records").show()

    // Expose an existing Parquet directory to Hive as an external table.
    val dataDir = "/tmp/parquet_data"
    spark.range(10).write.parquet(dataDir)
    sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
    sql("SELECT * FROM hive_bigints").show()

    // Allow dynamic partitioning, then write a Hive table partitioned by key.
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
    sql("SELECT * FROM hive_part_tbl").show()

    spark.stop()
  }
}
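The listing exercises HiveQL, but not the SerDe and UDF support mentioned at the top. Below is a minimal sketch of both, reusing the spark session and import spark.sql from the program above. The function name my_upper and the table csv_src are illustrative; it assumes Hive's built-in GenericUDFUpper and OpenCSVSerde classes are on the classpath, which is the case when Spark is built with Hive support.

    // Hive UDF: register Hive's built-in upper-case UDF under a new name,
    // then call it from Spark SQL like any other function.
    sql("CREATE TEMPORARY FUNCTION my_upper AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper'")
    sql("SELECT key, my_upper(value) FROM src LIMIT 5").show()

    // Hive SerDe: create a table whose rows are parsed by Hive's OpenCSVSerde.
    // OpenCSVSerde exposes every column as STRING regardless of the declared
    // type, so both columns are declared STRING here.
    sql("""CREATE TABLE IF NOT EXISTS csv_src (key STRING, value STRING)
          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
          |STORED AS TEXTFILE""".stripMargin)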
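To build the program yourself, the project needs the spark-sql and spark-hive artifacts. A minimal build.sbt sketch, assuming Scala 2.12 and Spark 3.4.1; adjust both versions to match your cluster:

name := "SparkHive"
version := "1.0"
scalaVersion := "2.12.18"

// spark-hive supplies the Hive client classes behind enableHiveSupport().
// "provided" scope assumes the jars come from the Spark installation at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "3.4.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "3.4.1" % "provided"
)

With "provided" scope, package the jar with sbt and run it via spark-submit --class com.jentekco.spark.SparkHive; to run inside an IDE instead, drop the "provided" qualifier so the Spark classes are on the local classpath.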
Scala and Python code is available on my GitHub site:
jentekllc/Spark/Scala/SQL at master Β· geyungjen/jentekllc