Neural Network with Apache Spark Machine Learning Multilayer Perceptron Classifier

Biology Neuron vs Digital Perceptron:

Neuron

The perceptron is a mathematical model of a biological neuron. In an actual neuron, the dendrites receive electrical signals from the axons of other neurons.

In the perceptron, this is modeled by multiplying each input value by a coefficient called a weight and, optionally, adding another value called a bias. An actual neuron fires an output signal only when the total strength of the input signals exceeds a certain threshold.

Perceptron

To imitate this, a perceptron calculates the weighted sum of its inputs to represent the total strength of the input signals, and then applies a step function (also called an activation function) to the sum to determine its output.
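
Written as an equation (standard perceptron notation, with a simple threshold step function as one common choice):

y = f\Big(\sum_{i=1}^{n} w_i x_i + b\Big), \qquad f(z) = \begin{cases} 1 & \text{if } z \ge \text{threshold} \\ 0 & \text{otherwise} \end{cases}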

Multilayer perceptron classifier from Apache Spark Machine Learning

Note:

Each circle in the diagram above represents a perceptron:

Each layer is fully connected, meaning every neuron (perceptron) is connected to all neurons in the next layer.

A feedforward artificial neural network means signals move only forward; there is no loop back.

Math representation of the neural network

Neurons in the input layer represent the input data. All other neurons map inputs to outputs by forming a linear combination of the inputs with the neuron's weights w and bias b and applying an activation (step) function. This can be written in matrix form for an MLPC with K+1 layers as follows (the equation below follows the Spark MLlib documentation):
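
y(x) = f_K\big(\ldots f_2\big(w_2^{T} f_1(w_1^{T} x + b_1) + b_2\big) \ldots + b_K\big)

where w_k and b_k are the weights and bias of layer k and f_k is that layer's activation function.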

Activation/Step function:

Neurons in intermediate layers (hidden layers) use sigmoid (logistic) function:
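
In its standard form (as in the Spark MLlib documentation):

f(z_i) = \frac{1}{1 + e^{-z_i}}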

Neurons in the output layer use SoftMax function:
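
In its standard form (as in the Spark MLlib documentation), where N is the number of output classes:

f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^{N} e^{z_k}}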

Notes on the number of neurons/perceptrons

The number of neurons N in the output layer corresponds to the number of classes to be classified. The number of neurons in the first (input) layer needs to equal the number of features (columns).

Build a neural network with Apache Spark Multilayer perceptron classifier (MLPC)

As part of the effort, I need a dataset to train and test the MLPC. The obvious choice is MNIST, the classic dataset of handwritten digit images, and I am going to code in Scala.

Download MNIST dataset

Download the 4 data files from

http://yann.lecun.com/exdb/mnist/

However, they are binary files in ubyte format; they cannot be loaded directly into an Apache Spark dataframe and must be preprocessed.

Data Preprocessing

Ubyte to CSV to libsvm

The ubyte file format is formally known as the IDX format.

http://www.fon.hum.uva.nl/praat/manual/IDX_file_format.html

The IDX file format is a simple format for vectors and multidimensional matrices of various numerical types.

The basic format according to http://yann.lecun.com/exdb/mnist/ is:

magic number

size in dimension 1

size in dimension 2

size in dimension 3

….

size in dimension N

data

The magic number is four bytes long.

The first 2 bytes are always 0.

The third byte codes the type of the data:

0x08: unsigned byte

0x09: signed byte

0x0B: short (2 bytes)

0x0C: int (4 bytes)

0x0D: float (4 bytes)

0x0E: double (8 bytes)

The fourth byte codes the number of dimensions of the vector/matrix: 1 for vectors, 2 for matrices….

The sizes in each dimension are 4-byte integers (big endian, like in most non-Intel processors).

The data is stored like in a C array, i.e. the index in the last dimension changes the fastest.
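
To make the header layout concrete, here is a minimal Scala sketch (a hypothetical helper, not part of the conversion utilities later in this article) that reads the magic number and dimension sizes; java.io.DataInputStream.readInt reads 4 bytes in big-endian order, which matches the IDX specification:

import java.io.{DataInputStream, FileInputStream}
def readIdxHeader(path: String): (Int, Array[Int]) = {
  val in = new DataInputStream(new FileInputStream(path))
  try {
    val magic = in.readInt()                     // e.g. 0x00000803 for train-images-idx3-ubyte
    val dataType = (magic >> 8) & 0xff           // third byte: 0x08 means unsigned byte
    val numDims = magic & 0xff                   // fourth byte: number of dimensions
    val dims = Array.fill(numDims)(in.readInt()) // one big-endian 4-byte size per dimension
    (dataType, dims)
  } finally in.close()
}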

Convert the 4 MNIST files

These 2 files are for training (1 is feature data, 1 is label data)

train-images-idx3-ubyte

train-labels-idx1-ubyte

These 2 files are for testing (1 is feature data, 1 is label)

t10k-images-idx3-ubyte

t10k-labels-idx1-ubyte

File exploration

To understand the data layout of the feature file, hex dump it and show the first 20 lines:

(spark) bigdata2@bigdata2:~/libsvm$ xxd train-images-idx3-ubyte | head -n 20
00000000: 0000 0803 0000 ea60 0000 001c 0000 001c  .......`........
00000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000040: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000050: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000060: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000070: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000080: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000090: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000a0: 0000 0000 0000 0000 0312 1212 7e88 af1a  ............~...
000000b0: a6ff f77f 0000 0000 0000 0000 0000 0000  ................
000000c0: 1e24 5e9a aafd fdfd fdfd e1ac fdf2 c340  .$^............@
000000d0: 0000 0000 0000 0000 0000 0031 eefd fdfd  ...........1....
000000e0: fdfd fdfd fdfb 5d52 5238 2700 0000 0000  ......]RR8'.....
000000f0: 0000 0000 0000 0012 dbfd fdfd fdfd c6b6  ................
00000100: f7f1 0000 0000 0000 0000 0000 0000 0000  ................
00000110: 0000 0000 509c 6bfd fdcd 0b00 2b9a 0000  ....P.k.....+...
00000120: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000130: 000e 019a fd5a 0000 0000 0000 0000 0000  .....Z..........

Reading the header:

The first 2 bytes are 00 00 (always 0).

The third byte is 08 (0x08: unsigned byte).

The fourth byte is 03 (the number of dimensions of the vector/matrix), so this is a 3-dimensional dataset.

Since this is a 3-dimensional dataset:

The next 4-byte integer is 0000ea60, the size of the 1st dimension: 0x0000ea60 = 60000.

The next 4-byte integer is 0000001c, the size of the 2nd dimension: 0x0000001c = 28.

The next 4-byte integer is 0000001c, the size of the 3rd dimension: 0x0000001c = 28.

Actual feature data follows after that.

This means the training feature data has 60000 rows, and each row is a flattened 28*28 matrix of 784 (feature) columns. It also means the first 16 bytes of the training image file are not actual data but metadata. These 16 bytes will be thrown away when the actual data is extracted from the file in the code later on.
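
As a cross-check, the readIdxHeader sketch from the IDX format section above would decode the same header (illustrative only; the expected values follow from the hex dump):

readIdxHeader("/home/bigdata2/libsvm/train-images-idx3-ubyte")
// expected: (8, Array(60000, 28, 28)) -- unsigned byte data, 60000 images of 28 x 28 pixels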

Now do the same on the label data file:

(spark) bigdata2@bigdata2:~/libsvm$ xxd train-labels-idx1-ubyte | head -n 20
00000000: 0000 0801 0000 ea60 0500 0401 0902 0103  .......`........
00000010: 0104 0305 0306 0107 0208 0609 0400 0901  ................
00000020: 0102 0403 0207 0308 0609 0005 0600 0706  ................
00000030: 0108 0709 0309 0805 0903 0300 0704 0908  ................
00000040: 0009 0401 0404 0600 0405 0601 0000 0107  ................
00000050: 0106 0300 0201 0107 0900 0206 0708 0309  ................
00000060: 0004 0607 0406 0800 0708 0301 0507 0107  ................
00000070: 0101 0603 0002 0903 0101 0004 0902 0000  ................
00000080: 0200 0207 0108 0604 0106 0304 0509 0103  ................
00000090: 0308 0504 0707 0402 0805 0806 0703 0406  ................
000000a0: 0109 0906 0003 0702 0802 0904 0406 0409  ................
000000b0: 0700 0902 0905 0105 0901 0203 0203 0509  ................
000000c0: 0107 0602 0802 0205 0007 0409 0708 0302  ................
000000d0: 0101 0803 0601 0003 0100 0001 0702 0703  ................
000000e0: 0004 0605 0206 0407 0108 0909 0300 0701  ................
000000f0: 0002 0003 0504 0605 0806 0307 0508 0009  ................
00000100: 0100 0301 0202 0303 0604 0705 0006 0207  ................
00000110: 0908 0509 0201 0104 0405 0604 0102 0503  ................
00000120: 0903 0900 0509 0605 0704 0103 0400 0408  ................
00000130: 0004 0306 0807 0600 0907 0507 0201 0106  ................

The first 2 bytes are 00 00 (always 0).

The third byte is 08 (0x08: unsigned byte).

The fourth byte is 01 (the number of dimensions of the vector/matrix), so this is a 1-dimensional dataset.

Since this is a 1-dimensional dataset:

The next 4-byte integer is 0000ea60, the size of the 1st dimension: 0x0000ea60 = 60000.

Actual label data follows after that.

This means the training label data has 60000 rows and 1 column. It also means the first 8 bytes of the training label file are not actual data but metadata. These 8 bytes will be thrown away when the actual data is extracted from the file in the code later on.

The test data files have the same format, so there is no need to analyze them, except to note that the testing data has 10000 rows while the training data has 60000 rows.

Helper Scala utility function to convert MNIST ubyte files to CSV file

The following Scala code reads the image and label data files in ubyte/IDX format and converts them into a single comma-delimited text (CSV) file.

import java.io._
/*
Helper function to convert MNIST ubyte files to CSV: read the image and label ubyte files, then transform and save them as a single CSV file in which the 1st column is the label and the next 784 columns are the features.
*/
def mnistFileConvertUByteToCSV(imageFileName: String, labelFileName: String, recNum: Int, csvFileName: String): Unit = {
  val rows = recNum
  val cols = 28 * 28
  var data = None: Option[FileInputStream]
  var label = None: Option[FileInputStream]
  val features = Array.ofDim[Int](rows, cols)
  val target = Array.ofDim[Int](rows, 1)
  val file = new File(csvFileName)
  val output = new BufferedWriter(new FileWriter(file))
  try {
    data = Some(new FileInputStream(imageFileName))
    label = Some(new FileInputStream(labelFileName))
    var c = 0
    // throw away the 1st 16 bytes of the image file, they are metadata, not data
    for (_ <- 0 until 16)
      data.get.read
    // throw away the 1st 8 bytes of the label file, they are metadata, not data
    for (_ <- 0 until 8)
      label.get.read
    for (i <- 0 until rows) {
      // one label byte per row becomes the first CSV column
      c = label.get.read
      target(i)(0) = c
      output.write(c.toString)
      output.write(",")
      // 784 pixel bytes per row become the remaining CSV columns
      for (j <- 0 until cols) {
        c = data.get.read
        features(i)(j) = c
        output.write(c.toString)
        if (j < cols - 1)
          output.write(",")
        else
          output.write("\n")
      }
      if (i % 1000 == 0)
        println(s"Line number: $i")
    }
  } catch {
    case e: IOException => e.printStackTrace
  } finally {
    println("entered finally ...")
    if (data.isDefined) data.get.close
    if (label.isDefined) label.get.close
    output.close
  }
}
/*
X is path to the mnist feature file (existing)
Y is path to the mnist label file (existing)
Z is path to the csv file after conversion (to be created)
*/
var x="/home/bigdata2/libsvm/train-images-idx3-ubyte"
var y="/home/bigdata2/libsvm/train-labels-idx1-ubyte"
var z="/home/bigdata2/libsvm/mnist_train.csv"
// To convert training MNIST files to a single CSV training file
mnistFileConvertUByteToCSV(x,y,60000,z)
/*
I print some progress output to make sure it is not hanging
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
Line number: 4000
...
Line number: 58000
Line number: 59000
entered finally ...
*/
x="/home/bigdata2/libsvm/t10k-images-idx3-ubyte"
y="/home/bigdata2/libsvm/t10k-labels-idx1-ubyte"
z="/home/bigdata2/libsvm/t10k.csv"
// To convert testing MNIST files to a single CSV testing file
mnistFileConvertUByteToCSV(x,y,10000,z)
/*
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
...
Line number: 9000
entered finally ...
*/

About libsvm format file

It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 …

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.

By the way, the LIBSVM format does not have to use sparse data storage; the data can also be stored densely. Sparse storage omits fields with zero values and stores only the non-zero fields, while dense storage stores every field, including zeros. Consequently, a file with dense storage will be larger than one with sparse storage, but conceivably it takes less work to process a libsvm file with dense storage than with sparse storage.
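
For example, a row with label 5 and five feature values (0, 0, 128, 0, 255) would look like this in the two storage styles:

Dense: 5 1:0 2:0 3:128 4:0 5:255

Sparse: 5 3:128 5:255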

I also wanted to create a utility to convert the CSV file into a libsvm file; for simplicity, it only converts into a libsvm file with dense storage.

For illustrative purposes, the utility converts a CSV line of the form:

label, value1, value2, value3, … valueN

into a libsvm line of the form:

label index1:value1 index2:value2 … indexN:valueN

Scala Helper utility function to convert Spark ML CSV file to libsvm file

Following is the Scala code:

def makeLibsvm(a: Array[String]): String = {
  // a(0) is the label; the remaining elements are the feature values,
  // written as one-based index:value pairs (dense storage)
  var result = a(0) + " "
  for (i <- 1 until a.size)
    result = result + i + ":" + a(i) + " "
  result
}
//convert the training csv to training libsvm files
var csvFile=sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm=csvFile.map(line => line.split(',')).map(i=>makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_train")
//convert the testing csv to testing libsvm files
csvFile=sc.textFile("file:///home/bigdata2/libsvm/t10k.csv")
libsvm=csvFile.map(line => line.split(',')).map(i=>makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_test")

The RDD method saveAsTextFile() is likely to create multiple part files; you will need to come up with a way to automatically merge these parts into one file, or you can do it manually (one option is sketched below). That is the nature of a Spark application that runs on a cluster of multiple worker nodes.

I have merged the resulting part files (outside this writing) into mnist_train.libsvm and mnist_test.libsvm.
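
One way to avoid the manual merge (a sketch, assuming the converted data is small enough to funnel through a single task; the output path and variable name are illustrative) is to coalesce the RDD to a single partition before saving, so saveAsTextFile() writes just one part-00000 file:

// illustrative: re-create the training RDD and save it through a single partition
val trainLibsvm = sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
  .map(line => line.split(','))
  .map(i => makeLibsvm(i))
trainLibsvm.coalesce(1).saveAsTextFile("file:///home/bigdata2/libsvm/mnist_train_single")

The output is still a directory, but it contains a single part-00000 file that can be renamed to mnist_train.libsvm. Alternatively, since the part files are plain text, they can simply be concatenated in order, for example with cat part-* > mnist_train.libsvm.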

Caveats

If you really want the Scala code itself to save the RDD into a single file, you can copy the RDD contents, which are spread across multiple worker nodes, to the driver node where you launched your Spark application, using the following code (not recommended, very slow, do not run it):

import java.io._
csvFile = sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm = csvFile.map(line => line.split(',')).map(i => makeLibsvm(i))
val libsvmFile = new File("/home/bigdata2/libsvm/mnist_train.libsvm")
val svmout = new BufferedWriter(new FileWriter(libsvmFile))
// .collect will take a long time to copy the RDD contents to the driver node
val libsvmArray = libsvm.collect
for (i <- 0 until libsvmArray.size) {
  svmout.write(libsvmArray(i))
  svmout.write("\n")
  if (i % 1000 == 0)
    print(s"line $i")
}
svmout.close

Create a neural network to train and test MNIST image of hand written digit

Now create a neural network to train and test on the MNIST handwritten digit images, which have already been converted to libsvm format from the original ubyte raw format that Spark cannot load directly.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import spark.implicits._
/*
Load the train and test data from mnist_train.libsvm and mnist_test.libsvm respectively
*/
val train = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_train.libsvm")
val test = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_test.libsvm")
/*
It is important to check the number of features (columns), here 784, because it defines the number of neurons (perceptrons) in the input layer.
*/
train.show(3)

/*
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  5.0|(784,[152,153,154...|
|  0.0|(784,[127,128,129...|
|  4.0|(784,[160,161,162...|
+-----+--------------------+
only showing top 3 rows

Define a neural network with 4 layers: the 1st (input) layer has 784 perceptrons, as do the 2nd and 3rd (hidden) layers, and the 4th (output) layer has 10 perceptrons matching the 10 classes 0, 1, 2, … 9.
*/

val layers = Array[Int](784, 784, 784, 10)

// create the trainer and set its parameters

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
/*
Test set accuracy = 0.9193

The test set accuracy is 0.9193, about 92%, meaning that out of every 100 handwritten digit images the model recognizes about 92 correctly and gets about 8 wrong; that is, prediction = label for about 92 of them.


In fact, you can use a SQL query to compute the accuracy metric, because the result variable is a Spark SQL dataframe.


To use Spark SQL, create a temp view from the dataframe variable result
*/

result.toDF.createOrReplaceTempView("deep_learning")

/*
Run testing query
select * from deep_learning where prediction = label and label = 2.0 limit 2
*/

spark.sql("select * from deep_learning where prediction = label and label = 2.0 limit 2").show()

/*
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  2.0|(784,[94,95,96,97...|[-7.3486569212363...|[1.88751449816836...|       2.0|
|  2.0|(784,[124,125,126...|[-4.9435463442057...|[1.24313405935197...|       2.0|
+-----+--------------------+--------------------+--------------------+----------+

The following query computes the accuracy by dividing the count of rows where prediction = label by the total count:
with x as (select count(*) as count_x from deep_learning
where prediction = label),
y as (select count(*) as total from deep_learning)
select count_x/total as accuracy from x,y
*/

spark.sql("with x as (select count(*) as count_x from deep_learning where prediction = label),y as (select count(*) as total from deep_learning) select count_x/total as accuracy from x,y").show()

/*
+--------+
|accuracy|
+--------+
|  0.9193|
+--------+

The following sub-query in the select list produces the same result:
select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning
*/

spark.sql("select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning").show()

/*
+--------+
|accuracy|
+--------+
|  0.9193|
+--------+

*/

Summary

While the Apache Spark Multilayer perceptron classifier is no replacement for TensorFlow (Apache has its own deep learning library, MXNet, which is more comparable to TensorFlow), building a neural network with the Multilayer perceptron classifier makes good sense for specific use cases, especially when the data is already in a Spark distributed computing cluster and can be used in concert with Spark SQL and Spark Streaming.
