Example: Enrich JSON

JSON (JavaScript Object Notation) is a lightweight data-interchange format. It is easy for humans to read and write, and easy for machines to parse and generate.
One of the JSON representations is a collection of name/value pairs. In various languages, this is realized as an object, record, struct, dictionary, hash table, keyed list, or associative array.
JSON is one of the major data representations for both humans and machines. Social media companies send you streaming data if you request it through their APIs; for example, Twitter can stream tweets to you in JSON.
After all, though, JSON is just a text string, one that has to follow a given syntax for a machine to parse it. We humans can read a JSON document even if it is syntactically incorrect.
The use case I would like to demonstrate is adding information to a given JSON string and producing a new JSON string that contains the additional information, i.e., enriching a JSON record.
Following is record 1:
{
"visitorId": "v1",
"products": [{
"id": "i1",
"interest": 0.68
}, {
"id": "i2",
"interest": 0.42
}]
}
Following is record 2:
{
"visitorId": "v2",
"products": [{
"id": "i1",
"interest": 0.78
}, {
"id": "i3",
"interest": 0.11
}]
}
Following is the dimension definition (a Scala sketch of this dimension as a lookup map follows the enrichment examples below):
"i1" is "Nike Shoes"
"i2" is "Umbrella"
"i3" is "Jeans"
Now define the enrichment task: based upon the dimension defined earlier, add a "name" attribute to each product, turning record 1:
{
"visitorId": "v1",
"products": [{
"id": "i1",
"interest": 0.68
}, {
"id": "i2",
"interest": 0.42
}]
}
Into:
{
"visitorId": "v1",
"products": [{
"id": "i1",
"name": "Nike Shoes",
"interest": 0.68
}, {
"id": "i2",
"name": "Umbrella",
"interest": 0.42
}]
}
Also, enrich record 2 the same way:
{
"visitorId": "v2",
"products": [{
"id": "i1",
"interest": 0.78
}, {
"id": "i3",
"interest": 0.11
}]
}
Into:
{
"visitorId": "v2",
"products": [{
"id": "i1",
"name": "Nike Shoes",
"interest": 0.78
}, {
"id": "i3",
"name": "Jeans",
"interest": 0.11
}]
}
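Before writing any code, note that this dimension maps naturally onto an in-memory Scala lookup map. A minimal sketch (productIdToNameMap is the name used in the full programs below, and the "Invalid Product" fallback matches how unknown ids are handled there; the helper productName is purely illustrative):

// The dimension as an in-memory Scala Map: product id -> product name
val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

// getOrElse supplies a fallback for ids that are not defined in the dimension
def productName(id: String): String =
  productIdToNameMap.getOrElse(id, "Invalid Product")

// productName("i2") returns "Umbrella"; productName("i9") returns "Invalid Product"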
I am going to use Scala for the JSON enrichment task. Why Scala? Two reasons:
Scala has a wealth of JSON libraries that can parse and extract JSON strings easily (see the short sketch after this list).
Apache Spark has its own library and methods to read and parse JSON through Spark SQL.
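As a quick illustration of the first point, here is a minimal json4s sketch, separate from the full program below; the object and case class names (ParseSketch, ProductRec) are placeholders for illustration only:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ParseSketch {
  // Mirrors one product entry of the records shown above
  case class ProductRec(id: String, interest: Double)

  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats
    // parse turns the text into a JValue tree; extract maps it onto the case class
    val jv = parse("""{"id": "i1", "interest": 0.68}""")
    val p = jv.extract[ProductRec]
    println(p) // ProductRec(i1,0.68)
  }
}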
I will use two approaches to accomplish the JSON enrichment task defined earlier:
An approach that does not use Apache Spark
An approach that uses Apache Spark SQL
Here is the non-Spark approach, using the json4s library:
package com.jentekco.enrichJson

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

object NoSpark {
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]): Unit = {
    val rec1: String = """{
      "visitorId": "v1",
      "products": [{
        "id": "i1",
        "interest": 0.68
      }, {
        "id": "i2",
        "interest": 0.42
      }]
    }"""

    val rec2: String = """{
      "visitorId": "v2",
      "products": [{
        "id": "i1",
        "interest": 0.78
      }, {
        "id": "i3",
        "interest": 0.11
      }]
    }"""

    val visitsData: Seq[String] = Seq(rec1, rec2)
    for (i <- 0 until visitsData.size) {
      println(visitsData(i))
      println(" ")
    }

    // Dimension: product id -> product name
    val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

    // Case classes mirroring the shape of the incoming record
    case class v_rec(
      id: String,
      interest: Double
    )
    case class p_rec(
      visitorId: String, products: Array[v_rec]
    )

    // New case classes for the enriched record, with the added "name" attribute
    case class v_rec_new(
      id: String,
      name: String,
      interest: Double
    )
    case class p_rec_new(
      visitorId: String, products: Array[v_rec_new]
    )

    var jString: Array[String] = Array[String]()
    var enrichedJson: Array[String] = Array[String]()

    for (js <- visitsData) {
      var jObj = parse(js)
      var eJ = jObj.extract[p_rec]

      // jStringJ only needs some initial JValue; it is overwritten in the merge loop below
      var jStringJ = parse(rec1)
      for (i <- 0 until eJ.products.size) {
        // If there is no such product in the dimension, show "Invalid Product"
        var prodName: String = "Invalid Product"
        if (productIdToNameMap contains (eJ.products(i).id.toString))
          prodName = productIdToNameMap(eJ.products(i).id.toString)
        // Build a single-product enriched record for this product
        var newRec = p_rec_new(
          visitorId = eJ.visitorId,
          products = Array(v_rec_new(
            eJ.products(i).id.toString,
            prodName,
            eJ.products(i).interest
          ))
        )

        // Now serialize it back to a JSON string
        val newRecStr = write(newRec)
        jString :+= newRecStr
      }

      // Merge the per-product JSON strings back into a single record;
      // json4s merge keeps the common visitorId and concatenates the "products" arrays
      for (x <- 0 until jString.size) {
        if (x == 0)
          jStringJ = parse(jString(x))
        else
          jStringJ = jStringJ merge parse(jString(x))
      }

      enrichedJson :+= write(jStringJ)
      jString = Array[String]()
    }

    for (i <- enrichedJson)
      println(i)
  }
}
Running the prior Scala code will produce the output below:
{"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}
{"visitorId":"v2","products":[{"id":"i1","name":"Nike Shoes","interest":0.78},{"id":"i3","name":"Jeans","interest":0.11}]}
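Each pass of the loop serializes one product at a time, yet each visitor ends up with a single products array. That is the effect of the json4s merge call: merging two JObjects combines their fields, and arrays under the same key are concatenated. A standalone sketch of that behavior (the two single-product strings below are hypothetical intermediate values, like the ones built inside the loop):

import org.json4s._
import org.json4s.jackson.JsonMethods._

val a = parse("""{"visitorId": "v1", "products": [{"id": "i1", "name": "Nike Shoes", "interest": 0.68}]}""")
val b = parse("""{"visitorId": "v1", "products": [{"id": "i2", "name": "Umbrella", "interest": 0.42}]}""")

// Merging keeps the common visitorId and concatenates the two "products" arrays
val merged = a merge b
println(compact(render(merged)))
// {"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}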
Next, I will demonstrate how to do the same enrichment on the same JSON records and produce the same output, using Apache Spark SQL.
package com.jentekco.enrichJsonNew

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object Spark {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val rec1: String = """{
      "visitorId": "v1",
      "products": [{
        "id": "i1",
        "interest": 0.68
      }, {
        "id": "i2",
        "interest": 0.42
      }]
    }"""

    val rec2: String = """{
      "visitorId": "v2",
      "products": [{
        "id": "i1",
        "interest": 0.78
      }, {
        "id": "i3",
        "interest": 0.11
      }]
    }"""

    val visitsData: Seq[String] = Seq(rec1, rec2)
    val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

    // Solution starts here
    val spark = SparkSession
      .builder
      .appName("JsonApp")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///d:/tmp")
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    // Register the dimension as a temp view so it can be joined in SQL
    productIdToNameMap.toSeq.toDF("id", "name").createOrReplaceTempView("prodRec")

    for (i <- visitsData) {
      println("Original Json String is: \n")
      println(i)
      println("\n")
      var rec = spark.read.json(Seq(i).toDS)
      rec.createOrReplaceTempView("dfVisitsTable")
      // Explode the products array so that each product becomes its own row
      val productsArr = sql("SELECT products FROM dfVisitsTable")
        .withColumn("products", explode($"products")).select("products.*")
      productsArr.createOrReplaceTempView("productsArr")
      // Need to do an outer join in case a product id in the record is not valid;
      // if the product id is not found in the map, return 'invalid product'
      val enrichedProducts = sql("select a.id, if (b.name is not null, b.name, 'invalid product') name, a.interest from productsArr a full outer join prodRec b on a.id=b.id")
      // Attach the visitorId to every enriched product row
      val enrichedRecord = rec.select("visitorId").join(enrichedProducts)
      enrichedRecord.createOrReplaceTempView("enrichedRec")
      // Re-assemble the products into an array per visitor and convert back to JSON
      val enrichedJson = sql("select visitorId, collect_list(struct(id, name, interest)) products from enrichedRec group by visitorId").toJSON
        .collect.mkString("", ",", "")
      println("Enriched Json String is:\n")
      println(enrichedJson)
      println(" ")
      println(" ")
    }
  }
}
Running the above code will produce the output below:
Original Json String is:
{
"visitorId": "v1",
"products": [{
"id": "i1",
"interest": 0.68
}, {
"id": "i2",
"interest": 0.42
}]
}
Enriched Json String is:
{"visitorId":"v1","products":[{"name":"Jeans"},{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}
Original Json String is:
{
"visitorId": "v2",
"products": [{
"id": "i1",
"interest": 0.78
}, {
"id": "i3",
"interest": 0.11
}]
}
Enriched Json String is:
{"visitorId":"v2","products":[{"id":"i3","name":"Jeans","interest":0.11},{"id":"i1","name":"Nike Shoes","interest":0.78},{"name":"Umbrella"}]}
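Note that the full outer join also brings in dimension entries the record never referenced, which is why v1's output contains a bare {"name":"Jeans"} element and v2's contains a bare {"name":"Umbrella"}. If only the products actually present in the record should be kept, while still labeling unknown ids as invalid, a left outer join variant of the same query would do. A hedged sketch of that alternative (it reuses the productsArr and prodRec temp views registered above):

// Keep only products that appear in the record; unknown ids still get 'invalid product'
val enrichedProducts = sql(
  """select a.id,
            if(b.name is not null, b.name, 'invalid product') name,
            a.interest
     from productsArr a
     left outer join prodRec b on a.id = b.id""")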
Both approaches, Scala alone without Spark and Scala with Spark, produce the same result. Which method would I recommend? In a production environment with a large number of JSON records, think millions or billions of records to be processed, Apache Spark is the way to go.
A Scala program in itself is no different from a Java program; in fact, they are much the same, because both are compiled into Java bytecode and run on the JVM. A plain Scala program is a monolithic program without parallelism unless you code it that way. Writing a Scala program with Apache Spark takes advantage of Spark's distributed computing framework and the rich Spark SQL library, which handles, for example, the JSON enrichment task in a few SQL queries executed by Apache Spark in parallel across the Spark worker nodes.
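For example, if the visit records lived as JSON Lines files in a directory instead of two hard-coded strings, Spark could read and enrich them in parallel with essentially the same queries. A minimal sketch, assuming a hypothetical HDFS path and the SparkSession from the program above:

// Hypothetical path: millions of one-record-per-line JSON files under a directory
val visits = spark.read.json("hdfs:///data/visits/")
visits.createOrReplaceTempView("dfVisitsTable")
// The same explode / join / collect_list queries shown above then run in parallel,
// partition by partition, across the Spark worker nodes.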