Neural Network with Apache Spark Machine Learning Multilayer Perceptron Classifier
Biological Neuron vs Digital Perceptron:
Neuron
The perceptron is a mathematical replica of a biological neuron. In an actual neuron, the dendrites receive electrical signals from the axons of other neurons.
The perceptron models this by multiplying each input value by a coefficient called a weight and, sometimes, adding another value called a bias. An actual neuron fires an output signal only when the total strength of the input signals exceeds a certain threshold.
Perceptron
Imitating this, a perceptron calculates the weighted sum of its inputs to represent the total strength of the input signals, and then applies a step function (also called an activation function) to the sum to determine its output.
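The weighted sum followed by a step function can be sketched in a few lines of Scala. This is a minimal illustration, not Spark code: the names `step` and `perceptron`, the threshold of 0, and the AND-gate weights are all assumptions chosen for the example.

```scala
// Hypothetical helper: fires (returns 1) only when the signal exceeds the threshold.
def step(z: Double, threshold: Double = 0.0): Int = if (z > threshold) 1 else 0

// Hypothetical helper: weighted sum of the inputs plus a bias, passed through the step function.
def perceptron(inputs: Array[Double], weights: Array[Double], bias: Double): Int = {
  require(inputs.length == weights.length, "one weight per input is required")
  val weightedSum = inputs.zip(weights).map { case (x, w) => x * w }.sum + bias
  step(weightedSum)
}

// Assumed example: weights and bias that make the perceptron behave like a logical AND.
val andWeights = Array(1.0, 1.0)
val andBias = -1.5
val bothOn = perceptron(Array(1.0, 1.0), andWeights, andBias) // 1.0 + 1.0 - 1.5 = 0.5 > 0, fires
val oneOn = perceptron(Array(1.0, 0.0), andWeights, andBias)  // 1.0 - 1.5 = -0.5, does not fire
println(s"bothOn = $bothOn, oneOn = $oneOn")
```

The bias plays the role of the firing threshold: shifting it changes how much total input is needed before the output switches from 0 to 1.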
Multilayer perceptron classifier from Apache Spark Machine Learning
Note:
Each circle in the figure above represents a perceptron.
Each layer is fully connected, meaning every neuron (perceptron) in one layer is connected to all neurons in the next layer.
Feedforward artificial neural network means signals only move forward; there is no loop back.
Math representation of the neural network
Neurons in the input layer represent the input data. All other neurons map inputs to outputs by a linear combination of the inputs with the neuron’s weights w and bias b and applying an activation function or step function. This can be written in matrix form for MLPC with K+1 layers as follows:
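In the notation of the Spark ML documentation, with x the input vector, w_k and b_k the weights and bias of layer k, and f_k the activation function of layer k, the matrix form reads:

```latex
y(\mathbf{x}) = f_K\left( \ldots f_2\left( \mathbf{w}_2^{T} f_1\left( \mathbf{w}_1^{T}\mathbf{x} + b_1 \right) + b_2 \right) \ldots + b_K \right)
```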
Activation/Step function:
Neurons in intermediate layers (hidden layers) use sigmoid (logistic) function:
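With z_i denoting the weighted input (linear combination plus bias) of neuron i, the sigmoid function as given in the Spark ML documentation is:

```latex
f(z_i) = \frac{1}{1 + e^{-z_i}}
```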
Neurons in the output layer use SoftMax function:
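With z_i again the weighted input of output neuron i and N the number of neurons in the output layer, the softmax function as given in the Spark ML documentation is:

```latex
f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^{N} e^{z_k}}
```

The N outputs are positive and sum to 1, so they can be read as class probabilities.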
Notes on the number of neurons/perceptrons
The number of neurons N in the output layer corresponds to the number of classes to be classified. The number of neurons in the first (input) layer needs to be equal to the number of features (columns).
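Layer sizes also determine how many parameters the network has to learn. As a rough sketch, assuming every layer is fully connected and each neuron carries one bias, a layer with n inputs and m outputs contributes (n + 1) * m parameters. The helper name `parameterCount` is hypothetical and not part of Spark.

```scala
// Hypothetical helper: counts weights and biases of a fully connected feedforward network.
// Each pair of adjacent layers (in, out) contributes in * out weights plus out biases.
def parameterCount(layers: Array[Int]): Long =
  layers.sliding(2).map { case Array(in, out) => (in.toLong + 1) * out }.sum

// Toy network: 4 features, one hidden layer of 5 neurons, 3 classes.
val toy = parameterCount(Array(4, 5, 3))               // (4 + 1) * 5 + (5 + 1) * 3 = 43

// 784 features, two hidden layers of 784 neurons, 10 classes.
val mnistSized = parameterCount(Array(784, 784, 784, 10)) // 615440 + 615440 + 7850 = 1238730
println(s"toy = $toy, mnistSized = $mnistSized")
```

Hidden layer sizes are a free design choice, while the first and last entries are fixed by the data.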
Build a neural network with Apache Spark Multilayer perceptron classifier (MLPC)
As part of the effort, I need a dataset to train and test the MLPC. The obvious choice is MNIST, a dataset of images of handwritten digits to classify, and I am going to code in Scala.
Fourth byte is 03 (the fourth byte codes the number of dimensions of the vector/matrix: 1 for vectors, 2 for matrices, …), so this dataset has 3 dimensions.
Since this is a 3-dimensional dataset:
Next 4-byte integer is 0000ea60, which is the size of 1st dimension. 0x 0000ea60 = 60000
Next 4-byte integer is 0000001c, which is the size of 2nd dimension. 0x 0000001c = 28
Next 4-byte integer is 0000001c, which is the size of 3rd dimension. 0x 0000001c = 28
Actual feature data follows after that.
This means the training feature data is 60000 rows, each row a flattened matrix of 28*28 = 784 (feature) columns. This also means the first 16 bytes of the training image data are not actual data, but metadata. These 16 bytes will be thrown away when extracting the actual data from the file in the code later on.
Fourth byte is 01 (the fourth byte codes the number of dimensions of the vector/matrix: 1 for vectors, 2 for matrices, …), so this dataset has 1 dimension.
Since this is a 1-dimensional dataset:
Next 4-byte integer is 0000ea60, which is the size of 1st dimension. 0x 0000ea60 = 60000
Actual label data follows after that.
This means the training label data is 60000 rows and 1 column. This also means the first 8 bytes of the training label data are not actual data, but metadata. These 8 bytes will be thrown away when extracting the actual data from the file in the code later on.
Test data files use the same format, so there is no need to analyze them again, except that the testing data has 10000 rows while the training data has 60000 rows.
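The header analysis above can also be done in code instead of by reading a hex dump. The following is a minimal sketch, assuming the IDX layout described above (two zero bytes, a data type byte, a dimension count byte, then one big-endian 4-byte integer per dimension). The helper name `readIdxHeader` is hypothetical, and the sample bytes are the training image header quoted above rather than a real file.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, InputStream}

// Hypothetical helper: parses an IDX header and returns the size of each dimension.
def readIdxHeader(in: InputStream): Array[Int] = {
  val din = new DataInputStream(in)
  din.readUnsignedByte()                    // first byte, always 0
  din.readUnsignedByte()                    // second byte, always 0
  val dataType = din.readUnsignedByte()     // third byte: 0x08 means unsigned byte data
  val dimensions = din.readUnsignedByte()   // fourth byte: number of dimensions
  require(dataType == 0x08, s"expected unsigned byte data, got type $dataType")
  // DataInputStream.readInt reads 4 bytes in big-endian order, as IDX requires
  Array.fill(dimensions)(din.readInt())
}

// Header bytes of the training image file as analyzed above: 3 dimensions, 60000 x 28 x 28.
val imageHeader = Array[Byte](0, 0, 8, 3, 0, 0, 0xea.toByte, 0x60, 0, 0, 0, 0x1c, 0, 0, 0, 0x1c)
val imageDims = readIdxHeader(new ByteArrayInputStream(imageHeader))
println(imageDims.mkString(" x ")) // 60000 x 28 x 28
```

For a real file, the same function would be given a `FileInputStream`. The header length is 4 + 4 * dimensions bytes, which gives the 16 and 8 bytes skipped for the image and label files.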
Helper Scala utility function to convert MNIST ubyte files to a CSV file
The following Scala code reads the image and label data files in ubyte/IDX format and converts them into a comma-delimited text file (CSV).
import java.io._
/*
Define a helper function to convert MNIST ubyte files to CSV. It reads the ubyte files, transforms them and saves them as a single CSV file: the first column is the label, the next 784 columns are features
*/
def mnistFileConvertUByteToCSV(imageFileName: String, labelFileName: String, recNum: Int, csvFileName: String): Unit = {
  val rows = recNum
  val cols = 28 * 28
  var data: Option[InputStream] = None
  var label: Option[InputStream] = None
  var output: Option[BufferedWriter] = None
  try {
    // buffered streams avoid one disk access per byte read
    data = Some(new BufferedInputStream(new FileInputStream(imageFileName)))
    label = Some(new BufferedInputStream(new FileInputStream(labelFileName)))
    output = Some(new BufferedWriter(new FileWriter(new File(csvFileName))))
    // throw away 1st 16 bytes as they are not data for feature file
    for (_ <- 0 until 16) data.get.read()
    // throw away 1st 8 bytes as they are not data for label file
    for (_ <- 0 until 8) label.get.read()
    for (i <- 0 until rows) {
      // first CSV column: the label (0 to 9)
      output.get.write(label.get.read().toString)
      output.get.write(",")
      // next 784 CSV columns: the pixel values (0 to 255) of one image
      for (j <- 0 until cols) {
        output.get.write(data.get.read().toString)
        if (j < cols - 1) output.get.write(",")
        else output.get.write("\n")
      }
      if (i % 1000 == 0) println(s"Line number: $i")
    }
  } catch {
    case e: IOException => e.printStackTrace()
  } finally {
    println("entered finally ...")
    // close whichever streams were actually opened
    data.foreach(_.close())
    label.foreach(_.close())
    output.foreach(_.close())
  }
}
/*
X is path to the mnist feature file (existing)
Y is path to the mnist label file (existing)
Z is path to the csv file after conversion (to be created)
*/
var x="/home/bigdata2/libsvm/train-images-idx3-ubyte"
var y="/home/bigdata2/libsvm/train-labels-idx1-ubyte"
var z="/home/bigdata2/libsvm/mnist_train.csv"
// To convert training MNIST files to a single CSV training file
mnistFileConvertUByteToCSV(x,y,60000,z)
/*
I print progress to make sure it is not hanging
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
Line number: 4000
...
Line number: 58000
Line number: 59000
entered finally ...
*/
x="/home/bigdata2/libsvm/t10k-images-idx3-ubyte"
y="/home/bigdata2/libsvm/t10k-labels-idx1-ubyte"
z="/home/bigdata2/libsvm/t10k.csv"
// To convert testing MNIST files to a single CSV testing file
mnistFileConvertUByteToCSV(x,y,10000,z)
/*
Line number: 0
Line number: 1000
Line number: 2000
Line number: 3000
...
Line number: 9000
entered finally ...
*/
About libsvm format file
It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:
label index1:value1 index2:value2 …
where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.
By the way, the LIBSVM format does not require sparse data storage; storage can also be dense. Sparse storage means fields with zero value are not stored, only fields with non-zero values. Dense storage means all fields are stored, including zero values. Consequently, a file with dense storage is larger than one with sparse storage, but conceivably it takes less work to process a libsvm file with dense storage than one with sparse storage.
I also wanted a utility to convert a CSV file into a libsvm file. For simplicity, it only converts into a libsvm file with dense storage.
For illustration, it converts the following CSV line:
Label, value1, value2, value3, … valueN
Into libsvm file:
Label index1:value1 index2:value2 … indexN:valueN
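To make the difference between dense and sparse storage concrete, here is a small sketch that converts one CSV line both ways. The function names `toDenseLibsvm` and `toSparseLibsvm` and the sample line are assumptions made for this illustration; the sparse variant is not the one used in the rest of this article.

```scala
// Hypothetical helper: dense storage keeps every field, zeros included.
// fields(0) is the label; the remaining fields get one-based indices.
def toDenseLibsvm(fields: Array[String]): String =
  (fields.head +: fields.tail.zipWithIndex.map { case (v, i) => s"${i + 1}:$v" }).mkString(" ")

// Hypothetical helper: sparse storage drops fields whose value is zero but keeps the original indices.
def toSparseLibsvm(fields: Array[String]): String =
  (fields.head +: fields.tail.zipWithIndex.collect {
    case (v, i) if v.trim.toDouble != 0.0 => s"${i + 1}:$v"
  }).mkString(" ")

// Assumed sample: label 5 followed by four pixel values.
val sample = "5,0,12,0,255".split(',')
val dense = toDenseLibsvm(sample)   // "5 1:0 2:12 3:0 4:255"
val sparse = toSparseLibsvm(sample) // "5 2:12 4:255"
println(dense)
println(sparse)
```

Since most MNIST pixels are 0, the sparse form would produce a much smaller file, at the cost of parsing each value during conversion.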
Scala Helper utility function to convert Spark ML CSV file to libsvm file
Following is the Scala code:
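def makeLibsvm(a: Array[String]): String = {
  // a(0) is the label; a(1) .. a(n) are the feature values, written with one-based indices.
  // The whole field a(i) is written, not just its first character.
  val features = (1 until a.length).map(i => s"$i:${a(i)}")
  (a(0) +: features).mkString(" ")
}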
//convert the training csv to training libsvm files
var csvFile=sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm=csvFile.map(line => line.split(',')).map(i=>makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_train")
//convert the testing csv to testing libsvm files
csvFile=sc.textFile("file:///home/bigdata2/libsvm/t10k.csv")
libsvm=csvFile.map(line => line.split(',')).map(i=>makeLibsvm(i))
libsvm.saveAsTextFile("file:///home/bigdata2/libsvm/mnist_test")
The RDD method saveAsTextFile() is likely to create multiple part files. You will need a way to merge these parts into one file automatically, or you can do it manually. That is the nature of a Spark application that runs on a cluster of multiple worker nodes.
I merged the resulting part files (outside this write-up) into mnist_train.libsvm and mnist_test.libsvm.
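One way to automate the merge on a single machine is to concatenate the part files in name order. The sketch below is a plain Scala illustration, assuming the output directory is on the local file system (as with the file:// paths used here); the helper name `mergePartFiles` is hypothetical.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical helper: concatenates all files named part-* in partsDir, in name order, into mergedFile.
// Checksum files (.part-*.crc) and the _SUCCESS marker are skipped because they do not start with "part-".
def mergePartFiles(partsDir: String, mergedFile: String): Unit = {
  val parts = Option(new File(partsDir).listFiles())
    .getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith("part-"))
    .sortBy(_.getName)
  val out = Files.newOutputStream(new File(mergedFile).toPath)
  try parts.foreach(p => Files.copy(p.toPath, out)) // append the bytes of each part
  finally out.close()
}

// Usage with the paths from this article (not executed here):
// mergePartFiles("/home/bigdata2/libsvm/mnist_train", "/home/bigdata2/libsvm/mnist_train.libsvm")
```

Because the zero-padded part names sort lexicographically, the merged file keeps the line order of the original RDD partitions.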
Caveats
If you really want the Scala code to save the RDD into a single file, you have to copy the RDD, which is spread across multiple worker nodes, onto the driver node from which you launched your Spark application, using the following code (not recommended, very slow, do not run it):
import java.io._
csvFile=sc.textFile("file:///home/bigdata2/libsvm/mnist_train.csv")
var libsvm=csvFile.map(line => line.split(',')).map(i=>makeLibsvm(i))
val libsvmFile = new File("/home/bigdata2/libsvm/mnist_train.libsvm")
val svmout = new BufferedWriter(new FileWriter(libsvmFile))
//.collect will take a long time to copy RDD contents to driver node
val libsvmArray=libsvm.collect
for (i<-0 until libsvmArray.size)
{
svmout.write(libsvmArray(i))
svmout.write("\n")
if (i%1000==0)
print(s"line $i")
}
svmout.close
Create a neural network to train and test on MNIST images of handwritten digits
The network trains and tests on the MNIST images of handwritten digits, which have already been converted to libsvm file format from the original ubyte raw format, for which Spark has no built-in reader.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import spark.implicits._
/*
Load the train and test data from mnist_train.libsvm and mnist_test.libsvm respectively
*/
val train = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_train.libsvm")
val test = spark.read.format("libsvm")
.load("file:///home/bigdata2/libsvm/mnist_test.libsvm")
/*
It is important to check the number of features (columns), here 784, because it defines the number of neurons (perceptrons) in the input layer.
*/
train.show(3)
/*
+-----+--------------------+
|label| features|
+-----+--------------------+
| 5.0|(784,[152,153,154...|
| 0.0|(784,[127,128,129...|
| 4.0|(784,[160,161,162...|
+-----+--------------------+
only showing top 3 rows
Define the neural network that has 4 layers: the 1st layer (input) has 784 perceptrons, as do the 2nd and 3rd (hidden) layers; the 4th layer (output) has 10 perceptrons matching the 10 classes 0, 1, 2, … 9
*/
val layers = Array[Int](784, 784, 784, 10)
// create the trainer and set its parameters
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
/*
Test set accuracy = 0.9193
Test set accuracy is 0.9193, about 92%: out of every 100 handwritten digit images, the model (machine) recognizes about 92 correctly (prediction = label) and gets about 8 wrong
In fact, you can use a SQL query to compute the accuracy metric, because the result variable is a Spark SQL dataframe
To use Spark SQL, create a temp view from the dataframe variable result
*/
result.toDF.createOrReplaceTempView("deep_learning")
/*
Run testing query
select * from deep_learning where prediction = label and label = 2.0 limit 2
*/
spark.sql("select * from deep_learning where prediction = label and label = 2.0 limit 2").show()
/*
+-----+--------------------+--------------------+--------------------+----------+
|label| features| rawPrediction| probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
| 2.0|(784,[94,95,96,97...|[-7.3486569212363...|[1.88751449816836...| 2.0|
| 2.0|(784,[124,125,126...|[-4.9435463442057...|[1.24313405935197...| 2.0|
+-----+--------------------+--------------------+--------------------+----------+
The following query shows the accuracy by dividing the count of rows where prediction = label by the total count:
with x as (select count(*) as count_x from deep_learning
where prediction = label),
y as (select count(*) as total from deep_learning)
select count_x/total as accuracy from x,y
*/
spark.sql("with x as (select count(*) as count_x from deep_learning where prediction = label),y as (select count(*) as total from deep_learning) select count_x/total as accuracy from x,y").show()
/*
+--------+
|accuracy|
+--------+
| 0.9193|
+--------+
Following sub-query in the select list produces the same result:
select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning
*/
spark.sql("select (select count(*) from deep_learning where label=prediction)/count(*) as accuracy from deep_learning").show()
/*
+--------+
|accuracy|
+--------+
| 0.9193|
+--------+
*/
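The arithmetic behind these queries is simply the number of rows with prediction = label divided by the total number of rows, and the same division can be applied per label to see which digits are harder to recognize. The sketch below mirrors that arithmetic in plain Scala on a handful of made-up (prediction, label) pairs; the data and the names `accuracy` and `perClass` are assumptions for illustration and are not taken from the model above.

```scala
// Hypothetical toy data: (prediction, label) pairs, not real model output.
val pairs = Seq((2.0, 2.0), (3.0, 2.0), (7.0, 7.0), (7.0, 7.0))

// Same division as the SQL above: count(prediction = label) / count(*).
def accuracy(ps: Seq[(Double, Double)]): Double =
  ps.count { case (prediction, label) => prediction == label }.toDouble / ps.size

val overall = accuracy(pairs)           // 3 correct out of 4 = 0.75

// Group by label first to get one accuracy figure per class.
val perClass: Map[Double, Double] =
  pairs.groupBy(_._2).map { case (label, ps) => label -> accuracy(ps) }

println(s"overall = $overall, label 2.0 = ${perClass(2.0)}, label 7.0 = ${perClass(7.0)}")
```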
Summary
The Apache Spark multilayer perceptron classifier is no replacement for TensorFlow; in fact, Apache has its own deep learning library, MXNet, that is more comparable to TensorFlow. Still, building a neural network with the multilayer perceptron classifier makes good sense for specific use cases, especially when the data already lives on a Spark distributed computing cluster and the model works in concert with Spark SQL and Spark Streaming.
Multilayer perceptron classifier (MLPC) from Apache Spark ML is a classifier based on the feedforward artificial neural network. MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network.