Neural Network with Apache Spark Machine Learning Multilayer Perceptron Classifier


Biological Neuron vs. Digital Perceptron

Neuron

The perceptron is a mathematical replica of a biological neuron. In an actual neuron, the dendrites receive electrical signals from the axons of other neurons.

The perceptron models this by multiplying each input value by a coefficient called a weight and, sometimes, adding another value called a bias. An actual neuron fires an output signal only when the total strength of the input signals exceeds a certain threshold.

Perceptron

A perceptron imitates this behavior: the weighted sum of the inputs is calculated to represent the total strength of the input signals, and then a step function (also called an activation function) is applied to that sum to determine the output.
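
As a minimal illustration (a sketch with hypothetical weights, bias, and threshold, not code from this book), the weighted-sum-plus-step computation looks like this in Scala:

// step function: fire (1.0) if the combined signal reaches the threshold of 0, otherwise stay silent (0.0)
def step(z: Double): Double = if (z >= 0.0) 1.0 else 0.0

// perceptron: weighted sum of inputs plus bias, then the step function
def perceptron(inputs: Array[Double], weights: Array[Double], bias: Double): Double = {
  val weightedSum = inputs.zip(weights).map { case (x, w) => x * w }.sum + bias
  step(weightedSum)
}

// hypothetical inputs, weights, and bias for illustration only
val output = perceptron(Array(0.5, 0.2, 0.9), Array(0.4, -0.6, 0.3), -0.1)
println(output)  // 1.0 or 0.0 depending on the weighted sum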

Multilayer perceptron classifier from Apache Spark Machine Learning

Note:

Each circle in the diagram above represents a perceptron:

Each layer is fully connected, meaning every neuron (perceptron) in one layer is connected to all neurons in the next layer.

A feedforward artificial neural network means signals move forward only; there is no loop back.

Math representation of the neural network

Neurons in the input layer represent the input data. All other neurons map inputs to outputs by a linear combination of the inputs with the neuron’s weights w and bias b and applying an activation function or step function. This can be written in matrix form for MLPC with K+1 layers as follows:
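
In the notation of the Spark MLlib documentation, where w_k and b_k are the weights and bias of layer k and f_k is its activation function, the mapping computed by an MLPC with K+1 layers is:

\[
\mathbf{y}(\mathbf{x}) = f_K\bigl(\cdots f_2\bigl(\mathbf{w}_2^{\mathsf T} f_1\bigl(\mathbf{w}_1^{\mathsf T}\mathbf{x} + b_1\bigr) + b_2\bigr)\cdots + b_K\bigr)
\]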

Activation/Step function:

Neurons in intermediate layers (hidden layers) use sigmoid (logistic) function:
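
In its standard form, with z_i denoting the input to node i:

\[
f(z_i) = \frac{1}{1 + e^{-z_i}}
\]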

Neurons in the output layer use SoftMax function:
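
In its standard form, over N output nodes:

\[
f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^{N} e^{z_k}}
\]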

Notes on the number of neurons/perceptrons

The number of neurons N in the output layer corresponds to the number of classes to be classified. The number of neurons in the first (input) layer needs to equal the number of features (columns).
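
For example, MNIST has 784 feature columns and 10 classes, so any MLPC layer specification for it must start with 784 and end with 10; the hidden layer sizes in between are free choices (the two hidden sizes below are hypothetical):

val exampleLayers = Array[Int](784, 128, 64, 10)  // input layer: 784 features, two hypothetical hidden layers, output layer: 10 classes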

Build a neural network with the Apache Spark Multilayer perceptron classifier (MLPC)

As part of the effort, I need a dataset to train and test the MLPC. The obvious choice is the MNIST dataset of handwritten digit images, and I am going to code in Scala.

Download MNIST dataset

Download the 4 data files from http://yann.lecun.com/exdb/mnist/

However, they are binary files in ubyte format; they cannot be loaded directly into an Apache Spark DataFrame and must be preprocessed.

Data Preprocessing

Ubyte to CSV to libsvm

The ubyte file format is formally known as the IDX format.

The IDX file format is a simple format for vectors and multidimensional matrices of various numerical types (see http://www.fon.hum.uva.nl/praat/manual/IDX_file_format.html). The basic format is:

magic number

size in dimension 1

size in dimension 2

size in dimension 3

….

size in dimension N

data

The magic number is four bytes long.

The first 2 bytes are always 0.

The third byte codes the type of the data:

0x08: unsigned byte

0x09: signed byte

0x0B: short (2 bytes)

0x0C: int (4 bytes)

0x0D: float (4 bytes)

0x0E: double (8 bytes)

The fourth byte codes the number of dimensions of the vector/matrix: 1 for vectors, 2 for matrices….

The sizes in each dimension are 4-byte integers (big endian, like in most non-Intel processors).

The data is stored like in a C array, i.e. the index in the last dimension changes the fastest.
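
To make the header layout concrete, here is a minimal Scala sketch (assuming the local file path used later in this chapter) that reads just the IDX header with java.io.DataInputStream, whose readInt() reads 4-byte big-endian integers:

import java.io.{DataInputStream, FileInputStream}

// read the IDX header: a 4-byte magic number, then one 4-byte size per dimension
val in = new DataInputStream(new FileInputStream("/home/bigdata2/libsvm/train-images-idx3-ubyte"))
val magic = in.readInt()            // e.g. 0x00000803 for the training image file
val dataType = (magic >> 8) & 0xff  // third byte: 0x08 = unsigned byte
val numDims = magic & 0xff          // fourth byte: number of dimensions
val sizes = (0 until numDims).map(_ => in.readInt())
println(s"dataType=0x${dataType.toHexString} numDims=$numDims sizes=${sizes.mkString(", ")}")
in.close()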

Convert the 4 MNIST files

These 2 files are for training (1 is feature data, 1 is label data)

train-images-idx3-ubyte

train-labels-idx1-ubyte

These 2 files are for testing (1 is feature data, 1 is label data)

t10k-images-idx3-ubyte

t10k-labels-idx1-ubyte

File exploration

To understand the data layout of the feature file, hex dump the file and show the first 20 lines:

(spark) bigdata2@bigdata2:~/libsvm$ xxd train-images-idx3-ubyte | head -n 20
00000000: 0000 0803 0000 ea60 0000 001c 0000 001c  .......`........
00000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000040: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000050: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000060: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000070: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000080: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000090: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000a0: 0000 0000 0000 0000 0312 1212 7e88 af1a  ............~...
000000b0: a6ff f77f 0000 0000 0000 0000 0000 0000  ................
000000c0: 1e24 5e9a aafd fdfd fdfd e1ac fdf2 c340  .$^............@
000000d0: 0000 0000 0000 0000 0000 0031 eefd fdfd  ...........1....
000000e0: fdfd fdfd fdfb 5d52 5238 2700 0000 0000  ......]RR8'.....
000000f0: 0000 0000 0000 0012 dbfd fdfd fdfd c6b6  ................
00000100: f7f1 0000 0000 0000 0000 0000 0000 0000  ................
00000110: 0000 0000 509c 6bfd fdcd 0b00 2b9a 0000  ....P.k.....+...
00000120: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000130: 000e 019a fd5a 0000 0000 0000 0000 0000  .....Z..........

The first 2 bytes are 00 00, as expected.

The third byte is 08 (0x08: unsigned byte).

The fourth byte is 03 (the fourth byte codes the number of dimensions of the vector/matrix), so this is a 3-dimensional dataset.

Since this is a 3-dimensional dataset:

The next 4-byte integer is 0000ea60, the size of the 1st dimension. 0x0000ea60 = 60000

The next 4-byte integer is 0000001c, the size of the 2nd dimension. 0x0000001c = 28

The next 4-byte integer is 0000001c, the size of the 3rd dimension. 0x0000001c = 28

The actual feature data follows after that.

This means the training feature data has 60000 rows, and each row is a flattened 28*28 matrix, i.e. 784 (feature) columns. It also means the first 16 bytes of the training image file are not actual data but metadata. These 16 bytes will be thrown away when the actual data is extracted from the file in the code later on.

Now do the same on the label data file:

(spark) bigdata2@bigdata2:~/libsvm$ xxd train-labels-idx1-ubyte | head -n 20
00000000: 0000 0801 0000 ea60 0500 0401 0902 0103  .......`........
00000010: 0104 0305 0306 0107 0208 0609 0400 0901  ................
00000020: 0102 0403 0207 0308 0609 0005 0600 0706  ................
00000030: 0108 0709 0309 0805 0903 0300 0704 0908  ................
00000040: 0009 0401 0404 0600 0405 0601 0000 0107  ................
00000050: 0106 0300 0201 0107 0900 0206 0708 0309  ................
00000060: 0004 0607 0406 0800 0708 0301 0507 0107  ................
00000070: 0101 0603 0002 0903 0101 0004 0902 0000  ................
00000080: 0200 0207 0108 0604 0106 0304 0509 0103  ................
00000090: 0308 0504 0707 0402 0805 0806 0703 0406  ................
000000a0: 0109 0906 0003 0702 0802 0904 0406 0409  ................
000000b0: 0700 0902 0905 0105 0901 0203 0203 0509  ................
000000c0: 0107 0602 0802 0205 0007 0409 0708 0302  ................
000000d0: 0101 0803 0601 0003 0100 0001 0702 0703  ................
000000e0: 0004 0605 0206 0407 0108 0909 0300 0701  ................
000000f0: 0002 0003 0504 0605 0806 0307 0508 0009  ................
00000100: 0100 0301 0202 0303 0604 0705 0006 0207  ................
00000110: 0908 0509 0201 0104 0405 0604 0102 0503  ................
00000120: 0903 0900 0509 0605 0704 0103 0400 0408  ................
00000130: 0004 0306 0807 0600 0907 0507 0201 0106  ................

The first 2 bytes are 00 00.

The third byte is 08 (0x08: unsigned byte).

The fourth byte is 01 (the number of dimensions), so this is a 1-dimensional dataset.

Since this is a 1-dimensional dataset:

The next 4-byte integer is 0000ea60, the size of the 1st dimension. 0x0000ea60 = 60000

The actual label data follows after that.

This means the training label data has 60000 rows and 1 column. It also means the first 8 bytes of the training label file are not actual data but metadata. These 8 bytes will be thrown away when the actual data is extracted from the file in the code later on.

The test data files have the same format, so there is no need to analyze them separately; the only difference is that the testing data has 10000 rows while the training data has 60000 rows.

Helper Scala utility function to convert MNIST ubyte files to CSV file

The following Scala code reads the image and label data files in ubyte/IDX format and converts them into a single comma-delimited text (CSV) file.

import java.io._

/*
Helper function to convert MNIST ubyte files to CSV: read the image and label ubyte files,
then transform and save them as a single CSV file where the first column is the label and
the next 784 columns are the features.
*/
def mnistFileConvertUByteToCSV(imageFileName: String, labelFileName: String, recNum: Int, csvFileName: String): Unit = {
  val rows = recNum
  val cols = 28 * 28
  var data = None: Option[FileInputStream]
  var label = None: Option[FileInputStream]
  val features = Array.ofDim[Int](rows, cols)
  val file = new File(csvFileName)
  val output = new BufferedWriter(new FileWriter(file))
  val target = Array.ofDim[Int](rows, 1)
  try {
    data = Some(new FileInputStream(imageFileName))
    label = Some(new FileInputStream(labelFileName))
    var c = 0
    // throw away the first 16 bytes of the image file; they are metadata, not data
    for (_ <- 0 until 16)
      data.get.read()
    // throw away the first 8 bytes of the label file; they are metadata, not data
    for (_ <- 0 until 8)
      label.get.read()
    c = 0
    for (i <- 0 until rows) {
      // one label byte per row, written as the first CSV column
      c = label.get.read()
      target(i)(0) = c
      output.write(c.toString)
      output.write(",")
      // then 784 pixel bytes per row, written as the remaining CSV columns
      for (j <- 0 until cols) {
        c = data.get.read()
        features(i)(j) = c
        output.write(c.toString)
        if (j < cols - 1)
          output.write(",")
        else
          output.write("\n")
      }
      if (i % 1000 == 0)
        println(s"Line number: $i")
    }
  } catch {
    case e: IOException => e.printStackTrace()
  } finally {
    println("entered finally ...")
    if (data.isDefined) data.get.close()
    if (label.isDefined) label.get.close()
    output.close()
  }
}

/*
x is the path to the MNIST feature file (existing)
y is the path to the MNIST label file (existing)
z is the path to the CSV file after conversion (to be created)
*/
var x = "/home/bigdata2/libsvm/train-images-idx3-ubyte"
var y = "/home/bigdata2/libsvm/train-labels-idx1-ubyte"
var z = "/home/bigdata2/libsvm/mnist_train.csv"

// convert the training MNIST files to a single CSV training file
mnistFileConvertUByteToCSV(x, y, 60000, z)

/*
Some printouts to make sure it is not hanging:
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
Line number: 4000
...
Line number: 58000
Line number: 59000
entered finally ...
*/

x = "/home/bigdata2/libsvm/t10k-images-idx3-ubyte"
y = "/home/bigdata2/libsvm/t10k-labels-idx1-ubyte"
z = "/home/bigdata2/libsvm/t10k.csv"

// convert the testing MNIST files to a single CSV testing file
mnistFileConvertUByteToCSV(x, y, 10000, z)

/*
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
...
Line number: 9000
entered finally ...
*/
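
As a quick sanity check (a sketch, not from the original run, assuming a Spark shell or notebook where sc is available and the paths above), the generated CSV files should contain 60000 and 10000 rows respectively:

println(sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv").count())  // expected: 60000
println(sc.textFile("file:///home/bigdata2/libsvm/t10k.csv").count())         // expected: 10000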

About the LIBSVM file format

It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 …

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.

By the way, the LIBSVM format does not have to use sparse data storage; the data can also be stored densely. Sparse storage means fields with zero values are not stored, only fields with non-zero values. Dense storage means all fields are stored, including zero values. Consequently, a file with dense storage will be larger than one with sparse storage, but conceivably it takes less work to process a LIBSVM file with dense storage than one with sparse storage.
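
For example, a hypothetical row with label 5 and four feature values 0, 0, 7, 0 would be stored as:

sparse storage: 5 3:7

dense storage: 5 1:0 2:0 3:7 4:0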

I also wanted to create a utility to convert the CSV file into a LIBSVM file; for simplicity, it only converts into a LIBSVM file with dense storage.

For illustrative purposes, it converts a CSV file of the form:

label, value1, value2, value3, … valueN

into a LIBSVM file of the form:

label index1:value1 index2:value2 … indexN:valueN

Scala Helper utility function to convert Spark ML CSV file to libsvm file

The following is the Scala code:

// build a dense LIBSVM line from a CSV row: the first field is the label,
// the remaining fields become one-based index:value pairs
def makeLibsvm(a: Array[String]): String = {
  var result = a(0) + " "
  for (i <- 1 until a.size)
    result = result + i + ":" + a(i) + " "
  result
}

// convert the training CSV to training LIBSVM files
var csvFile = sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm = csvFile.map(line => line.split(',')).map(i => makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_train")

// convert the testing CSV to testing LIBSVM files
csvFile = sc.textFile("file:///home/bigdata2/libsvm/t10k.csv")
libsvm = csvFile.map(line => line.split(',')).map(i => makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_test")

The RDD method saveAsTextFile() is likely to create multiple part files; you will need to come up with a way to automatically merge these parts into one file, or you can do it manually. That is the nature of a Spark application that runs on a cluster of multiple worker nodes.

I have merged the resulting part files (outside of this writing) into mnist_train.libsvm and mnist_test.libsvm.
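
Alternatively, if the converted data fits comfortably into a single partition, you can coalesce the RDD to one partition before saving, so that only one part file is produced. This is a sketch under that assumption (the output directory name is hypothetical; it is not what was done above):

csvFile = sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
csvFile.map(line => line.split(',')).map(i => makeLibsvm(i))
  .coalesce(1)  // move all data into a single partition so only one part file is written
  .saveAsTextFile("file:///home/bigdata2/libsvm/mnist_train_single")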

Caveats

If you really want the Scala code to save the RDD into a single file, you will have to copy the RDD, which is spread across multiple worker nodes, to the driver node on which you launch your Spark application, using the following code (not recommended, very slow, do not run it):

import java.io._

csvFile = sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm = csvFile.map(line => line.split(',')).map(i => makeLibsvm(i))
val libsvmFile = new File("/home/bigdata2/libsvm/mnist_train.libsvm")
val svmout = new BufferedWriter(new FileWriter(libsvmFile))
// collect will take a long time to copy the RDD contents to the driver node
val libsvmArray = libsvm.collect
for (i <- 0 until libsvmArray.size) {
  svmout.write(libsvmArray(i))
  svmout.write("\n")
  if (i % 1000 == 0)
    print(s"line $i")
}
svmout.close

Create a neural network to train and test on MNIST handwritten digit images

Create a neural network to train and test on the MNIST handwritten digit images, which have already been converted to LIBSVM file format from the original ubyte raw format that Spark cannot read directly.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import spark.implicits._
/*
Load the train and test data from mnist_train.libsvm and mnist_test.libsvm respectively
*/
val train = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_train.libsvm")
val test = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_test.libsvm")
/*
It is important to check the number of features (columns), here 784, because it defines the number of neurons (perceptrons) in the input layer.
*/
train.show(3)

/*
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  5.0|(784,[152,153,154...|
|  0.0|(784,[127,128,129...|
|  4.0|(784,[160,161,162...|
+-----+--------------------+
only showing top 3 rows

Define the neural network with 4 layers: the 1st (input) layer has 784 perceptrons, as do the 2nd and 3rd (hidden) layers; the 4th (output) layer has 10 perceptrons matching the 10 classes 0, 1, 2, … 9.
*/

val layers = Array[Int](784, 784, 784, 10)

// create the trainer and set its parameters

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
/*
Test set accuracy = 0.9193

Test accuracy is 0.9193, about 92%, meaning that out of every 100 handwritten digit images, the model (machine) recognizes about 92 correctly and gets about 8 wrong, i.e. prediction = label for about 92% of the rows.


In fact, you can use a SQL query to compute the accuracy metric, because the result variable is a Spark SQL DataFrame.


To use Spark SQL, create a temp view from the dataframe variable result
*/

result.toDF.createOrReplaceTempView("deep_learning")

/*
Run a test query:
select * from deep_learning where prediction = label and label = 2.0 limit 2
*/

spark.sql("select * from deep_learning where prediction = label and label = 2.0 limit 2").show()

/*
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  2.0|(784,[94,95,96,97...|[-7.3486569212363...|[1.88751449816836...|       2.0|
|  2.0|(784,[124,125,126...|[-4.9435463442057...|[1.24313405935197...|       2.0|
+-----+--------------------+--------------------+--------------------+----------+

The following query shows the accuracy by dividing the count of rows where prediction = label by the total count:
with x as (select count(*) as count_x from deep_learning
where prediction = label),
y as (select count(*) as total from deep_learning)
select count_x/total as accuracy from x,y
*/

spark.sql("with x as (select count(*) as count_x from deep_learning where prediction = label),y as (select count(*) as total from deep_learning) select count_x/total as accuracy from x,y").show()

/*
+--------+
|accuracy|
+--------+
|  0.9193|
+--------+

The following sub-query in the select list produces the same result:
select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning
*/

spark.sql("select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning").show()

/*
+--------+
|accuracy|
+--------+
|  0.9193|
+--------+

*/
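
Not covered in the original walkthrough, but the trained model can also be persisted and reloaded later. A minimal sketch, assuming a hypothetical output path:

import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel

// save the trained MLPC model to disk (the path is hypothetical) ...
model.write.overwrite().save("file:///home/bigdata2/models/mnist_mlpc")
// ... and load it back later for scoring without retraining
val reloaded = MultilayerPerceptronClassificationModel.load("file:///home/bigdata2/models/mnist_mlpc")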

Summary

While the Apache Spark multilayer perceptron classifier is no replacement for TensorFlow (in fact, Apache has its own deep learning library, MXNet, which is more comparable to TensorFlow), building a neural network with the multilayer perceptron classifier makes good sense for specific use cases, especially when the data already lives in a Spark distributed computing cluster and can be used in concert with Spark SQL and Spark Streaming.

Multilayer perceptron classifier (MLPC) from Apache Spark ML is a classifier based on the feedforward artificial neural network. MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network.

References:

http://yann.lecun.com/exdb/mnist/
http://www.fon.hum.uva.nl/praat/manual/IDX_file_format.html