Example: Enrich JSON

JSON (JavaScript Object Notation) is a lightweight data-interchange format. It is easy for humans to read and write. It is easy for machines to parse and generate.

One common JSON representation is a collection of name/value pairs. In various languages, this is realized as an object, record, struct, dictionary, hash table, keyed list, or associative array.

JSON is one of the major data representations for both humans and machines. Social media companies send you streaming data if you request it through their APIs; for example, Twitter can stream you tweets in JSON.

After all, though, JSON is just a text string, one that has to follow a given syntax in order for a machine to parse it. We humans can read a JSON document even when it is syntactically incorrect.
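
For example, a JSON parser will reject the snippet below because of the missing comma after "id": "i1", even though a person can still read it without trouble:

{
    "visitorId": "v1",
    "products": [{
         "id": "i1"
         "interest": 0.68
    }]
}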

The use case I would like to demonstrate is adding information to a given JSON string and producing a new JSON string that contains the additional information, i.e., enriching the JSON.

Following is record 1:

{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "interest": 0.68
    }, {
         "id": "i2",
         "interest": 0.42
    }]
}

Following is record 2:

{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "interest": 0.78
    }, {
         "id": "i3",
         "interest": 0.11
    }]
}

Following is the dimension definition:

"i1" is "Nike Shoes"

"i2" is "Umbrella"

"i3" is "Jeans"

Now define the enrichment task: based upon the dimension defined earlier, add a "name" attribute to each product. This turns record 1:

{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "interest": 0.68
    }, {
         "id": "i2",
         "interest": 0.42
    }]
}

Into:

{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "name": "Nike Shoes",
         "interest": 0.68
    }, {
         "id": "i2",
         "name": "Umbrella",
         "interest": 0.42
    }]
}

Also, enrich record 2 the same way:

{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "interest": 0.78
    }, {
         "id": "i3",
         "interest": 0.11
    }]
}

Into:

{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "name": "Nike Shoes",
         "interest": 0.78
    }, {
         "id": "i3",
         "name": "Jeans",
         "interest": 0.11
    }]
}

I am going to use Scala for the JSON enrichment task. Why Scala? Two reasons:

Scala has a wealth of JSON libraries that can parse and extract JSON strings easily.

Apache Spark has its own library and methods to read and parse JSON through Spark SQL.
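
To make these two points concrete, here is a minimal sketch of each, assuming a JSON string jsonString, a hypothetical case class MyRecord that mirrors its structure, and an existing SparkSession named spark:

// json4s: parse the string and extract it into a case class
// (assumes the json4s imports shown in the full example below)
implicit val formats = org.json4s.DefaultFormats
val parsed = parse(jsonString).extract[MyRecord]

// Spark SQL: read the same string into a DataFrame with an inferred schema
import spark.implicits._
val df = spark.read.json(Seq(jsonString).toDS)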

I will use two approaches to accomplish the JSON enrichment task defined earlier:

An approach that does not use Apache Spark

An approach that uses Apache Spark SQL

Here is the non-Spark approach, using the json4s library:

package com.jentekco.enrichJson

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write


object NoSpark {
  implicit val formats = org.json4s.DefaultFormats
  def main(args: Array[String]): Unit = {
      val rec1: String = """{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "interest": 0.68
    }, {
         "id": "i2",
         "interest": 0.42
    }]
}"""
      
      val rec2: String = """{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "interest": 0.78
    }, {
         "id": "i3",
         "interest": 0.11
    }]
}"""
      
      val visitsData: Seq[String] = Seq(rec1, rec2)
      for (i<-0 until visitsData.size)
      {
        println(visitsData(i))
        println(" ")
      }

      val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")
      
      // Case classes that mirror the structure of the input JSON
      case class v_rec(
          id: String,
          interest: Double
      )
      case class p_rec(
          visitorId: String,
          products: Array[v_rec]
      )

      // New case classes that carry the added "name" attribute
      case class v_rec_new(
          id: String,
          name: String,
          interest: Double
      )
      case class p_rec_new(
          visitorId: String,
          products: Array[v_rec_new]
      )
   
   var jString: Array[String]=Array[String]() 
   var enrichedJson:Array[String]=Array[String]()
   
   for (js<-visitsData)
    {
      var jObj=parse(js)
      var eJ=jObj.extract[p_rec]
      
      // Initialize with any parsed JValue; it is replaced on the first pass of the merge loop below
      var jStringJ = parse(rec1)
      for (i<-0 until eJ.products.size)
       {
           var prodName:String="Invalid Product"
           //if there is no such product, show Invalid Product
           if (productIdToNameMap contains (eJ.products(i).id.toString))                
               prodName=productIdToNameMap(eJ.products(i).id.toString)
           var newRec=p_rec_new(
           visitorId=eJ.visitorId,
           products=Array(v_rec_new(
           eJ.products(i).id.toString,
           prodName,
           eJ.products(i).interest         
           )
           )
           )   
           
//           println(newRec.visitorId, newRec.products(0).name)
           // Now JSON-serialize the enriched record
 
           val newRecStr = write(newRec)
//           println(newRecStr)
           jString:+=newRecStr
       }
//      println(jString.size)
      
//      var jStringJ:Array[JObject]=Array[JObject]()

      for (x<-0 until jString.size)
      {   
          if (x==0)
            jStringJ=parse(jString(x))
          else
          {
            jStringJ=jStringJ merge parse(jString(x))
          }
 
      }

//      println("test",jStringJ)
      enrichedJson:+=write(jStringJ)        
      jString=Array[String]()
      
      
    }  
     for (i<-enrichedJson)
        println(i)
  }  

}

Running the prior Scala code produces the following output:

{"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}

{"visitorId":"v2","products":[{"id":"i1","name":"Nike Shoes","interest":0.78},{"id":"i3","name":"Jeans","interest":0.11}]}

Next, I will demonstrate how to do the same enrichment on the same JSON records and produce the same output, using Apache Spark SQL.

package com.jentekco.enrichJsonNew

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j._


object Spark {
        def main(args: Array[String]): Unit = {
      Logger.getLogger("org").setLevel(Level.ERROR)
      val rec1: String = """{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "interest": 0.68
    }, {
         "id": "i2",
         "interest": 0.42
    }]
}"""
      
      val rec2: String = """{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "interest": 0.78
    }, {
         "id": "i3",
         "interest": 0.11
    }]
}"""
      
      val visitsData: Seq[String] = Seq(rec1, rec2)
      val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")
      
      //Solution starts here
      val spark = SparkSession
    .builder
    .appName("JsonApp")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///d:/tmp")
    .getOrCreate()
      
      import spark.implicits._
      import spark.sql
      
      productIdToNameMap.toSeq.toDF("id","name").createOrReplaceTempView("prodRec")
      for (i<-visitsData)
    {
//      println(rec)
    println("Original Json String is: \n")
    println(i)
    println("\n")
    var rec=spark.read.json(Seq(i).toDS) 
    rec.createOrReplaceTempView("dfVisitsTable")
//    sql("select * from dfVisitsTable").show()
    val productsArr=sql ("SELECT products FROM dfVisitsTable").withColumn("products", explode($"products")).select("products.*")
//    productsArr.show(false)
    productsArr.createOrReplaceTempView("productsArr")
//    val enrichedProducts=sql("select a.id, b.name, a.interest from productsArr a, prodRec b where a.id=b.id")
//    enrichedProducts.show(false)
    // Need an outer join in case a product id in the record is not in the dimension map;
    // if the id is not found, return 'invalid product'. A left outer join from the record's
    // products to the dimension keeps only the products that actually appear in the record.
    val enrichedProducts=sql("select a.id, if (b.name is not null, b.name, 'invalid product') name, a.interest from productsArr a left outer join prodRec b on a.id=b.id")
    val enrichedRecord=rec.select("VisitorId").join(enrichedProducts)
//    enrichedRecord.show(false)
    enrichedRecord.createOrReplaceTempView("enrichedRec")
//    sql("select visitorId, collect_list(struct(id, name, interest)) products from enrichedRec group by visitorId").show(false)
    val enrichedJson=sql("select visitorId, collect_list(struct(id, name, interest)) products from enrichedRec group by visitorId").toJSON
    .collect.mkString("",",","")
    println("Enriched Json String is:\n")
    println(enrichedJson)
    println(" ")
    println(" ")
    }
     } 

 
}

Running the above code produces the following output:

Original Json String is:

{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "interest": 0.68
    }, {
         "id": "i2",
         "interest": 0.42
    }]
}

Enriched Json String is:

{"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}

Original Json String is:

{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "interest": 0.78
    }, {
         "id": "i3",
         "interest": 0.11
    }]
}

Enriched Json String is:

{"visitorId":"v2","products":[{"id":"i3","name":"Jeans","interest":0.11},{"id":"i1","name":"Nike Shoes","interest":0.78}]}

Both approaches, Scala without Spark and Scala with Spark SQL, produce the same enriched records (apart from the ordering of the elements inside the products array). Which method would I recommend? In a production environment with a large number of JSON records, think millions or billions of records to process, Apache Spark is the way to go.

A Scala program in itself is no different from a Java program; in fact, both compile to Java bytecode and run on the JVM. A plain Scala program is a monolithic program with no parallelism unless you code it that way. Writing the program with Apache Spark takes advantage of Spark's distributed computing framework and the rich Spark SQL library, which can express the JSON enrichment task as a few SQL queries executed by Spark in parallel across the worker nodes.
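
As a rough sketch of what that looks like at scale (assuming the visit records are stored one JSON document per line in a hypothetical file visits.json, and that spark, spark.implicits._, and productIdToNameMap are in scope as in the code above), the whole data set can be enriched with one chain of DataFrame operations instead of a per-record loop:

// Read every visit record at once; Spark distributes the work across the cluster
val visits = spark.read.json("visits.json")
// The product dimension as a small lookup DataFrame
val products = productIdToNameMap.toSeq.toDF("id", "name")

val enriched = visits
  .withColumn("product", explode($"products"))
  .select($"visitorId", $"product.id", $"product.interest")
  .join(products, Seq("id"), "left")       // keep record products even if the id is unknown
  .na.fill("invalid product", Seq("name")) // same fallback as in the examples above
  .groupBy("visitorId")
  .agg(collect_list(struct($"id", $"name", $"interest")).as("products"))

// In production the result would be written out rather than collected to the driver
enriched.write.json("enriched_output")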
