Data Visualization with Vegas-viz, Scala, and Spark ML



// Start by pulling the Vegas-viz library jars from the Maven repository;
// your machine needs to be connected to the internet
import $ivy.`org.vegas-viz::vegas:0.3.11`
import $ivy.`org.vegas-viz::vegas-spark:0.3.11`
// You will see lots of downloads from running the above 2 lines
// Once done, you can import the Vegas library for plotting
import vegas._
import vegas.render.WindowRenderer._
// Almond/jupyter-scala is not integrated with Spark out of the box, so you will need
// to integrate it manually by downloading the necessary Spark jars from Maven for this demo
import $ivy.`org.jupyter-scala::spark:0.4.2`
import $ivy.`org.apache.spark::spark-sql:2.4.4`
// Again, lots of downloads from the above 2 lines; once they finish, you are ready to import the Spark libraries
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import org.apache.spark.sql._
// Create a Spark session
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder
  .appName("Vegas")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
// ...and get the SparkContext sc
val sc = spark.sparkContext
// Even a simple LinearRegression comes from the Spark ML library, which jupyter-scala
// does not bundle, so download it from Maven for Spark 2.4.4
import $ivy.`org.apache.spark::spark-mllib:2.4.4`
// Now import the Vegas-viz library
import vegas._
import vegas.data.External._
// Generate a random dataset
import scala.math._
var X = Array[Double]()
var Y = Array[Double]()
// Create 100 random pairs of data points in the X and Y Arrays (Double), with X ranging from 0 to 10
for (i <- 0 until 100) {
  val x = random * 10
  val b = random * 10
  X :+= x
  Y :+= 3 * x + b
}
// Notice the dataset is random noise around Y = 3X + b, so it will fit a LinearRegression nicely.
// That is not the point, though; the point is to play with Vegas-viz data visualization
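// Equivalently, the same shape of dataset can be built without mutable Arrays; this is
// just an idiomatic sketch (pairs is a name introduced here and not used again below)
val pairs = Array.fill(100) { val x = random * 10; (x, 3 * x + random * 10) }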
// Create a (features, label) Spark DataFrame, sorted by features
import spark.implicits._
val df = sc.parallelize(X zip Y).toDF("features", "label").sort("features")
df.show(3,false)
/*
output:
+-------------------+------------------+
|features           |label             |
+-------------------+------------------+
|0.12428231703539572|2.6330542403970045|
|0.12875124825893702|9.111634435646987 |
|0.21021408668144725|10.037147132043941|
+-------------------+------------------+
only showing top 3 rows
*/
// Import the Vegas plotting libraries
import vegas._
import vegas.render.WindowRenderer._
import vegas.sparkExt._
// Plot the original Spark DataFrame to see the 100 data points visually
val plot = Vegas("linear regression", width = 800, height = 600).
  withDataFrame(df).
  mark(Point).
  encodeX("features", Quantitative).
  encodeY("label", Quantitative).
  show
// LinearRegression finds the best-fit straight line in the form of
//   y_pred = coefficient * X + intercept
// In simple terms, the coefficient is the weight and the intercept is the bias
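// To make that form concrete, here is a minimal sketch of the prediction rule the
// model will learn (predict is a hypothetical helper, not part of the Spark API):
def predict(x: Double, weight: Double, bias: Double): Double = weight * x + bias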
// Below we call the Apache Spark ML LinearRegression API to do the job
import org.apache.spark.ml.feature.VectorAssembler
// Vectorize the feature column into "features_new"
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("features"))
  .setOutputCol("features_new")
// Create a DataFrame with the feature column vectorized and sorted, as required by the Spark ML API
var vector_df = vectorAssembler.transform(df)
vector_df = vector_df.select("features_new", "label").sort("features_new")
vector_df.show(3,false)
/*
output:
+---------------------+------------------+
|features_new |label |
+---------------------+------------------+
|[0.12428231703539572]|2.6330542403970045|
|[0.12875124825893702]|9.111634435646987 |
|[0.21021408668144725]|10.037147132043941|
+---------------------+------------------+
only showing top 3 rows
*/
// Split the 100 data points, roughly 70 for training and 30 for testing
val splits = vector_df.randomSplit(Array(0.7,0.3))
val train_df = splits(0)
val test_df = splits(1)
print(test_df.count())
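// Note: randomSplit is approximate and non-deterministic across runs; for a
// reproducible split you can pass a seed (42 below is an arbitrary choice):
// val splits = vector_df.randomSplit(Array(0.7, 0.3), seed = 42)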
// Create and train a LinearRegression model on train_df
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression()
  .setFeaturesCol("features_new")
  .setLabelCol("label")
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setMaxIter(10)
val lr_model = lr.fit(train_df)
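// Optionally, inspect the learned weight and bias; Spark ML also exposes a
// training summary with fit metrics computed on the training set
println(s"weight = ${lr_model.coefficients(0)}, bias = ${lr_model.intercept}")
println(s"training R2 = ${lr_model.summary.r2}")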
// Then test the trained model with test_df
val lr_predictions = lr_model.transform(test_df)
// Then display the test results
lr_predictions.select("prediction", "label", "features_new").sort("features_new").show(3,false)
/*
Output:
+-----------------+------------------+---------------------+
|prediction |label |features_new |
+-----------------+------------------+---------------------+
|6.184323007474771|9.111634435646987 |[0.12875124825893702]|
|6.877166443408621|6.914045632096169 |[0.3713831792261324] |
|8.485151210258538|7.8911343006901555|[0.9344951735265017] |
+-----------------+------------------+---------------------+
only showing top 3 rows
*/
// Evaluate the model on the test set by computing the R-squared score
import org.apache.spark.ml.evaluation.RegressionEvaluator
val lr_evaluator = new RegressionEvaluator().setPredictionCol("prediction").
  setLabelCol("label").setMetricName("r2")
print("R Squared (R2) on test data =" + lr_evaluator.evaluate(lr_predictions))
/*
Output:
R Squared (R2) on test data =0.9248598537955983
*/
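// R2 is just one choice; RegressionEvaluator also supports "rmse", "mse", and
// "mae" if you prefer an error measured in the label's units, e.g.:
// val rmse = new RegressionEvaluator().setPredictionCol("prediction").
//   setLabelCol("label").setMetricName("rmse").evaluate(lr_predictions)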
// Now, from the 100 original data points, generate 100 predicted values using the
// weight and bias of the trained model.
// The weight is the coefficient, lr_model.coefficients(0)
// The bias is the intercept, lr_model.intercept
// The best-fit linear equation is
//   y_pred = weight * x + bias
// which is
//   y_pred = lr_model.coefficients(0) * x + lr_model.intercept
// Now generate 100 y_pred values from the 100 original X values using that equation
var y_pred = Array[Double]()
for (i <- 0 until X.length) {
  y_pred :+= lr_model.coefficients(0) * X(i) + lr_model.intercept
}
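// The same predictions can be produced without a mutable Array using map:
// val y_pred_alt = X.map(x => lr_model.coefficients(0) * x + lr_model.intercept)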
// Create the df_pred DataFrame, sorted, from Array X zipped with y_pred
val df_pred = sc.parallelize(X zip y_pred).toDF("features", "prediction").sort("features")
// Plot it together with the original 100 data points
Vegas.layered("linear regression", width = 800, height = 600).
  withLayers(
    Layer().
      withDataFrame(df).
      mark(Point).
      encodeX("features", Quantitative).
      encodeY("label", Quantitative),
    Layer().
      withDataFrame(df_pred).
      mark(Line).
      encodeX("features", Quantitative).
      encodeY("prediction", Quantitative)
  ).show
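// If you want to reuse the trained model later, Spark ML models are writable;
// the path below is only an example location:
// lr_model.write.overwrite().save("/tmp/vegas_lr_model")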