Bulk Synchronous Parallel with Google Pregel: GraphX Implementation and Use Cases

What is Bulk Synchronous Parallel?

The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.

BSP computer

A BSP computer consists of:

components capable of processing and/or local memory transactions (i.e., processors)

a network that routes messages between pairs of such components

a hardware facility that allows for the synchronization of all or a subset of components.

Pregel from Google

There are many graph-parallel computing frameworks. One of them is Pregel from Google, which is based on the BSP (Bulk Synchronous Parallel) model.

In BSP, a computation process consists of a series of global super steps, and each super step is composed of three steps:

Concurrent computing

The computation is applied to the vertices concurrently, limited by the CPU cores available on each machine.

Communication

Each vertex receives the messages sent to it in the prior super step and sends messages to the vertices to be processed in the next super step.

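Synchronization

All vertices wait at a barrier until the messages of the current super step have been delivered.

Vertices can decide when to cease sending messages.

Iteration stops when no messages remain to be sent.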
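The three phases can be sketched in plain Scala without any framework. This is only an illustration under my own assumptions: the object name `BspMaxSketch`, the adjacency-map representation, and the choice of "propagate the maximum value" as the vertex computation are all hypothetical, and the loop runs sequentially where a real BSP system would run each phase in parallel.

```scala
object BspMaxSketch {
  /** Runs super steps until no vertex has a message to send.
    * neighbours: vertex id -> ids of adjacent vertices (hypothetical representation).
    * initial:    vertex id -> starting value.
    */
  def run(neighbours: Map[Int, Set[Int]], initial: Map[Int, Int]): Map[Int, Int] = {
    var values = initial
    // Super step 0: every vertex sends its value to each neighbour.
    var inbox = send(neighbours, values, values.keySet)
    while (inbox.nonEmpty) {
      // Computation: each vertex that received mail folds it into its value.
      val updated = inbox.map { case (v, msgs) => v -> math.max(values(v), msgs.max) }
      // A vertex keeps sending only if its value actually changed.
      val changed = updated.collect { case (v, x) if x != values(v) => v }.toSet
      // Synchronization: all updates become visible together, as after a barrier.
      values = values ++ updated
      // Communication: changed vertices send messages for the next super step.
      inbox = send(neighbours, values, changed)
    }
    values
  }

  // Groups the outgoing (destination, value) pairs by destination vertex.
  private def send(
      neighbours: Map[Int, Set[Int]],
      values: Map[Int, Int],
      senders: Set[Int]): Map[Int, List[Int]] =
    senders.toList
      .flatMap(v => neighbours(v).toList.map(n => n -> values(v)))
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2) }
}
```

Calling `BspMaxSketch.run` on a small chain of vertices leaves every vertex holding the largest starting value, and the loop ends by itself once the inbox is empty.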

Google’s Pregel implementation in Apache Spark GraphX

Unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and the message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within GraphX.

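The pregel method is available on every Graph: the abstract class Graph exposes the operations of GraphOps through its ops member and an implicit conversion.

The pregel API itself is declared in class GraphOps.

The detailed implementation is in the Pregel object:

The Pregel algorithm is implemented in Pregel.scala, as part of the package org.apache.spark.graphx: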

https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

While the underlying logic is complicated, invoking the GraphX Pregel API is manageable, though it is not trivial either.

The call is based on the pregel definition in class GraphOps.

Arguments needed for invoking Graphx pregel API

The call works as long as all arguments to the pregel method are provided. What are those arguments?

First, you need to provide the value arguments: initialMsg, the message every vertex receives in the first super step, plus the optional maxIterations and activeDirection.

Second, you need to define and provide 3 functional arguments: vProg, sendMsg and mergeMsg.

Once you have all of the above arguments ready, you can invoke the pregel method, which returns a new Graph whose vertices hold the attribute values computed by the vertex program vProg.
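To make the shape of the arguments concrete, here is a thin wrapper whose parameter lists mirror the pregel method in GraphOps as I understand it: the value arguments come first, the three functions second. The object `PregelShape` and the method `runPregel` are hypothetical names of mine, not part of GraphX; the wrapper simply forwards to `graph.pregel`.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}

object PregelShape {
  // VD = vertex attribute type, ED = edge attribute type, A = message type.
  def runPregel[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,                                          // delivered to every vertex first
      maxIterations: Int = Int.MaxValue,                      // upper bound on super steps
      activeDirection: EdgeDirection = EdgeDirection.Either)( // which edges may run sendMsg
      vProg: (VertexId, VD, A) => VD,                         // computes the new vertex attribute
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], // emits messages along an edge
      mergeMsg: (A, A) => A                                   // combines messages for one vertex
  ): Graph[VD, ED] =
    graph.pregel(initialMsg, maxIterations, activeDirection)(vProg, sendMsg, mergeMsg)
}
```

The message type A is fixed by initialMsg in the first parameter list, which is why the three functions in the second list can usually be written without explicit type annotations.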

Example: Populate all vertices in a graph with the maximum vertex attribute

Assume a graph with 4 vertices whose attributes are 3, 6, 2 and 1 respectively.

[Figure: a graph with 4 vertices, with the attributes shown inside the circles]

The goal is for every vertex to end up with the maximum attribute found among all vertices in the graph.

How it works:

In super step 0, each vertex sends its attribute as a message to its adjacent vertices (those it shares a direct edge with).

In each subsequent super step:

Each vertex receives 1 or more messages that contain attribute values. It ignores the received values if none is greater than its own attribute; otherwise it replaces its attribute with the maximum of the received values.

Each vertex checks the attributes of its adjacent vertices; if they are all >= its own attribute, it stops sending messages.

Iteration exits once all vertices have stopped sending messages.

A vertex shown in grey has stopped sending messages. It can still receive messages, which are combined by the mergeMsg function argument and applied by vProg in the Pregel API.

Now let's write the Scala code that invokes the Spark GraphX pregel API to accomplish the above functional specification.

Create a random graph with 10 vertices:
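One possible way to build such a graph is GraphX's `GraphGenerators.logNormalGraph`, sketched below. The object and method names (`RandomGraphSketch`, `tenVertexGraph`) are hypothetical, and replacing the generated vertex attributes with pseudo-random Double values seeded by the vertex id is my own assumption so that there is something to take a maximum of.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators

object RandomGraphSketch {
  // Builds a random graph with 10 vertices and Double vertex attributes.
  def tenVertexGraph(sc: SparkContext): Graph[Double, Int] =
    GraphGenerators
      .logNormalGraph(sc, numVertices = 10)
      // Seeding by vertex id keeps the value stable if Spark recomputes a partition.
      .mapVertices((id, _) => new scala.util.Random(id).nextInt(100).toDouble)
}
```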

Prepare the arguments to invoke the Pregel API in GraphX

Here are the required arguments. I use them as a template every time I need to prepare a Pregel API call:

In this example, I set initialMsg to Double.NegativeInfinity, which is the smallest possible Double value, because the goal is to find the maximum vertex attribute and populate all other vertices in the graph with that maximum value.
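A minimal sketch of those arguments for the maximum-attribute example follows, assuming a graph with Double vertex attributes and Int edge attributes. The object name `MaxAttributeArgs` is hypothetical, and sending along an edge in whichever direction carries the larger value is my assumption about how "adjacent" should be read.

```scala
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, VertexId}

object MaxAttributeArgs {
  // Value arguments.
  val initialMsg: Double = Double.NegativeInfinity // never beats a real attribute
  val maxIterations: Int = Int.MaxValue
  val activeDirection: EdgeDirection = EdgeDirection.Either

  // vProg: keep the larger of the current attribute and the merged message.
  def vProg(id: VertexId, attr: Double, msg: Double): Double = math.max(attr, msg)

  // sendMsg: the endpoint with the larger attribute informs the other one;
  // when both ends agree, nothing is sent and the edge falls silent.
  def sendMsg(t: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] =
    if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr))
    else if (t.dstAttr > t.srcAttr) Iterator((t.srcId, t.dstAttr))
    else Iterator.empty

  // mergeMsg: several messages for one vertex collapse to their maximum.
  def mergeMsg(a: Double, b: Double): Double = math.max(a, b)
}
```

Because initialMsg is negative infinity, the first application of vProg leaves every vertex attribute unchanged.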

Invoke the pregel API
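The sketch below shows one way the call could look with named functions, run on a small local Spark context. Everything here is illustrative: the object name `MaxAttributePregel`, the local master setting, and the chain-shaped 4-vertex graph with attributes 3, 6, 2 and 1 are assumptions of mine, since the edges of the example graph are not spelled out in the text.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}

object MaxAttributePregel {
  def vProg(id: VertexId, attr: Double, msg: Double): Double = math.max(attr, msg)

  def sendMsg(t: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] =
    if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr))
    else if (t.dstAttr > t.srcAttr) Iterator((t.srcId, t.dstAttr))
    else Iterator.empty

  def mergeMsg(a: Double, b: Double): Double = math.max(a, b)

  // Value arguments first, functional arguments second.
  def populateMax(graph: Graph[Double, Int]): Graph[Double, Int] =
    graph.pregel(Double.NegativeInfinity, Int.MaxValue, EdgeDirection.Either)(
      vProg, sendMsg, mergeMsg)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("pregel-max")
    val sc = SparkContext.getOrCreate(conf)
    // Assumed layout: a chain 1 - 2 - 3 - 4.
    val vertices = sc.parallelize(Seq((1L, 3.0), (2L, 6.0), (3L, 2.0), (4L, 1.0)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1)))
    val result = populateMax(Graph(vertices, edges))
    result.vertices.collect().sortBy(_._1).foreach(println)
    sc.stop()
  }
}
```

On this assumed chain, every vertex should end with the attribute 6.0, the largest starting value.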

The following code works the same way with anonymous functions.
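As a sketch of the anonymous-function form, the three functional arguments can be passed inline. The object name `MaxAttributeAnonymous` is hypothetical, and the graph type (Double vertex attributes, Int edge attributes) is an assumption carried over from the maximum-attribute example.

```scala
import org.apache.spark.graphx.{EdgeDirection, Graph}

object MaxAttributeAnonymous {
  def populateMax(graph: Graph[Double, Int]): Graph[Double, Int] =
    graph.pregel(Double.NegativeInfinity, Int.MaxValue, EdgeDirection.Either)(
      // vProg: keep the larger of the own attribute and the merged message
      (_, attr, msg) => math.max(attr, msg),
      // sendMsg: the larger endpoint informs the smaller one, otherwise stay silent
      triplet =>
        if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
        else if (triplet.dstAttr > triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
        else Iterator.empty,
      // mergeMsg: combine messages bound for the same vertex
      (a, b) => math.max(a, b)
    )
}
```

The parameter types of the lambdas are inferred from the graph type and from initialMsg in the first argument list.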

Example: Trucking Company

Here is a real-world example. Suppose you are running a long-haul trucking company that serves a region with a number of cities.

As part of the costing analysis, you want to find the shortest distance from any city to any other city in your service map.

Here is an imaginary graph: vertices are cities, and edges are highways that connect the cities. Each edge has an attribute denoting the number of miles between the 2 cities the edge connects.

Pregel Implementation:

The Pregel API call returns a new graph, so you want the vertices of the returned Graph to look like this:

VertexId, Seq(k,v)

The attribute of a vertex is a sequence of key-value (k, v) pairs

k=destination vertexId

v=number of miles to the destination vertex

Define helper methods:

Create an initial empty Map for the key-value pairs

Add the distance of the next edge to the destination vertex

Create a new route to the target vertex; this function serves as the mergeMsg functional argument in the Pregel call, see the code for details

Define initialMsg as required by Pregel

Define vProg, also known as the vertex program, the functional argument that holds the logic executed in the super steps

Define sendMsg, the functional argument Pregel uses to produce messages

Following is the code:
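What follows is a hedged sketch of how those pieces could fit together; it is not the code from my repo. The names `CityDistances`, `Routes`, `addMiles`, `mergeRoutes` and `shortestDistances` are hypothetical, miles are assumed to be Int edge attributes, and highways are assumed to be usable in both directions.

```scala
import org.apache.spark.graphx.{EdgeTriplet, Graph, VertexId}

object CityDistances {
  // Vertex attribute: destination city id -> miles needed to reach it.
  type Routes = Map[VertexId, Int]

  // Initial empty map, also used as initialMsg.
  def emptyRoutes: Routes = Map.empty[VertexId, Int]

  // Adds the miles of the next edge to every known destination.
  def addMiles(routes: Routes, miles: Int): Routes =
    routes.map { case (dst, d) => dst -> (d + miles) }

  // mergeMsg: keep the shorter distance for every destination seen in either map.
  def mergeRoutes(a: Routes, b: Routes): Routes =
    (a.keySet ++ b.keySet).map { dst =>
      dst -> math.min(a.getOrElse(dst, Int.MaxValue), b.getOrElse(dst, Int.MaxValue))
    }.toMap

  // vProg: fold the incoming routes into the routes the city already knows.
  def vProg(id: VertexId, attr: Routes, msg: Routes): Routes = mergeRoutes(attr, msg)

  // sendMsg: offer each endpoint the other endpoint's routes extended by this edge,
  // but only when that would improve what the receiver already has.
  def sendMsg(t: EdgeTriplet[Routes, Int]): Iterator[(VertexId, Routes)] = {
    val viaDst = addMiles(t.dstAttr, t.attr)
    val viaSrc = addMiles(t.srcAttr, t.attr)
    val toSrc: Iterator[(VertexId, Routes)] =
      if (mergeRoutes(t.srcAttr, viaDst) != t.srcAttr) Iterator((t.srcId, viaDst))
      else Iterator.empty
    val toDst: Iterator[(VertexId, Routes)] =
      if (mergeRoutes(t.dstAttr, viaSrc) != t.dstAttr) Iterator((t.dstId, viaSrc))
      else Iterator.empty
    toSrc ++ toDst
  }

  // Every city starts out knowing only itself, at distance 0.
  def shortestDistances[VD](graph: Graph[VD, Int]): Graph[Routes, Int] =
    graph
      .mapVertices((id, _) => Map(id -> 0))
      .pregel(emptyRoutes)(vProg, sendMsg, mergeRoutes)
}
```

Messages stop once no edge can shorten any route, at which point each vertex holds its distance to every city reachable from it.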

Google Pregel in Spark GraphX is not an easy API to use. Making a Pregel API call correctly takes some time and many rounds of trial and error, but it is worth the effort.

As always, the code I wrote for this article is in my GitHub repo.
