Bulk Synchronous Parallel with Google Pregel: GraphX Implementation and Use Cases
What is Bulk Synchronous Parallel?
The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.
BSP computer
A BSP computer consists of:
components capable of processing and/or local memory transactions (i.e., processors)
a network that routes messages between pairs of such components
a hardware facility that allows for the synchronization of all or a subset of components.

Pregel from Google
There are many graph-parallel computing frameworks. One of them is Pregel from Google, which is based on the BSP (Bulk Synchronous Parallel) model.
In BSP, a computation consists of a series of global super steps, and each super step is composed of three steps:
Concurrent computing
Each calculation is applied to vertices concurrently, up to the number of CPU cores available on each computer.
Communication
Vertices receive the messages sent in the prior super step and send messages to the vertices to be processed in the next super step.
Synchronization
Vertices can decide when to cease sending messages.
Iteration stops when there are no more messages to be sent.
Google’s Pregel implementation in Apache Spark GraphX
Unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within GraphX.
The pregel method can be called on any Graph, because the abstract class Graph converts implicitly to GraphOps.
The pregel API itself is implemented in class GraphOps.
The detailed implementation is in the Pregel object, which is part of package org.apache.spark.graphx.
While the underlying logic is complicated, invoking the GraphX Pregel API is not too difficult, though it is not trivial either.
Based on the pregel definition in class GraphOps
Arguments needed to invoke the GraphX pregel API
The call works as long as all arguments to the pregel method are provided. What are the arguments?
First, you need to provide the value arguments: initialMsg, plus the optional maxIterations and activeDirection.
Second, you need to define and provide 3 functional arguments: vProg, sendMsg and mergeMsg.
Once you have all of the above arguments ready, you can invoke the pregel method, which returns a new Graph whose vertices hold the attribute values computed by the vertex program vProg.
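To make the roles of these arguments concrete, here is a minimal sketch in plain Scala (no Spark) of how a Pregel-style loop could use them. MiniPregel, Triplet, MiniPregelDemo and the hop-count demo are hypothetical names chosen for illustration. This is a simplified single-machine model, not GraphX's actual implementation: it assumes an edge is active whenever either of its endpoints received a message in the previous super step.

```scala
object MiniPregel {
  type VertexId = Long

  // One directed edge together with the current attributes of both endpoints.
  final case class Triplet[VD, ED](
      srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  def run[VD, ED, A](
      vertices: Map[VertexId, VD],
      edges: Seq[(VertexId, VertexId, ED)],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {
    // Super step 0: every vertex receives initialMsg.
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var active: Set[VertexId] = state.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // Communication: run sendMsg on every edge that touches an active vertex.
      val sent = edges
        .filter { case (s, d, _) => active(s) || active(d) }
        .flatMap { case (s, d, e) => sendMsg(Triplet(s, state(s), d, state(d), e)) }
      // Synchronization: all messages are collected and merged per recipient
      // before any vertex computes again.
      val inbox: Map[VertexId, A] =
        sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
      // Computing: only vertices that received a message run vprog.
      state = state ++ inbox.map { case (id, msg) => id -> vprog(id, state(id), msg) }
      active = inbox.keySet
      i += 1
    }
    state // the loop ends once no message was sent
  }
}

object MiniPregelDemo {
  // Hypothetical chain 1 -> 2 -> 3; compute the hop count from vertex 1.
  val vertices: Map[Long, Int] = Map(1L -> 0, 2L -> Int.MaxValue, 3L -> Int.MaxValue)
  val edges: Seq[(Long, Long, Unit)] = Seq((1L, 2L, ()), (2L, 3L, ()))

  val hops: Map[Long, Int] = MiniPregel.run(vertices, edges, Int.MaxValue)(
    (id, attr, msg) => math.min(attr, msg),
    t =>
      if (t.srcAttr != Int.MaxValue && t.srcAttr + 1 < t.dstAttr)
        Iterator((t.dstId, t.srcAttr + 1))
      else Iterator.empty,
    (a, b) => math.min(a, b))

  def main(args: Array[String]): Unit = println(hops)
}
```

In the demo, vertex 3 ends up two hops from vertex 1, and the loop stops by itself in the first super step in which sendMsg returns no messages.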
Example: Populate all vertices in a graph with the maximum vertex attribute
Assume the graph below has 4 vertices, with the attribute of each vertex marked inside its circle: 3, 6, 2 and 1 respectively.

The goal is to make every vertex hold the maximum vertex attribute found in the graph.
How it works:
In super step 0, each vertex sends its attribute as a message to its adjacent vertices (those connected to it by a direct edge).
In each subsequent super step:
Each vertex receives 1 or more messages that contain attribute values. It ignores the received values if none is greater than its own attribute; otherwise it replaces its attribute with max(received message attributes).
Each vertex checks the attributes of its adjacent vertices; if they are all >= its own attribute, it stops sending messages.
Iteration exits once all vertices have stopped sending messages.

A vertex shown in grey has stopped sending messages. It can still receive messages: incoming messages are combined by the mergeMsg function argument of the Pregel API and then applied to the vertex by vProg.
Now write the Scala code that invokes the Spark GraphX pregel API to accomplish the functional specification above.
Create a random graph with 10 vertices:
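One possible way to build such a graph is sketched here, assuming Spark with GraphX is on the classpath and a local master is acceptable. The object name RandomGraphSketch and the choice to use each vertex id as its Double attribute are assumptions for illustration. GraphGenerators.logNormalGraph generates the random edges.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators

object RandomGraphSketch {
  // Random graph with 10 vertices; edges are drawn by logNormalGraph.
  // Assumption: each vertex attribute is set to its own id as a Double.
  def build(sc: SparkContext): Graph[Double, Int] =
    GraphGenerators
      .logNormalGraph(sc, numVertices = 10)
      .mapVertices((id, _) => id.toDouble)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("random-graph-sketch")
    val sc = new SparkContext(conf)
    try {
      val graph = build(sc)
      graph.vertices.collect().sortBy(_._1).foreach(println)
      println(s"edges: ${graph.edges.count()}")
    } finally sc.stop()
  }
}
```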
Prepare the arguments to invoke the pregel API in GraphX
Here are the required arguments; I use them as a template every time I need to invoke the Pregel API:
In this example, I set initialMsg to Double.NegativeInfinity, the smallest possible Double value, because the goal is to find the maximum vertex attribute and populate every other vertex in the graph with that value.
Invoke the pregel API
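A minimal sketch of both steps, preparing the arguments and making the call, assuming Spark with GraphX on the classpath and a local master. The object name MaxAttributeSketch and the chain-shaped edge layout 1-2-3-4 are assumptions for illustration; only the vertex attributes 3, 6, 2 and 1 come from the example above. sendMsg sends along an edge in whichever direction carries the larger value, so the maximum can spread regardless of how the edges are oriented.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}

object MaxAttributeSketch {
  // Value argument: the message every vertex receives in super step 0.
  val initialMsg: Double = Double.NegativeInfinity

  // vProg: keep the larger of the current attribute and the merged message.
  def vProg(id: VertexId, attr: Double, msg: Double): Double = math.max(attr, msg)

  // sendMsg: tell the endpoint with the smaller attribute about the larger one;
  // send nothing when both ends already agree.
  def sendMsg(t: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] =
    if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr))
    else if (t.dstAttr > t.srcAttr) Iterator((t.srcId, t.dstAttr))
    else Iterator.empty

  // mergeMsg: combine several incoming messages into one.
  def mergeMsg(a: Double, b: Double): Double = math.max(a, b)

  def run(sc: SparkContext): Map[VertexId, Double] = {
    val vertices = sc.parallelize(Seq((1L, 3.0), (2L, 6.0), (3L, 2.0), (4L, 1.0)))
    // Assumed edge layout: a simple chain 1 - 2 - 3 - 4.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1)))
    val graph: Graph[Double, Int] = Graph(vertices, edges)
    graph.pregel(initialMsg)(vProg, sendMsg, mergeMsg).vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("max-attribute-sketch")
    val sc = new SparkContext(conf)
    try run(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

With this layout every vertex ends with the attribute 6.0, and the iteration stops once sendMsg returns an empty iterator for every edge.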
The following code works the same way with anonymous functions.
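A sketch of what the anonymous-function form could look like, under the same assumptions as before (Spark with GraphX on the classpath, a hypothetical object name MaxAttributeInline, and an assumed chain of edges 1-2-3-4). The parameter types of the three function literals are inferred from the graph's vertex and edge types and from the type of the initial message.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object MaxAttributeInline {
  def run(sc: SparkContext): Map[VertexId, Double] = {
    val vertices: RDD[(VertexId, Double)] =
      sc.parallelize(Seq((1L, 3.0), (2L, 6.0), (3L, 2.0), (4L, 1.0)))
    // Assumed edge layout: a simple chain 1 - 2 - 3 - 4.
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1)))

    Graph(vertices, edges)
      .pregel(Double.NegativeInfinity)(
        (id, attr, msg) => math.max(attr, msg), // vertex program
        t =>                                    // send message
          if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr))
          else if (t.dstAttr > t.srcAttr) Iterator((t.srcId, t.dstAttr))
          else Iterator.empty,
        (a, b) => math.max(a, b)                // merge message
      )
      .vertices
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("max-attribute-inline")
    val sc = new SparkContext(conf)
    try run(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```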
Example: Trucking Company
Here is a real-world example. Suppose you run a long-haul trucking company that serves a region with a number of cities.
As part of the cost analysis, you want to find the shortest distance from any city to any other city on your service map.
Here is an imaginary graph: vertices are cities, and edges are the highways that connect them. Each edge has an attribute denoting the number of miles between the 2 cities it connects.
Pregel Implementation:
The Pregel API call returns a new graph, so you want each vertex of the graph returned by Pregel to follow the spec below:
VertexId, Seq(k,v)
The attribute of a vertex is a sequence of key-value (k,v) pairs
k=destination vertexId
v=number of miles to the destination vertex
Define helper methods:
Create an initial empty Map of key-value pairs
Add the distance of the next edge to the distance to each destination vertex
Create a new route to the target vertex; this function serves as the mergeMsg functional argument in the Pregel call, see the code for details
Define initialMsg, required by Pregel
Define vProg, aka vertexProgram, the functional argument Pregel needs for the logic executed in the super steps
Define sendMsg, the functional argument Pregel needs for sending messages
Following is the code:
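What follows is a minimal sketch of how these pieces could fit together, assuming Spark with GraphX on the classpath and a local master. It is not the listing from the repo: the names TruckingSketch, Routes, makeMap, addDistance, mergeRoutes and shortestMiles, as well as the three cities and their mileages, are made up for illustration. It also assumes every highway can be driven in both directions, so sendMsg may send along an edge either way, and it stores the (k,v) pairs in a Map.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}

object TruckingSketch {
  // Vertex attribute: destination city id -> shortest mileage known so far.
  type Routes = Map[VertexId, Double]

  // Helper: build a route map (empty when called without arguments).
  def makeMap(x: (VertexId, Double)*): Routes = Map(x: _*)

  // Helper: extend every known route by the length of one more edge.
  def addDistance(routes: Routes, miles: Double): Routes =
    routes.map { case (city, d) => city -> (d + miles) }

  // Helper, also used as mergeMsg: keep the shorter mileage per destination.
  def mergeRoutes(a: Routes, b: Routes): Routes =
    (a.keySet ++ b.keySet).map { k =>
      k -> math.min(a.getOrElse(k, Double.PositiveInfinity),
                    b.getOrElse(k, Double.PositiveInfinity))
    }.toMap

  val initialMsg: Routes = makeMap()

  def vProg(id: VertexId, attr: Routes, msg: Routes): Routes = mergeRoutes(attr, msg)

  // Offer each endpoint the other endpoint's routes extended by this edge,
  // but only when that would improve what the endpoint already knows.
  def sendMsg(t: EdgeTriplet[Routes, Double]): Iterator[(VertexId, Routes)] = {
    val viaDst = addDistance(t.dstAttr, t.attr)
    val viaSrc = addDistance(t.srcAttr, t.attr)
    val toSrc: Iterator[(VertexId, Routes)] =
      if (mergeRoutes(viaDst, t.srcAttr) != t.srcAttr) Iterator((t.srcId, viaDst))
      else Iterator.empty
    val toDst: Iterator[(VertexId, Routes)] =
      if (mergeRoutes(viaSrc, t.dstAttr) != t.dstAttr) Iterator((t.dstId, viaSrc))
      else Iterator.empty
    toSrc ++ toDst
  }

  def shortestMiles(sc: SparkContext): Map[VertexId, Routes] = {
    // Hypothetical service map: three cities and three highways (miles).
    val cities = sc.parallelize(Seq((1L, "A"), (2L, "B"), (3L, "C")))
    val highways = sc.parallelize(
      Seq(Edge(1L, 2L, 100.0), Edge(2L, 3L, 50.0), Edge(1L, 3L, 200.0)))
    // Every city starts out knowing only the zero-mile route to itself.
    val graph = Graph(cities, highways).mapVertices((id, _) => makeMap(id -> 0.0))
    graph.pregel(initialMsg)(vProg, sendMsg, mergeRoutes).vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("trucking-sketch")
    val sc = new SparkContext(conf)
    try shortestMiles(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

On this made-up map, the shortest route between cities 1 and 3 is 150 miles through city 2 rather than the direct 200-mile highway.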
Google Pregel in Spark GraphX is not an easy API to use; making a Pregel API call work correctly takes some time and many rounds of trial and error, but it is worth the effort.
As always, the code I wrote for this article is in my GitHub repo.