Virus Xray Image Classification with Tensorflow Keras Python and Apache Spark Scala


Disclaimer:

This writing is exclusively and entirely for educational purposes in the field of computer science. Only a government medical board-certified radiologist can and should perform diagnosis from an Xray image.

Introduction

Can an average person tell the difference between a picture of a cat and a picture of a dog? Probably yes. Can an average person look at an Xray photo and tell the difference between normal lungs and virus-caused pneumonia? Not unless that person is a board-certified medical professional.

As an educational exercise in computer science and machine learning: can a computer, after being trained on a given dataset of labeled Xray pictures that are empirically true, differentiate between normal lungs and virus-caused pneumonia in an Xray image? That needs to be found out.

Data Preparation

To begin with, I downloaded the Xray image dataset from Kaggle (the Coronahack Chest XRay dataset) and built a neural network with Tensorflow Keras to train the machine.

Generally, a dataset used for image recognition is stored in the following way, because the dataset is not a single file with features and a label, but many image files (such as JPEGs) plus a CSV file giving the label and file name for each image.

For image classification, common practice is to create one folder per label, name the folder after the label, and place all the image files belonging to that label inside it.

Therefore, I placed the files in the directory structure below (a short script for producing this layout follows the listing):

Train:
./
├── normal
└── virus
Validation:
./
├── normal
└── virus
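
One way to produce this folder-per-label layout from the Kaggle download is a short Python script, sketched below. The metadata file name and column names used here (Chest_xray_Corona_Metadata.csv, X_ray_image_name, Label) are assumptions about the Coronahack dataset and may need adjusting to the actual files.

import csv
import os
import shutil

src_dir = "all_images"   # hypothetical folder holding every downloaded jpeg
dst_root = "train"       # target root, producing train/normal and train/virus

# Read the metadata CSV and copy each image into a folder named after its label
with open("Chest_xray_Corona_Metadata.csv") as f:
    for row in csv.DictReader(f):
        label = "normal" if row["Label"] == "Normal" else "virus"
        os.makedirs(os.path.join(dst_root, label), exist_ok=True)
        shutil.copy(os.path.join(src_dir, row["X_ray_image_name"]),
                    os.path.join(dst_root, label, row["X_ray_image_name"]))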

Data Preprocessing

Apache Spark SQL Image Read API: Scala code to explore the image size

First, determine the image sizes with the following Scala code, which invokes the Apache Spark image read API:

val df = spark.read.format("image").option("dropInvalid", true).load("file:///home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/normal")
df.select("image.origin", "image.width", "image.height").show(3)

/* 
+--------------------+-----+------+
|              origin|width|height|
+--------------------+-----+------+
|file:///home/bigd...| 2619|  2628|
|file:///home/bigd...| 2510|  2543|
|file:///home/bigd...| 2633|  2578|
+--------------------+-----+------+
only showing top 3 rows
*/

Scala code to resize the jpeg image

The images are large, around 2500*2500 pixels, about 6 megapixels. Since each pixel is a feature (a column), this is like a table with more than 6 million columns.
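
To make the reduction concrete, here is the pixel arithmetic as a quick Python check (a sketch of the rounded figures used in the text):

# Each pixel is one feature (one column in the table analogy above)
original = 2500 * 2500   # 6,250,000 pixels, about 6 MP
resized = 300 * 350      # 105,000 pixels, about 0.1 MP
print(original // resized)  # roughly 60x fewer features per image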

Therefore, I need to downsize the images. I wrote the following Scala code to resize each image from about 2500*2500 to 300*350, roughly 0.1 megapixel.

import java.awt.image.BufferedImage
import java.io.File
import javax.imageio.ImageIO

import javax.swing.ImageIcon;
import java.awt.Image;
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.RenderingHints;

//Get the Image path of the training image files, both normal and virus

val normal=new java.io.File("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/normal/").listFiles
//val bacteria=new java.io.File("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/bacteria/").listFiles
val virus=new java.io.File("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/virus/").listFiles

/*
Helper function: resize each image file to the desired width and height,
and save the resized image file into the desired path.
*/

def resizeImage(image: Array[java.io.File], base: String, width: Int, height: Int): Unit = {
  for (filePath <- image) {
    // Load image from disk
    val originalImage: BufferedImage = ImageIO.read(new File(filePath.toString))
    // Resize to the target dimensions
    val resized = originalImage.getScaledInstance(width, height, Image.SCALE_DEFAULT)
    // Draw the scaled image onto an RGB buffer and save it back to disk
    val bufferedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB)
    bufferedImage.getGraphics.drawImage(resized, 0, 0, null)
    ImageIO.write(bufferedImage, "JPEG", new File(base + filePath.toString.split("/").last))
  }
}

//resize the train/normal images to 300*350 and save into the target path

resizeImage(normal,"/mnt/common/20200510/train/normal/",300,350)

//resize the train/virus images to 300*350 and save into the target path

resizeImage(virus,"/mnt/common/20200510/train/virus/",300,350)


//Get the Image path of the validation image files, both normal and virus


val normalV=new java.io.File("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/validation/normal/").listFiles

val virusV=new java.io.File("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/validation/virus/").listFiles

//resize the validation/normal images to 300*350 and save into the target path

resizeImage(normalV,"/mnt/common/20200510/validation/normal/",300,350)

//resize the validation/virus images to 300*350 and save into the target path

resizeImage(virusV,"/mnt/common/20200510/validation/virus/",300,350)
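
As an aside, the same resize step can be done in Python; below is a minimal sketch using the Pillow library (an assumption: Pillow is installed, e.g. pip install Pillow; the paths mirror the Scala code above).

import os
from PIL import Image

# Resize every jpeg in src_dir to width x height and save into dst_dir
def resize_images(src_dir, dst_dir, width=300, height=350):
    os.makedirs(dst_dir, exist_ok=True)
    for name in os.listdir(src_dir):
        img = Image.open(os.path.join(src_dir, name)).convert("RGB")
        img.resize((width, height)).save(os.path.join(dst_dir, name), "JPEG")

resize_images("/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/normal/",
              "/mnt/common/20200510/train/normal/")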

After resizing the images to 300*350, the new image files are located under /mnt/common/20200510:

./
├── train
│   ├── normal
│   └── virus
└── validation
    ├── normal
    └── virus
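
Before training, it is worth confirming how many files landed in each class folder. A quick sketch (Keras reports 2016 training and 670 validation images later, so the totals here should match):

import os

# Count image files per split/label folder after resizing
for split in ("train", "validation"):
    for label in ("normal", "virus"):
        path = os.path.join("/mnt/common/20200510", split, label)
        print(split, label, len(os.listdir(path)))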

Algorithm Selection

Image classification is typically done with a convolutional neural network. I use Tensorflow/Keras, so now I need to switch languages from Scala to Python to invoke the Keras APIs.

Original Xray image

Here is an example image before resizing:

from IPython.display import Image
Image(filename='/home/bigdata2/dataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/validation/normal/NORMAL2-IM-1423-0001.jpeg')

Resized Xray Image

Here is an example of a resized image labeled as normal:

Image(filename='/mnt/common/20200510/validation/normal/NORMAL2-IM-1423-0001.jpeg')

Here is an example of a resized image labeled as virus-caused pneumonia:

Image(filename='/mnt/common/20200510/validation/virus/person1609_virus_2791.jpeg')

Xray CNN image classification by Keras

Following is the code that trains the machine to classify Xray images as normal or virus-caused pneumonia, using a convolutional neural network with Keras and Tensorflow as the backend.

import keras
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

#Create a convolutional neural network model

model = Sequential()
model.add(Conv2D(32, (2, 2), input_shape=(300, 350,3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(32, (2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(64, (2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())  # converts 3D feature maps to 1D feature vectors
model.add(Dense(64))
model.add(Activation('relu'))
model.add(Dropout(0.5))
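# one sigmoid unit for binary classification; flow_from_directory assigns
# labels alphabetically, so normal=0 and virus=1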
model.add(Dense(1))
model.add(Activation('sigmoid'))

# Once the model is created, configure it with a loss and metrics using model.compile()

model.compile(loss='binary_crossentropy',
              optimizer='rmsprop',
              metrics=['accuracy'])

batch_size = 16

# augmentation configuration for training
train_datagen = ImageDataGenerator(
        rescale=1./255,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True)

train_generator = train_datagen.flow_from_directory(
        '/mnt/common/20200510/train/',  # this is the target directory
        target_size=(300, 350),  # all images will be resized to 300x350
        batch_size=batch_size,
        class_mode='binary')  # since we use binary_crossentropy loss, we need binary labels

# Found 2016 images belonging to 2 classes.

# generator for validation data: only rescaling, no augmentation
test_datagen = ImageDataGenerator(rescale=1./255)

validation_generator = test_datagen.flow_from_directory(
        '/mnt/common/20200510/validation/',
        target_size=(300, 350),
        batch_size=batch_size,
        class_mode='binary')

# Found 670 images belonging to 2 classes.

# Fits the model on data yielded batch-by-batch by a Python generator
model.fit_generator(
        train_generator,
        steps_per_epoch=2000 // batch_size,
        epochs=50,
        validation_data=validation_generator,
        validation_steps=800 // batch_size)

Output below:


Epoch 1/50
125/125 [==============================] - 37s 293ms/step - loss: 0.7213 - accuracy: 0.6635 - val_loss: 0.4492 - val_accuracy: 0.8208
Epoch 2/50
125/125 [==============================] - 36s 292ms/step - loss: 0.4976 - accuracy: 0.7735 - val_loss: 0.1604 - val_accuracy: 0.8659
...
Epoch 49/50
125/125 [==============================] - 35s 282ms/step - loss: 0.2251 - accuracy: 0.9320 - val_loss: 0.4890 - val_accuracy: 0.8885
Epoch 50/50
125/125 [==============================] - 36s 284ms/step - loss: 0.2067 - accuracy: 0.9340 - val_loss: 0.5270 - val_accuracy: 0.9261
<keras.callbacks.callbacks.History at 0x7f90917a6ef0>

Save model

# Always save the model weights after training

model.save_weights('/mnt/common/20200510/xray.h5')
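
Because only the weights are saved, inference later requires rebuilding the same architecture first. Below is a minimal scoring sketch under that assumption: model is the same Sequential definition as above, and the file path is one of the validation images shown earlier.

import numpy as np
from keras.preprocessing.image import load_img, img_to_array

# Rebuild the same Sequential model as above, then load the saved weights
model.load_weights('/mnt/common/20200510/xray.h5')

# Load one Xray and apply the same 1/255 rescaling the generators used
img = load_img('/mnt/common/20200510/validation/virus/person1609_virus_2791.jpeg',
               target_size=(300, 350))
x = np.expand_dims(img_to_array(img) / 255.0, axis=0)  # shape (1, 300, 350, 3)

prob = model.predict(x)[0][0]  # probability of class 1 ("virus")
print('virus' if prob > 0.5 else 'normal', prob)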

Hardware Used:

By the way, the machine that ran this exercise is equipped with an Intel 8700 (8th gen) CPU with 6 cores/12 threads, 64GB of RAM, and an Nvidia GTX 1060 GPU with 6GB of GPU memory. Both Tensorflow and Keras are the GPU-enabled versions.

Summary

With not many lines of Python code and a few minutes of processing time, deep learning with a CNN (Convolutional Neural Network) using Tensorflow/Keras yields a training/validation accuracy of about 93%. In other words, out of 100 Xray images, the machine tells normal from virus-caused pneumonia correctly on about 93 images and incorrectly on 7.

Disclaimer again

This writing is exclusively and entirely for educational purposes in the field of computer science. Only a government medical board-certified radiologist can and should perform diagnosis from an Xray image.

Dataset: https://www.kaggle.com/praveengovi/coronahack-chest-xraydataset