Bulk Synchronous Parallel with Google Pregel Graphx Implementation Use Cases

What is Bulk Synchronous Parallel?

The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.

BSP computer

A BSP computer consists of:

components capable of processing and/or local memory transactions (i.e., processors)

a network that routes messages between pairs of such components

a hardware facility that allows for the synchronization of all or a subset of components.

Pregel from Google

There are many graph-parallel computing frameworks; one of them is Pregel from Google, which is based on the BSP (Bulk Synchronous Parallel) model.

In BSP, a computation consists of a series of global super steps, and each super step is composed of three phases (sketched in code after this list):

Concurrent computing

Computation is applied to the vertices concurrently, up to the number of available CPU cores on each machine.

Communication

Each vertex receives the messages sent in the prior super step and sends messages to the vertices to be processed in the next super step.

Synchronization

Vertices can decide when to stop sending messages; the iteration stops when there are no more messages to send.
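
To make the super step structure concrete, below is a tiny single-machine sketch of one BSP super step in plain Scala (not Spark code); every name in it (Msg, VertexState, compute, superStep) is made up purely for illustration.

// Single-machine sketch of one BSP super step; names and the toy "forward to the next id" rule are hypothetical.
case class Msg(to: Long, value: Double)
case class VertexState(id: Long, value: Double)

// Per-vertex computation: consume the inbox, produce a new state plus outgoing messages
def compute(v: VertexState, inbox: Seq[Msg]): (VertexState, Seq[Msg]) = {
  val newValue = (v.value +: inbox.map(_.value)).max
  val out = if (newValue > v.value) Seq(Msg(v.id + 1, newValue)) else Nil  // toy rule, purely illustrative
  (v.copy(value = newValue), out)
}

def superStep(vertices: Seq[VertexState],
              inbox: Map[Long, Seq[Msg]]): (Seq[VertexState], Map[Long, Seq[Msg]]) = {
  // 1. Concurrent computing: every vertex runs independently (in parallel on a real BSP machine)
  val results = vertices.map(v => compute(v, inbox.getOrElse(v.id, Nil)))
  // 2. Communication: route every produced message to its destination vertex
  val nextInbox = results.flatMap(_._2).groupBy(_.to)
  // 3. Synchronization: the barrier is implicit here -- the next super step starts only after
  //    every vertex has finished and all messages have been delivered
  (results.map(_._1), nextInbox)
}

// The overall computation repeats superStep until no vertex has anything left in its inbox.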

Google’s Pregel implementation in Apache Spark Graphx

Unlike more standard Pregel implementations, vertices in Graphx can only send messages to neighboring vertices, and message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within Graphx.

The pregel method is declared in the abstract class Graph:

abstract class Graph[VD: ClassTag, ED: ClassTag] {

def pregel[A: ClassTag]
(initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)
(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
    mergeMsg: (A, A) => A
)    
: Graph[VD, ED]
}

The pregel API is implemented in class GraphOps:

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex.  The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam A the Pregel message type
   *
   * @param initialMsg the message each vertex will receive at the on
   * the first iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value.  On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message.  On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A.  ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }
}

The detailed implementation is in the Pregel object:

/**
 * Implements a Pregel-like bulk-synchronous message-passing API.
 *
 * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
 * edges, enables the message sending computation to read both vertex attributes, and constrains
 * messages to the graph structure.  These changes allow for substantially more efficient
 * distributed execution while also exposing greater flexibility for graph-based computation.
 *
 * @example We can use the Pregel abstraction to implement PageRank:
 * {{{
 * val pagerankGraph: Graph[Double, Double] = graph
 *   // Associate the degree with each vertex
 *   .outerJoinVertices(graph.outDegrees) {
 *     (vid, vdata, deg) => deg.getOrElse(0)
 *   }
 *   // Set the weight on the edges based on the degree
 *   .mapTriplets(e => 1.0 / e.srcAttr)
 *   // Set the vertex attributes to the initial pagerank values
 *   .mapVertices((id, attr) => 1.0)
 *
 * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
 *   resetProb + (1.0 - resetProb) * msgSum
 * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
 *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
 * def messageCombiner(a: Double, b: Double): Double = a + b
 * val initialMessage = 0.0
 * // Execute Pregel for a fixed number of iterations.
 * Pregel(pagerankGraph, initialMessage, numIter)(
 *   vertexProgram, sendMessage, messageCombiner)
 * }}}
 *
 */
object Pregel extends Logging {

  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex.  The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam VD the vertex data type
   * @tparam ED the edge data type
   * @tparam A the Pregel message type
   *
   * @param graph the input graph.
   *
   * @param initialMsg the message each vertex will receive at the first
   * iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run. The default is
   * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
   * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
   * *both* vertices received a message.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value.  On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message.  On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A.  ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
    require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    val checkpointInterval = graph.vertices.sparkContext.getConf
      .getInt("spark.graphx.pregel.checkpointInterval", -1)
    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
    val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
      checkpointInterval, graph.vertices.sparkContext)
    graphCheckpointer.update(g)

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
      checkpointInterval, graph.vertices.sparkContext)
    messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
    var activeMessages = messages.count()

    // Loop
    var prevG: Graph[VD, ED] = null
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog)
      graphCheckpointer.update(g)

      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
      // and the vertices of g).
      messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
      activeMessages = messages.count()

      logInfo("Pregel finished iteration " + i)

      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist()
      prevG.unpersistVertices()
      prevG.edges.unpersist()
      // count the iteration
      i += 1
    }
    messageCheckpointer.unpersistDataSet()
    graphCheckpointer.deleteAllCheckpoints()
    messageCheckpointer.deleteAllCheckpoints()
    g
  } // end of apply

} // end of class Pregel


The super steps are implemented in the while loop block below; the iteration stops either when there are no more messages to send or when the iteration counter reaches the maximum iteration limit.
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog)
      graphCheckpointer.update(g)

      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
      // and the vertices of g).
      messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
      activeMessages = messages.count()

      logInfo("Pregel finished iteration " + i)

      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist()
      prevG.unpersistVertices()
      prevG.edges.unpersist()
      // count the iteration
      i += 1
    }

Google’s Pregel algorithm is implemented as part of the package org.apache.spark.graphx.

While the underlying logic is complicated, invoking the Graphx Pregel API is not too difficult, though it is not trivial either.

It is based on the pregel definition in class GraphOps:

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

Arguments needed for invoking Graphx pregel API

Invoking it is straightforward as long as all arguments to the pregel API method are provided. What are those arguments?

First you need to provide value arguments:

initialMsg: A
initialMsg can be of any type, which is why it carries ClassTag A
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory.

Second you need to define and provide 3 functional arguments.

vProg:
Short for vertexProgram, a user-defined function that is the key method run on each vertex in every super step.

sendMsg:
A function that sends messages from a vertex to its adjacent vertices (it returns an iterator of (VertexId, message) pairs).

mergeMsg:
A function a vertex uses to combine the messages it receives, before processing them accordingly, such as updating the vertex attribute.

Once you have all of the above arguments ready, you can invoke the pregel method, which returns a new Graph whose vertices carry the attribute values computed by the vertex program vProg.
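
Before the full examples below, here is a minimal sketch of the call shape on a made-up three-vertex graph, spelling out every argument including the optional ones (the later examples rely on the defaults); it assumes a spark-shell or notebook session where sc already exists, as in the rest of this section.

import org.apache.spark.graphx._

// Made-up graph: three vertices with Double attributes, connected in a directed cycle
val vertices = sc.parallelize(Seq((1L, 3.0), (2L, 6.0), (3L, 2.0)))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
val tiny: Graph[Double, Int] = Graph(vertices, edges)

val result: Graph[Double, Int] = tiny.pregel(
    Double.NegativeInfinity,                  // initialMsg, delivered to every vertex in super step 0
    maxIterations = 10,                       // optional, defaults to Int.MaxValue
    activeDirection = EdgeDirection.Either)(  // optional, defaults to EdgeDirection.Either
  (id, attr, msg) => math.max(attr, msg),     // vprog: keep the larger of own attribute and message
  triplet =>                                  // sendMsg: forward the source attribute if it is larger
    if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
    else Iterator.empty,
  (a, b) => math.max(a, b)                    // mergeMsg: keep the larger of two incoming messages
)
result.vertices.collect.foreach(println)      // every vertex ends up with 6.0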

Example: Populate every vertex in the graph with the maximum vertex attribute

Assume a graph with 4 vertices as pictured below, with the vertex attributes marked inside the circles: 3, 6, 2 and 1 respectively.

The goal is to make every vertex's attribute equal to the maximum vertex attribute in the graph.

How it works:

In super step 0, each vertex sends its attribute as a message to its adjacent vertices (those with direct edge links).

In each subsequent super step:

Each vertex receives one or more messages containing attribute values; it ignores received attributes that are not greater than its own, and otherwise replaces its attribute with the maximum of the received values.

Each vertex checks its adjacent attributes; if they are all >= its own attribute, it stops sending messages.

The iteration exits once all vertices stop sending messages.

A vertex shown in grey has stopped sending messages. It can still receive messages and apply the information they carry via the mergeMsg function argument of the Pregel API.

Now let's write the Scala code that invokes the Spark Graphx pregel API to accomplish the above functional specification.

Create a random graph with 10 vertices:

import org.apache.spark.graphx._ 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices( (id, _) => id.toDouble ).partitionBy(PartitionStrategy.EdgePartition2D,4)
graph.vertices.collect.foreach(println)
/*
Note that the largest vertex attribute is 9.0
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)

*/

Prepare arguments to invoke Google pregel API in Graphx

//define 3 functions as arguments of method pregel
val vProg = { (id: VertexId, attr: Double, msg: Double) => math.max(attr,msg) }

val sendMsg = { (triplet: EdgeTriplet[Double, Int]) =>
       if (triplet.srcAttr > triplet.dstAttr) {
              Iterator((triplet.dstId, triplet.srcAttr))
        } else {
              Iterator.empty
    }
}

val reduceMsg = { (a: Double, b: Double) => math.max(a,b) }

Here are the required arguments; I use them as a template every time I need to prepare a Pregel API invocation:

initialMsg: A
initialMsg can be of any type, which is why it carries ClassTag A
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory.
vProg:
Short for vertexProgram, a user-defined function that is the key method run on each vertex in every super step
sendMsg:
A function that sends messages from a vertex to its adjacent vertices (an iterator of (VertexId, message) pairs)
mergeMsg:
A function a vertex uses to combine the messages it receives before processing them, such as updating its own attribute

In this example, I set initialMsg to Double.NegativeInfinity, the smallest possible value, because we need to find the maximum vertex attribute and populate the attributes of all other vertices in the graph with that maximum value.

Invoke pregel API

graph.pregel(Double.NegativeInfinity)(
vProg,
sendMsg,
reduceMsg
).vertices.collect.foreach(println)

/*
Output: every vertex attribute is populated with the largest vertex attribute, which is 9.0 in this example

(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)


*/

The following code does the same thing using anonymous functions.

import org.apache.spark.graphx._ 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices( (id, _) => id.toDouble ).partitionBy(PartitionStrategy.EdgePartition2D,4)
graph.vertices.collect.foreach(println)
/*
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)

*/

val maxGraph=graph.pregel(Double.NegativeInfinity)(
(id, dist, newDist) => math.max(dist, newDist),
triplet => {  // Send Message
    if (triplet.srcAttr > triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.max(a, b) // Merge Message

)
maxGraph.vertices.collect.foreach(println)
/*
(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)

*/

Example: Trucking Company

Here is a real-world example. Suppose you are running a long-haul trucking company; you serve a region that has a number of cities.

As part of the costing analysis, you want to find the shortest distance from any city to any other city on your service map.

Here is an imaginary graph: vertices are cities, edges are highways that connect the cities. Each edge has an attribute denoting the number of miles between the two cities it connects.

Pregel Implementation:

The Pregel API call returns a new graph, so you want the graph returned by Pregel to have vertices that look like this:

VertexId, Map(k, v)

The attribute of each vertex is a map of key value (k, v) pairs, where

k = destination vertexId

v = number of miles to the destination vertex
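
Concretely, using the distances from the sample output at the end of this section, the finished attribute of the vertex for city 0 would look like the value below (the type alias busMap is also defined in the full code later; the numbers are illustrative only).

import org.apache.spark.graphx.VertexId

// Vertex attribute type: destination vertexId -> miles to that destination
type busMap = Map[VertexId, Int]

// Shape of the attribute for "city 0" once Pregel finishes (values from the sample output below)
val city0Routes: busMap = Map(1L -> 243, 2L -> 422, 3L -> 1102, 4L -> 787)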

Define helper methods:

Create an initial, empty Map of key value pairs

def createMap(x: (VertexId, Int)*) = Map(x: _*)

Add the miles on the current edge to every distance already known at the destination vertex

def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }

Create a new route map toward the target vertex; this function also serves as the mergeMsg functional argument in the Pregel call (see the full code for details)

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
    (busmap1.keySet ++ busmap2.keySet).map {
      k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
}.toMap
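
To see what the two helpers do, here is a quick check with made-up route maps, assuming the busMap type alias and the definitions above.

val routesA: busMap = Map(1L -> 100, 2L -> 400)   // distances already known at a vertex
val routesB: busMap = Map(2L -> 250, 3L -> 900)   // distances carried by an incoming message

// incrementDistance extends every known route by the miles on the current edge
incrementDistance(routesA, 50)   // Map(1 -> 150, 2 -> 450)

// addRoute merges two route maps, keeping the shorter distance per destination
addRoute(routesA, routesB)       // Map(1 -> 100, 2 -> 250, 3 -> 900)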

Define initialMsg required by Pregel

//Define initialMessage needed by Pregel as argument
    val initialMessage = createMap()

Define vProg (aka the vertexProgram), needed by Pregel as a functional argument; it is the logic executed in the super steps

    def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
      addRoute(attr, msg)
    }

Define sendMsg, needed by Pregel as a functional argument

def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
      val newAttr = incrementDistance(edge.dstAttr, edge.attr.toString.toInt)
      if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
           // the message flows backward along the edge: the source learns new routes through the destination
           Iterator((edge.srcId, newAttr))
      else
           // Vertices can decide not to send a message by returning an empty iterator
           Iterator.empty
}

Here is the full code:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
import scala.reflect.ClassTag


//define Map as data type
type busMap = Map[VertexId, Int]

def createMap(x: (VertexId, Int)*) = Map(x: _*)

def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
    (busmap1.keySet ++ busmap2.keySet).map {
      k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
}.toMap


def run[VD, ED: ClassTag](graph: Graph[VD, ED]): Graph[busMap, ED] = {
    // Treat every vertex as a landmark (destination), so shortest distances to all
    // vertices are computed; each landmark starts knowing it is 0 miles from itself
    val landmarks = graph.vertices.map(_._1).collect.toSeq
    val sssp = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid))
            createMap(vid -> 0)
      else
            createMap()
    }

//Define initialMessage needed by Pregel as argument
    val initialMessage = createMap()


//Define vProg, aka, vertexProgram needed by Pregel as functional argument, that is the logic executed in the super steps

    def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
      addRoute(attr, msg)
    }


// Define sendMsg as needed by Pregel as functional argument

    def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
        val newAttr = incrementDistance(edge.dstAttr,edge.attr.toString.toInt)
      if (edge.srcAttr != addRoute(newAttr, edge.srcAttr)) 
           Iterator((edge.srcId, newAttr))
      else 
// Vertices can decide not to send message with empty iterator
           Iterator.empty
    }
//Invoke Pregel     
    Pregel(sssp, initialMessage)(vertexProgram, sendMessage, addRoute)
  }

//Need random generator to generate random miles for each edge
val r = scala.util.Random

//Create a random graph with 5 vertices for simplicity
//The edge attribute of this graph stores the length of the edge in miles

val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5)
     .mapVertices( (id,attr) => (id.toDouble))
     .mapEdges(e=>(math.abs(r.nextInt(990)+10)))

// Now invoke the run function defined above; it returns a graph whose vertex
// attribute stores the Map of destination VertexId and the miles to it

for (i <- run(graph).vertices.collect.toSeq.sortBy(_._1))
{
    var distance=i._2
    for ((k,v)<-distance)
      {
         if (i._1!=k)
         {
         val print_sentence="From VertexId " + i._1.toString + " to " + k.toString + " shortest distance is " + v.toString + " Miles"
         println(print_sentence)
         }
      } 
}


/*
Output
From VertexId 0 to 1 shortest distance is 243 Miles
From VertexId 0 to 2 shortest distance is 422 Miles
From VertexId 0 to 3 shortest distance is 1102 Miles
From VertexId 0 to 4 shortest distance is 787 Miles
From VertexId 1 to 0 shortest distance is 874 Miles
From VertexId 1 to 2 shortest distance is 1296 Miles
From VertexId 1 to 3 shortest distance is 859 Miles
From VertexId 1 to 4 shortest distance is 1661 Miles
From VertexId 2 to 0 shortest distance is 1746 Miles
From VertexId 2 to 1 shortest distance is 872 Miles
From VertexId 2 to 3 shortest distance is 813 Miles
From VertexId 2 to 4 shortest distance is 374 Miles
From VertexId 3 to 0 shortest distance is 933 Miles
From VertexId 3 to 1 shortest distance is 59 Miles
From VertexId 3 to 2 shortest distance is 682 Miles
From VertexId 3 to 4 shortest distance is 1056 Miles
From VertexId 4 to 0 shortest distance is 1372 Miles
From VertexId 4 to 1 shortest distance is 498 Miles
From VertexId 4 to 2 shortest distance is 1121 Miles
From VertexId 4 to 3 shortest distance is 439 Miles

*/

Google Pregel in Spark Graphx is not an easy API to use; making a Pregel API call correctly takes some time and many attempts of trial and error, but it is worth the effort.

As always, the code I wrote for this article is in my GitHub repo.

https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala