Dremio Data Lake Engine Apache Arrow Flight Connector with Spark Machine Learning


The following components are covered in this writing:

Apache Arrow

Apache Arrow Flight

Dremio server

Dremio Flight Connector

Apache Spark Machine Learning.

Let's go through each component:

Apache Arrow:

Apache Arrow is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. It also provides computational libraries and zero-copy streaming messaging and interprocess communication. Languages currently supported include C, C++, C#, Go, Java, JavaScript, MATLAB, Python, R, Ruby, and Rust.

Apache Arrow is a columnar in-memory data structure that allows applications to avoid unnecessary I/O and accelerates analytical processing performance on modern CPUs and GPUs.

Conventionally, this is how to share data across different systems:

With obvious disadvantages:

Each system has its own internal memory format

70-80% of computation wasted on serialization and deserialization

Similar functionality implemented in multiple projects

With Apache Arrow as the common data structure:

With obvious advantages when it comes to data transport among different systems that use the Arrow in-memory data structure:

All systems utilize the same memory format

No overhead for cross-system communication

Projects can share functionality (e.g., a Parquet-to-Arrow reader)
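
To make the shared-format idea concrete, here is a small sketch of my own (not from the original writing) using the PyArrow Python library; the column names and values are arbitrary examples:

import pyarrow as pa

#Build an Arrow table in the standard columnar memory format
table = pa.table({"label": ["ham", "spam"], "n": [1, 2]})

#Any Arrow-aware system can consume these same columnar buffers;
#conversion to a pandas dataframe needs no custom serialization code
pdf = table.to_pandas()
table_again = pa.Table.from_pandas(pdf)
print(table_again.schema)

The later sections of this writing rely on exactly this Table-to-pandas conversion when reading query results from Dremio.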

Apache Arrow Flight

A framework for fast data transport: Flight is initially focused on optimized transport of the Arrow columnar format (i.e., "Arrow record batches") over gRPC, Google's popular HTTP/2-based general-purpose RPC library and framework. While the focus has been on integration with gRPC, Flight as a development framework is not intended to be exclusive to gRPC.

One of the biggest features that sets apart Flight from other data transport frameworks is parallel transfers, allowing data to be streamed to or from a cluster of servers simultaneously. This enables developers to more easily create scalable data services that can serve a growing client base.
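
As an illustration of such parallel transfers, here is a hedged sketch of my own (not from the original writing) of how a client could consume a multi-endpoint "access plan" with the pyarrow.flight API; the client, descriptor, and cluster locations are assumed to already exist:

import pyarrow as pa
import pyarrow.flight as flight

#Assumes `client` is an authenticated flight.FlightClient and
#`descriptor` is a flight.FlightDescriptor for the dataset of interest
info = client.get_flight_info(descriptor)
tables = []
for endpoint in info.endpoints:
    #Each endpoint may point at a different server in the cluster and
    #could be read concurrently; reads are sequential here for brevity
    ep_client = flight.connect(endpoint.locations[0]) if endpoint.locations else client
    reader = ep_client.do_get(endpoint.ticket)
    tables.append(reader.read_all())
result = pa.concat_tables(tables)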

A simple Flight setup might consist of a single server to which clients connect and make DoGet requests.

Flight Basics

The Arrow Flight libraries provide a development framework for implementing a service that can send and receive data streams. A Flight server supports several basic kinds of requests:

Handshake: a simple request to determine whether the client is authorized and, in some cases, to establish an implementation-defined session token to use for future requests
ListFlights: return a list of available data streams 
GetSchema: return the schema for a data stream 
GetFlightInfo: return an "access plan" for a dataset of interest, possibly requiring consuming multiple data streams. This request can accept custom serialized commands containing, for example, your specific application parameters.
DoGet: send a data stream to a client 
DoPut: receive a data stream from a client 
DoAction: perform an implementation-specific action and return any results, i.e. a generalized function call 
ListActions: return a list of available action types

Flight also takes advantage of gRPC's elegant "bidirectional" streaming support (built on top of HTTP/2 streaming) to allow clients and servers to send data and metadata to each other simultaneously while requests are being served.
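
To show how these pieces fit together, below is a minimal sketch of my own (not from the original writing) of a Flight service built with pyarrow.flight; the class name, port 8815 and the served table are made-up examples:

import pyarrow as pa
import pyarrow.flight as flight

class TinyFlightServer(flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        #A single in-memory Arrow table returned for every DoGet request
        self._table = pa.table({"label": ["ham", "spam"],
                                "message": ["hello", "win a prize now"]})
    def do_get(self, context, ticket):
        #DoGet: stream the data to the client as Arrow record batches
        return flight.RecordBatchStream(self._table)

TinyFlightServer().serve()   #blocks and serves requests

#In a separate process, a client issues a DoGet request:
#client = flight.connect("grpc+tcp://localhost:8815")
#print(client.do_get(flight.Ticket(b"anything")).read_all().to_pandas())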

Dremio

Dremio is the data lake engine

www.dremio.com

Dremio's query engine is built on Apache Arrow, an in-memory columnar data structure. Its SQL engine lets you use SQL to query structured data, such as relational database tables, as well as semi-structured data, such as key-value entities like JSON. It is a distributed, in-memory columnar query engine that can run on one node or many nodes.

Dremio Flight Connector

Dremio Flight Connector is an implementation of the Apache Arrow Flight framework that allows a client, such as a Java program or Python script, to request data from a Dremio server using the Apache Arrow Flight protocol, which transports data in the Apache Arrow format.

This means you can get data from Dremio, or any system that uses Apache Arrow as its in-memory data format, without ODBC or JDBC and without the serialization and deserialization overhead that comes with both.
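
For contrast only, a conventional ODBC path would look roughly like the sketch below (my own illustration, not the author's code); the DSN name, user and password are hypothetical, and every row goes through the driver's serialize/deserialize cycle:

import pyodbc
import pandas as pd

#Hypothetical ODBC data source name pointing at a Dremio server
conn = pyodbc.connect("DSN=DremioODBC;UID=george;PWD=<redacted>", autocommit=True)
pdf = pd.read_sql('select * from "fraud.dremio".SMSSpamCollection', conn)

The Arrow Flight client shown later in this writing returns the same rows as Arrow record batches instead, avoiding that per-row conversion.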

Below is an integration demonstration of the Dremio Data Lake Engine server, the Dremio Flight Connector, and Apache Spark machine learning spam detection.

Setup:

In my last writing, I built the open source version of the Dremio server. I am going to leverage that already-built Dremio server for this writing.

The first task is to build the Dremio Flight Connector, which allows getting data from Dremio via Apache Arrow Flight, the Arrow in-memory data transport protocol.

(1). To start, git clone the Dremio Flight Connector from GitHub: https://github.com/dremio-hub/dremio-flight-connector

(2). Then follow the build instructions in the README. Make sure your mvn version is up to date and simply run: mvn clean install, or, if you do not have a current or recent version of mvn: mvnw clean install

(3). Once the build completes, get the jar file from the target folder inside the folder created by git, such as:

dremio-flight-connector-0.11.0-SNAPSHOT-shaded.jar

and copy it to <dremio home dir>/jars/

(4). Then modify <dremio home dir>/conf/dremio-env:

DREMIO_JAVA_SERVER_EXTRA_OPTS='-Ddremio.flight.enabled=true -Ddremio.flight.parallel.enabled=true -Ddremio.flight.use-ssl=true -Ddremio.flight.port=47470 -Ddremio.flight.host=0.0.0.0'

(5). Restart dremio, assuming you are in <dremio home dir>:

bin/dremio --config ./conf start

(6). To make sure Dremio flight connector is up and running:

(i) Check the java process to make sure "-Ddremio.flight.enabled=true" is in the dremio command line:

ps -ef | grep dremio | grep java | grep -v grep 

dremio 5588 1 2 11:46 pts/1 00:05:12 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre/bin/java -Djava.util.logging.config.class=org.slf4j.bridge.SLF4JBridgeHandler -Djava.library.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8-202003120636020140-9c2a6b13/dremio-community-4.1.8-202003120636020140-9c2a6b13/lib -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8-202003120636020140-9c2a6b13/dremio-community-4.1.8-202003120636020140-9c2a6b13/log/server.gc -Ddremio.log.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8-202003120636020140-9c2a6b13/dremio-community-4.1.8-202003120636020140-9c2a6b13/log -Ddremio.plugins.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8-202003120636020140-9c2a6b13/dremio-community-4.1.8-202003120636020140-9c2a6b13/plugins -Xmx4096m -XX:MaxDirectMemorySize=8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8-202003120636020140-9c2a6b13/dremio-community-4.1.8-202003120636020140-9c2a6b13/log -Dio.netty.maxDirectMemory=0 -DMAPR_IMPALA_RA_THROTTLE -DMAPR_MAX_RA_STREAMS=400 -Ddremio.flight.enabled=true -Ddremio.flight.parallel.enabled=true -Ddremio.flight.use-ssl=true -Ddremio.flight.port=47470 -Ddremio.flight.host=0.0.0.0 ... com.dremio.dac.daemon.DremioDaemon

(ii) Check the Dremio server.log file, <dremio home dir>/log/server.log, and look for the line below:

2020-03-21 17:16:39,835 [main] INFO com.dremio.flight.FlightInitializer - set up flight plugin on port 47470 and host 0.0.0.0

(iii) Lastly, to make sure Dremio is listening on port 47470 (the Flight connector port), check with telnet:

telnet 10.0.2.15 47470 

Trying 10.0.2.15… 
Connected to 10.0.2.15. 
Escape character is '^]'.

Apache Spark as client of Dremio server:

When all are done, you are in business to continue your data science work.

The dataset used in this writing is SMSSpamCollection, a text messaging dataset in which each SMS message is labeled as spam (Spam) or ham (not spam).

Following is the description from the provider of this dataset:

The SMS Spam Collection v.1 (hereafter the corpus) is a set of SMS tagged messages that have been collected for SMS Spam research. It contains one set of SMS messages in English of 5,574 messages, tagged according being ham (legitimate) or spam.

First, I placed the data file SMSSpamCollection into HDFS folder /dremio

hdfs dfs -ls /dremio/

Found 3 items 
-rw-r--r--   3 hadoop supergroup     477907 2020-03-21 11:07 /dremio/SMSSpamCollection

The file is tab (\t) delimited.

In the Dremio UI, load up and catalog this file as a data source so it can be queried with SQL, as if it were a table.

Once the dataset file can be loaded by Dremio, Dremio can supply it to a client, such as a Python script, through the Dremio Flight Connector without ODBC. The Python code below does the following:

Import the pyarrow and flight libraries

Connect to Dremio via the Arrow Flight connector, including authentication

Fetch data from the SMSSpamCollection file with SQL and load it into a Pandas dataframe

Start Apache Spark and convert the Pandas dataframe to a Spark SQL dataframe

Conduct machine learning model training and testing with Apache Spark ML:

LogisticRegression and NaiveBayes

Following is the Python code that connects to Dremio with Dremio Flight Connector and does ML with Apache Spark:

#Import needed Apache Arrow Python library for flight
from pyarrow import flight
import pyarrow as pa

#Define Dremio Client Authentication Methods
class HttpDremioClientAuthHandler(flight.ClientAuthHandler):
    def __init__(self, username, password):
        super(flight.ClientAuthHandler, self).__init__()
        self.basic_auth = flight.BasicAuth(username, password)
        self.token = None
    def authenticate(self, outgoing, incoming):
        auth = self.basic_auth.serialize()
        outgoing.write(auth)
        self.token = incoming.read()
    def get_token(self):
        return self.token

username = 'george'
password = '<redacted>'
sql = '''select * from "fraud.dremio".SMSSpamCollection'''

#Connect to Dremio with the flight connector on port 47470 mentioned earlier in the writing
client = flight.FlightClient('grpc+tcp://10.0.2.15:47470')
client.authenticate(HttpDremioClientAuthHandler(username, password))

#Pass the SQL query statement to Dremio, execute it and return the data in a pandas
#dataframe pdf
info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))
reader = client.do_get(info.endpoints[0].ticket)
batches = []
import pandas as pd
while True:
    try:
        batch, metadata = reader.read_chunk()
        batches.append(batch)
    except StopIteration:
        break
data = pa.Table.from_batches(batches)
pdf = data.to_pandas()
pdf.head()
A B
0 ham Go until jurong point, crazy.. Available only …
1 ham Ok lar… Joking wif u oni…
2 spam Free entry in 2 a wkly comp to win FA Cup fina…
3 ham U dun say so early hor… U c already then say…
4 ham Nah I don’t think he goes to usf, he lives aro…
import sys,os,os.path
os.environ['SPARK_HOME']='/opt/spark/'
import matplotlib.pyplot as plt
%matplotlib inline
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#Convert the pandas dataframe to SparkSQL dataframe
sqlCtx = SQLContext(sc)
df = sqlCtx.createDataFrame(pdf)
df.show(5)

/*
Output:
+----+--------------------+
|   A|                   B|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows

*/

#Note: Spam is spam, Ham is OK. Rename column A as status and column B as message
df = df.withColumnRenamed('A', 'status').withColumnRenamed('B', 'message')

#Encode status column to numeric: ham to 1.0 and spam to 0.
#All our fields need to be numeric for the machine to learn; also rename the column status to label

df.createOrReplaceTempView('temp')
df = spark.sql('select case status when "ham" then 1.0 else 0 end as label, message from temp')

df.show()

/*
Output:
+-----+--------------------+
|label|             message|
+-----+--------------------+
|  1.0|Go until jurong p...|
|  1.0|Ok lar... Joking ...|
|  0.0|Free entry in 2 a...|
|  1.0|U dun say so earl...|
|  1.0|Nah I don't think...|
|  0.0|FreeMsg Hey there...|
|  1.0|Even my brother i...|
|  1.0|As per your reque...|
|  0.0|WINNER!! As a val...|
|  0.0|Had your mobile 1...|
|  1.0|I'm gonna be home...|
|  0.0|SIX chances to wi...|
|  0.0|URGENT! You have ...|
|  1.0|I've been searchi...|
|  1.0|I HAVE A DATE ON ...|
|  0.0|XXXMobileMovieClu...|
|  1.0|Oh k...i'm watchi...|
|  1.0|Eh u remember how...|
|  1.0|Fine if that’s th...|
|  0.0|England v Macedon...|
+-----+--------------------+
only showing top 20 rows

*/

#1 is OK, 0 is Spam
#Tokenize the messages. Tokenization is the process of taking text (such as a sentence)
#and breaking it into individual terms (usually words). Let's tokenize the messages
#and create a list of words for each message.
from pyspark.ml.feature import  Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show(3)

/*
Output:

+-----+--------------------+--------------------+
|label|             message|               words|
+-----+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|
+-----+--------------------+--------------------+
only showing top 3 rows

*/

#CountVectorizer converts a collection of text documents to vectors of token counts.
from pyspark.ml.feature import CountVectorizer
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show(3)

/*
Output:

+-----+--------------------+--------------------+--------------------+
|label|             message|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|(13587,[8,43,53,6...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|(13587,[5,76,409,...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|(13587,[0,3,8,22,...|
+-----+--------------------+--------------------+--------------------+
only showing top 3 rows

*/

#IDF reduces the features that often appear in the corpus. When using text as a feature,
#this usually improves performance because the most common,
#and therefore less important, words are weighted down

from pyspark.ml.feature import IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show(3)  #Only needed to train

/*
Output:

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,43,53,6...|
|  1.0|(13587,[5,76,409,...|
|  0.0|(13587,[0,3,8,22,...|
+-----+--------------------+

only showing top 3 rows
*/

#Randomly split the DataFrame into 80% training (trainDF) and 20% testing (testDF)
seed = 0  # random seed 0
trainDF, testDF = rescaledData.randomSplit([0.8,0.2],seed)

#Logistic regression classifier
#Logistic regression is a popular method for predicting a categorical response.
#It is a special case of generalized linear models that predicts the probability of an outcome.
#In spark.ml, logistic regression can be used to predict a binary outcome by using
#binomial logistic regression, or it can be used to predict a multiclass outcome by using
#multinomial logistic regression.
#Use the family parameter to select between these two algorithms, or leave it unset
#and Spark will infer the correct variant.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np

lr = LogisticRegression(maxIter = 100)
model_lr = lr.fit(trainDF)
prediction_lr = model_lr.transform(testDF)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_lr.evaluate(prediction_lr)
/*
0.8734030197444833
*/

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(prediction_lr)

/*
0.9654997463216642
*/

my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(prediction_lr)

/*
0.967479674796748
*/

train_fit_lr = prediction_lr.select('label','prediction')
train_fit_lr.groupBy('label','prediction').count().show()

/*
Output:

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|   31|
|  1.0|       1.0|  860|
|  0.0|       0.0|   92|
|  1.0|       0.0|    1|
+-----+----------+-----+
*/
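
As a quick sanity check (my own addition, not in the original writing), the accuracy reported above can be recomputed by hand from this confusion matrix:

#860 ham messages correctly predicted, 92 spam correctly predicted,
#31 spam misclassified as ham, 1 ham misclassified as spam
accuracy = (860 + 92) / (860 + 92 + 31 + 1)
print(accuracy)   #0.96747..., matching the accuracy metric reported earlier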

#Naive Bayes
#Naive Bayes classifiers are a family of simple probabilistic classifiers
#that apply Bayes' theorem with strong (naive)
#independence assumptions between the features.
#The spark.ml implementation
#currently supports multinomial naive Bayes and Bernoulli naive Bayes.

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
Model_nb = nb.fit(trainDF)
predictions_nb = Model_nb.transform(testDF)

predictions_nb.select('label', 'prediction').show(5)

/*
Output:

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows
*/

from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_nb.evaluate(predictions_nb)

/*
0.937862950058072
*/
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_nb.evaluate(predictions_nb)
/*
0.933544453535483
*/
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_nb.evaluate(predictions_nb)
/*
0.9278455284552846
*/

Summary:

Following are the model metrics from the evaluations above:

Model                   AUC       F1        Accuracy
LogisticRegression      0.8734    0.9655    0.9675
NaiveBayes              0.9379    0.9335    0.9278

In this exercise, LogisticRegression performs similarly to NaiveBayes. The objective of this writing is to demonstrate how to use the Dremio Flight Connector to get data from a Dremio server over the Apache Arrow Flight protocol, without ODBC or JDBC, and run an Apache Spark application, such as Spark machine learning, on that data.

As always, code used in this writing is in my GitHub repo.

Apache Spark Machine Learning with Dremio Data Lake Engine
GitHub - dremio-hub/dremio-flight-connector: Dremio Flight connector. Access Dremio using Arrow flight
GitHub - geyungjen/jentekllc: Apache Spark Application Development -- George Jen, Jen Tek LLC