Bulk Synchronous Parallel with Google Pregel: GraphX Implementation and Use Cases

What is Bulk Synchronous Parallel?

The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.

BSP computer

A BSP computer consists of:

components capable of processing and/or local memory transactions (i.e., processors)

a network that routes messages between pairs of such components

a hardware facility that allows for the synchronization of all or a subset of components.

Pregel from Google

There are many graph-parallel computing frameworks. One of them is Google's Pregel, which is based on the BSP (bulk synchronous parallel) model.

In BSP, a computation consists of a series of global supersteps, and each superstep has three phases:

Concurrent computation

The computation is applied to the vertices concurrently, up to the number of CPU cores available on each machine.

Communication

Vertices receive the messages sent to them in the previous superstep, and they send messages that will be processed in the next superstep.

Synchronization

Every vertex must finish the current superstep before any vertex starts the next one (a barrier). Messages sent in superstep S are delivered at the start of superstep S+1.

Vertices can decide when to stop sending messages.

Iteration stops when there are no more messages to send.
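The superstep cycle above can be sketched in plain Scala, without Spark. This is a hypothetical single-process illustration, not how Pregel or GraphX is implemented: the "network" is an in-memory Map, the barrier is simply the end of a loop iteration, and the per-vertex computation (hop distance from vertex 0 in a made-up five-vertex graph) was chosen only to show the three phases and the halting rule.

```scala
// Hypothetical undirected graph as adjacency lists.
val neighbors: Map[Int, List[Int]] = Map(
  0 -> List(1), 1 -> List(0, 2, 4), 2 -> List(1, 3), 3 -> List(2), 4 -> List(1))

val Inf = Int.MaxValue
var dist: Map[Int, Int] = neighbors.keys.map(v => v -> Inf).toMap
// Messages waiting for delivery; the source vertex starts with "distance 0".
var inbox: Map[Int, List[Int]] = Map(0 -> List(0))
var superstep = 0

while (inbox.nonEmpty) { // halt once no vertex has a message to process
  // 1. Concurrent computation: each vertex with mail updates only its own state.
  val updated: Map[Int, Int] =
    inbox.map { case (v, msgs) => v -> math.min(dist(v), msgs.min) }
  val changed: Map[Int, Int] = updated.filter { case (v, d) => d < dist(v) }
  dist = dist ++ updated

  // 2. Communication: only vertices whose value improved send messages.
  val outgoing: Seq[(Int, Int)] =
    for ((v, d) <- changed.toSeq; n <- neighbors(v)) yield n -> (d + 1)

  // 3. Barrier: messages sent in this superstep are delivered in the next one.
  inbox = outgoing.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).toList }
  superstep += 1
}

println(dist.toSeq.sorted.mkString(", ")) // (0,0), (1,1), (2,2), (3,3), (4,2)
println(s"supersteps: $superstep")        // supersteps: 5
```

A vertex that receives a message but does not improve sends nothing. That is the "vertices decide when to stop sending" rule, and the loop ends when the inbox is empty.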

Google's Pregel implementation in Apache Spark GraphX

Unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within GraphX.

Method pregel is not declared in abstract class Graph itself. It is defined in class GraphOps, and every Graph gets it through the implicit conversion Graph.graphToGraphOps. Without its default arguments, the signature looks like this:

def pregel[A: ClassTag]
    (initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)
  : Graph[VD, ED]

Its full definition in class GraphOps, which delegates to object Pregel, is:

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex.  The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam A the Pregel message type
   *
   * @param initialMsg the message each vertex will receive at the on
   * the first iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value.  On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message.  On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A.  ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }
}

The detailed implementation is in object Pregel:

/**
 * Implements a Pregel-like bulk-synchronous message-passing API.
 *
 * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
 * edges, enables the message sending computation to read both vertex attributes, and constrains
 * messages to the graph structure.  These changes allow for substantially more efficient
 * distributed execution while also exposing greater flexibility for graph-based computation.
 *
 * @example We can use the Pregel abstraction to implement PageRank:
 * {{{
 * val pagerankGraph: Graph[Double, Double] = graph
 *   // Associate the degree with each vertex
 *   .outerJoinVertices(graph.outDegrees) {
 *     (vid, vdata, deg) => deg.getOrElse(0)
 *   }
 *   // Set the weight on the edges based on the degree
 *   .mapTriplets(e => 1.0 / e.srcAttr)
 *   // Set the vertex attributes to the initial pagerank values
 *   .mapVertices((id, attr) => 1.0)
 *
 * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
 *   resetProb + (1.0 - resetProb) * msgSum
 * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
 *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
 * def messageCombiner(a: Double, b: Double): Double = a + b
 * val initialMessage = 0.0
 * // Execute Pregel for a fixed number of iterations.
 * Pregel(pagerankGraph, initialMessage, numIter)(
 *   vertexProgram, sendMessage, messageCombiner)
 * }}}
 *
 */
object Pregel extends Logging {

  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex.  The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam VD the vertex data type
   * @tparam ED the edge data type
   * @tparam A the Pregel message type
   *
   * @param graph the input graph.
   *
   * @param initialMsg the message each vertex will receive at the first
   * iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run. The default is
   * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
   * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
   * *both* vertices received a message.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value.  On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message.  On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A.  ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
    require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    val checkpointInterval = graph.vertices.sparkContext.getConf
      .getInt("spark.graphx.pregel.checkpointInterval", -1)
    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
    val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
      checkpointInterval, graph.vertices.sparkContext)
    graphCheckpointer.update(g)

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
      checkpointInterval, graph.vertices.sparkContext)
    messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
    var activeMessages = messages.count()

    // Loop
    var prevG: Graph[VD, ED] = null
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog)
      graphCheckpointer.update(g)

      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
      // and the vertices of g).
      messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
      activeMessages = messages.count()

      logInfo("Pregel finished iteration " + i)

      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist()
      prevG.unpersistVertices()
      prevG.edges.unpersist()
      // count the iteration
      i += 1
    }
    messageCheckpointer.unpersistDataSet()
    graphCheckpointer.deleteAllCheckpoints()
    messageCheckpointer.deleteAllCheckpoints()
    g
  } // end of apply

} // end of class Pregel


The superstep logic is the while loop in apply above, with the condition while (activeMessages > 0 && i < maxIterations). Each pass joins the current messages into the vertices with vprog (joinVertices), computes the next round of messages with sendMsg and mergeMsg (GraphXUtils.mapReduceTriplets), and counts them. Iteration stops when there are no more messages to send, or when the iteration counter reaches maxIterations.
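To follow the control flow of apply without a cluster, here is a plain-Scala imitation. It is a sketch, not GraphX code. Triplet and localPregel are hypothetical names. A Map of vertices and a Seq of edges stand in for the graph. One simplification: every round visits all edges, whereas GraphX only visits edges next to vertices that received a message (activeDirection).

```scala
type VertexId = Long // GraphX also defines VertexId as Long

// Hypothetical stand-in for GraphX's EdgeTriplet.
final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                 dstId: VertexId, dstAttr: VD, attr: ED)

def localPregel[VD, ED, A](
    vertices: Map[VertexId, VD],
    edges: Seq[(VertexId, VertexId, ED)],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A): (Map[VertexId, VD], Int) = {

  // Stand-in for mapReduceTriplets: run sendMsg on every edge, then merge
  // the messages addressed to the same vertex with mergeMsg.
  def computeMessages(g: Map[VertexId, VD]): Map[VertexId, A] = {
    val sent = edges.flatMap { case (s, d, e) => sendMsg(Triplet(s, g(s), d, g(d), e)) }
    sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg(_, _)) }
  }

  // Before the loop: every vertex runs vprog on initialMsg.
  var g = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
  var messages = computeMessages(g)
  var i = 0
  while (messages.nonEmpty && i < maxIterations) {
    // Like joinVertices: only vertices that received a message run vprog.
    g = g.map { case (id, v) => id -> messages.get(id).fold(v)(m => vprog(id, v, m)) }
    messages = computeMessages(g)
    i += 1
  }
  (g, i)
}

// Usage: label connected components with the smallest vertex id.
// Hypothetical graph: 1-2-3 and 4-5, with an edge in each direction.
val vertices: Map[VertexId, Long] = (1L to 5L).map(id => id -> id).toMap
val edges: Seq[(VertexId, VertexId, Int)] =
  Seq((1L, 2L, 1), (2L, 1L, 1), (2L, 3L, 1), (3L, 2L, 1), (4L, 5L, 1), (5L, 4L, 1))

val (labels, iterations) = localPregel(vertices, edges, Long.MaxValue)(
  (id, attr, msg) => math.min(attr, msg),
  t => if (t.srcAttr < t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty,
  (a, b) => math.min(a, b))

println(labels.toSeq.sorted.mkString(", ")) // (1,1), (2,1), (3,1), (4,4), (5,4)
println(s"iterations: $iterations")          // iterations: 2
```

The returned iteration count is the loop counter i. As in apply, the initial vprog pass before the loop does not count as an iteration.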

GraphX's Pregel is implemented in Pregel.scala, part of package org.apache.spark.graphx:

https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

While the underlying logic is complicated, invoking the GraphX Pregel API is not too difficult, though it is not trivial either.

Based on the pregel definition in class GraphOps:

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

Arguments needed to invoke the GraphX pregel API

All you need to do is provide the arguments of the pregel method. What are they?

First, provide the value arguments:

initialMsg: A
initialMsg can be of any type; its type is the type parameter A, which requires a ClassTag
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory; the other two have default values.

Second, define and provide three functional arguments:

vprog:
The vertex program, a user-defined function that holds the key logic run in each superstep. In the first superstep it runs on every vertex with initialMsg. After that it runs only on vertices that received a message, and it returns the vertex's new attribute.

sendMsg:
A function applied to each edge triplet (the edge plus the attributes of both end vertices). It returns an Iterator of (VertexId, message) pairs, so it can send zero, one or more messages to the vertices at either end of the edge. Returning Iterator.empty sends nothing.

mergeMsg:
A commutative and associative function that combines two messages addressed to the same vertex into one. The combined message is then passed to vprog, which updates the vertex attribute.

Once all the arguments above are ready, you can invoke the pregel method. It returns a new Graph whose vertex attributes are the values computed by vprog.
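Why must mergeMsg be commutative and associative? Messages for one vertex can be combined in any grouping and order, for example first within each partition and then across partitions. This plain-Scala sketch imitates that with a made-up split of four messages into two "partitions". It reduces the messages in two different groupings and orders and compares the results.

```scala
val messages = List(3.0, 9.0, 1.0, 6.0)

// Combine all messages left to right.
def combineInOrder(f: (Double, Double) => Double): Double = messages.reduce(f(_, _))

// Reduce each hypothetical "partition" in reverse, then combine the partial results.
def combineSplit(f: (Double, Double) => Double): Double = {
  val (p1, p2) = messages.splitAt(2)
  f(p2.reverse.reduce(f(_, _)), p1.reverse.reduce(f(_, _)))
}

val maxMerge: (Double, Double) => Double = (a, b) => math.max(a, b)
val minusMerge: (Double, Double) => Double = (a, b) => a - b

println(combineInOrder(maxMerge) == combineSplit(maxMerge))     // true: safe as mergeMsg
println(combineInOrder(minusMerge) == combineSplit(minusMerge)) // false: unsafe as mergeMsg
```

math.max gives 9.0 in both cases. Subtraction gives -13.0 in one order and -1.0 in the other, so using it as mergeMsg would make results depend on how Spark happens to combine messages.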

Example: Populate every vertex in a graph with the maximum vertex attribute

Assume the graph below, with 4 vertices. The attributes of the vertices, marked inside the circles, are 3, 6, 2 and 1.

The goal is for every vertex to end up with the maximum vertex attribute in the graph.

How it works:

In superstep 0, each vertex sends its attribute as a message to its adjacent vertices (those it has a direct edge to).

In each subsequent superstep:

Each vertex receives one or more messages containing attribute values. It ignores the received values if they are not greater than its own attribute; otherwise it replaces its attribute with the maximum of the received values.

Each vertex checks the attributes of its adjacent vertices. If they are all >= its own attribute, it stops sending messages.

Iteration ends once all vertices have stopped sending messages.

A grey vertex has stopped sending messages. It can still receive messages: mergeMsg combines the incoming messages, and vprog applies the result to the vertex attribute.
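The procedure above can be traced superstep by superstep in plain Scala. This is a sketch without Spark. The figure's exact edges are not reproduced in the text, so the edges here are an assumption: a chain 1 - 2 - 3 - 4 with an edge in each direction, carrying the attributes 3, 6, 2 and 1. The three functions mirror the GraphX ones used below, rewritten for plain maps.

```scala
type VertexId = Long
// Hypothetical edges: a chain with one edge in each direction.
val edges: Seq[(VertexId, VertexId)] =
  Seq(1L -> 2L, 2L -> 1L, 2L -> 3L, 3L -> 2L, 3L -> 4L, 4L -> 3L)
var attrs: Map[VertexId, Double] = Map(1L -> 3.0, 2L -> 6.0, 3L -> 2.0, 4L -> 1.0)

def vprog(id: VertexId, attr: Double, msg: Double): Double = math.max(attr, msg)
// Send the source's value only if it is larger than the destination's.
def sendMsg(src: VertexId, dst: VertexId, g: Map[VertexId, Double]): Option[(VertexId, Double)] =
  if (g(src) > g(dst)) Some(dst -> g(src)) else None
def mergeMsg(a: Double, b: Double): Double = math.max(a, b)

def messagesFor(g: Map[VertexId, Double]): Map[VertexId, Double] = {
  val sent = edges.flatMap { case (s, d) => sendMsg(s, d, g) }
  sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg(_, _)) }
}

// Superstep 0: every vertex applies the initial message.
attrs = attrs.map { case (id, a) => id -> vprog(id, a, Double.NegativeInfinity) }
var history = List(attrs)
var messages = messagesFor(attrs)
while (messages.nonEmpty) {
  // Only vertices that received a message run vprog.
  attrs = attrs.map { case (id, a) => id -> messages.get(id).fold(a)(vprog(id, a, _)) }
  history = history :+ attrs
  messages = messagesFor(attrs)
}

history.zipWithIndex.foreach { case (g, step) =>
  println(s"superstep $step: " + g.toSeq.sortBy(_._1).map(_._2).mkString(" "))
}
// superstep 0: 3.0 6.0 2.0 1.0
// superstep 1: 6.0 6.0 6.0 2.0
// superstep 2: 6.0 6.0 6.0 6.0
```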

Now let's write Scala code that invokes the Spark GraphX pregel API to implement the functional specification above.

Create a random graph with 10 vertices (in spark-shell, where sc is the SparkContext):

import org.apache.spark.graphx._ 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices( (id, _) => id.toDouble ).partitionBy(PartitionStrategy.EdgePartition2D,4)
graph.vertices.collect.foreach(println)
/*
Note that the largest vertex attribute is 9.0
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)

*/

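Prepare the arguments to invoke the Google Pregel API in GraphX

//define 3 functions as arguments of method pregel
// vprog: keep the larger of the current attribute and the incoming message
val vProg = { (id: VertexId, attr: Double, msg: Double) => math.max(attr, msg) }

// sendMsg: push the source's attribute along the edge only if it is larger than the destination's
val sendMsg = { (triplet: EdgeTriplet[Double, Int]) =>
  if (triplet.srcAttr > triplet.dstAttr) {
    Iterator((triplet.dstId, triplet.srcAttr))
  } else {
    Iterator.empty
  }
}

// mergeMsg: combine two messages for the same vertex by keeping the larger one
val reduceMsg = { (a: Double, b: Double) => math.max(a, b) }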

Here is the checklist of arguments I use as a template every time I prepare to invoke the Pregel API, filled in for this example:

initialMsg: Double.NegativeInfinity
maxIterations: not passed, so it defaults to Int.MaxValue
activeDirection: not passed, so it defaults to EdgeDirection.Either
vprog: vProg
sendMsg: sendMsg
mergeMsg: reduceMsg

In this example, I set initialMsg to Double.NegativeInfinity, the smallest possible Double. In superstep 0 every vertex runs vprog with initialMsg, and math.max(attr, Double.NegativeInfinity) is always attr. The initial message therefore leaves every attribute unchanged before the search for the maximum begins.
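A quick plain-Scala check shows why the identity element of max matters. The attribute values below are made up, and one of them is negative. With this article's graph, where attributes are non-negative vertex ids, 0.0 would happen to work as well. Double.NegativeInfinity is safe for any ordinary Double.

```scala
// Same vertex program as vProg, with VertexId written as Long.
val vProg = (id: Long, attr: Double, msg: Double) => math.max(attr, msg)
val attrs = Seq(-7.5, 0.0, 9.0) // hypothetical attributes

// Superstep 0 applies vProg(id, attr, initialMsg) to every vertex.
val withNegInf = attrs.map(a => vProg(0L, a, Double.NegativeInfinity))
val withZero = attrs.map(a => vProg(0L, a, 0.0)) // a poorer choice of initialMsg

println(withNegInf) // List(-7.5, 0.0, 9.0): attributes unchanged
println(withZero)   // List(0.0, 0.0, 9.0): -7.5 was overwritten
```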

Invoke pregel API

graph.pregel(Double.NegativeInfinity)(
vProg,
sendMsg,
reduceMsg
).vertices.collect.foreach(println)

/*
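Output: every vertex attribute is now the largest vertex attribute, 9.0. This works in this particular random graph because every vertex can be reached from vertex 9 along edge directions; sendMsg only pushes values from an edge's source to its destination.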

(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)


*/

The following code does the same thing with anonymous functions.

import org.apache.spark.graphx._ 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices( (id, _) => id.toDouble ).partitionBy(PartitionStrategy.EdgePartition2D,4)
graph.vertices.collect.foreach(println)
/*
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)

*/

val maxGraph = graph.pregel(Double.NegativeInfinity)(
  (id, dist, newDist) => math.max(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr > triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.max(a, b) // Merge Message
)
maxGraph.vertices.collect.foreach(println)
/*
(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)

*/

Example: Trucking Company

Here is a real-world example. Suppose you run a long-haul trucking company that serves a region with a number of cities.

As part of a cost analysis, you want to find the shortest distance from every city to every other city on your service map.

Here is an imaginary graph: vertices are cities, and edges are highways connecting them. Each edge's attribute is the number of miles between the two cities it connects. GraphX edges are directed, so each highway is treated as one-way, from its source city to its destination city.

Pregel Implementation:

The Pregel API call returns a new graph, so design the vertex attribute of the returned graph like this:

VertexId, Map(k -> v)

The vertex attribute is a map of key-value (k, v) pairs, where:

k = destination vertexId

v = number of miles from this vertex to the destination vertex

Define helper methods (busMap is a type alias for Map[VertexId, Int], declared in the full code below):

createMap builds a map from the given key-value pairs; called with no arguments, it returns an empty map:

def createMap(x: (VertexId, Int)*) = Map(x: _*)

incrementDistance adds the miles of one more edge to every distance in a map:

def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }

addRoute combines two maps, keeping the shorter distance for each destination. It serves as the mergeMsg functional argument in the Pregel call, and vertexProgram uses it as well:

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map {
    k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap
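The three helpers can be tried on plain Scala maps without Spark. In this sketch, the VertexId alias is written out as Long (the same definition GraphX uses). The example distances are made up: one city already knows some routes, and a neighbor 200 miles away offers its own.

```scala
type VertexId = Long             // GraphX defines VertexId as Long too
type busMap = Map[VertexId, Int] // destination VertexId -> miles

def createMap(x: (VertexId, Int)*): busMap = Map(x: _*)

def incrementDistance(busmap: busMap, attr: Int): busMap =
  busmap.map { case (v, d) => v -> (d + attr) }

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map { k =>
    k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// A city knows 300 miles to city 1 and 120 miles to city 2.
val known = createMap(1L -> 300, 2L -> 120)
// A neighbor 200 miles away knows 50 miles to city 1 and 40 miles to city 3.
val offered = incrementDistance(createMap(1L -> 50, 3L -> 40), 200)
// City 1 drops to 250 via the neighbor, city 2 stays at 120, city 3 is new at 240.
val merged = addRoute(known, offered)

println(offered)
println(merged)
```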

Define initialMsg, required by Pregel:

//Define initialMessage needed by Pregel as argument
val initialMessage = createMap()

Define vprog (here called vertexProgram), the functional argument Pregel runs on the vertices in every superstep. It folds the incoming distances into the vertex's own map:

def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
  addRoute(attr, msg)
}

Define sendMsg (here called sendMessage), the functional argument Pregel applies to each edge. For an edge from src to dst, it adds the edge's miles to every distance dst knows and sends the result to src, but only if that would improve src's map:

def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
  // The edge type is generic here, so the edge's miles are recovered via toString
  val newAttr = incrementDistance(edge.dstAttr, edge.attr.toString.toInt)
  if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
    Iterator((edge.srcId, newAttr))
  else
    // Vertices can decide not to send a message by returning an empty iterator
    Iterator.empty
}

Here is the complete code:

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import scala.reflect.ClassTag

//define Map as data type: destination VertexId -> miles to it
type busMap = Map[VertexId, Int]

def createMap(x: (VertexId, Int)*) = Map(x: _*)

def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map {
    k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// landmarks: the destination cities to compute distances to (pass all cities for every pair)
def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[busMap, ED] = {
  // A landmark starts 0 miles from itself; every other vertex starts with an empty map
  val sssp = graph.mapVertices { (vid, attr) =>
    if (landmarks.contains(vid))
      createMap(vid -> 0)
    else
      createMap()
  }

  //Define initialMessage needed by Pregel as argument
  val initialMessage = createMap()

  //Define vprog, aka vertexProgram, needed by Pregel as functional argument: the logic executed in the supersteps
  def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
    addRoute(attr, msg)
  }

  // Define sendMsg as needed by Pregel as functional argument
  def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
    val newAttr = incrementDistance(edge.dstAttr, edge.attr.toString.toInt)
    if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
      Iterator((edge.srcId, newAttr))
    else
      // Vertices can decide not to send a message by returning an empty iterator
      Iterator.empty
  }

  //Invoke Pregel
  Pregel(sssp, initialMessage)(vertexProgram, sendMessage, addRoute)
}

//Need random generator to generate random miles for each edge
val r = scala.util.Random

//Create a random graph with 5 vertices for simplicity in the writing
//Attribute of edge of this graph is to store miles length of edge (10 to 999 miles)
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 5)
    .mapVertices((id, attr) => id.toDouble)
    .mapEdges(e => r.nextInt(990) + 10)
    .cache() // keep the random miles fixed instead of regenerating them if the edges are recomputed

// Now invoke run, passing every city as a landmark. In the returned graph,
// each vertex attribute is a Map of destination VertexId -> shortest miles to it.
val landmarks: Seq[VertexId] = graph.vertices.map(_._1).collect.toSeq

for ((from, distances) <- run(graph, landmarks).vertices.collect.sortBy(_._1);
     (to, miles) <- distances.toSeq.sortBy(_._1) if from != to) {
  println("From VertexId " + from + " to " + to + " shortest distance is " + miles + " Miles")
}


/*
Output (the edge miles are random, so your numbers will differ from run to run)
From VertexId 0 to 1 shortest distance is 243 Miles
From VertexId 0 to 2 shortest distance is 422 Miles
From VertexId 0 to 3 shortest distance is 1102 Miles
From VertexId 0 to 4 shortest distance is 787 Miles
From VertexId 1 to 0 shortest distance is 874 Miles
From VertexId 1 to 2 shortest distance is 1296 Miles
From VertexId 1 to 3 shortest distance is 859 Miles
From VertexId 1 to 4 shortest distance is 1661 Miles
From VertexId 2 to 0 shortest distance is 1746 Miles
From VertexId 2 to 1 shortest distance is 872 Miles
From VertexId 2 to 3 shortest distance is 813 Miles
From VertexId 2 to 4 shortest distance is 374 Miles
From VertexId 3 to 0 shortest distance is 933 Miles
From VertexId 3 to 1 shortest distance is 59 Miles
From VertexId 3 to 2 shortest distance is 682 Miles
From VertexId 3 to 4 shortest distance is 1056 Miles
From VertexId 4 to 0 shortest distance is 1372 Miles
From VertexId 4 to 1 shortest distance is 498 Miles
From VertexId 4 to 2 shortest distance is 1121 Miles
From VertexId 4 to 3 shortest distance is 439 Miles

*/
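Because the miles above are random, the output cannot be checked by hand. This plain-Scala replay (no Spark) runs the same rules on a fixed road map so the result can be verified. It uses the same initial maps, the same sendMessage condition, and addRoute as both vertexProgram and mergeMsg. The five one-way highways and their miles are made up for illustration. Like the earlier sketches, it visits every edge in every round instead of only the active ones.

```scala
type VertexId = Long
type busMap = Map[VertexId, Int] // destination VertexId -> miles

def createMap(x: (VertexId, Int)*): busMap = Map(x: _*)
def incrementDistance(m: busMap, miles: Int): busMap = m.map { case (v, d) => v -> (d + miles) }
def addRoute(m1: busMap, m2: busMap): busMap =
  (m1.keySet ++ m2.keySet).map { k =>
    k -> math.min(m1.getOrElse(k, Int.MaxValue), m2.getOrElse(k, Int.MaxValue))
  }.toMap

// Hypothetical one-way highways: (from city, to city, miles).
val roads: Seq[(VertexId, VertexId, Int)] =
  Seq((0L, 1L, 100), (1L, 2L, 50), (0L, 2L, 200), (2L, 3L, 30), (3L, 0L, 70))
val landmarks: Seq[VertexId] = Seq(0L, 1L, 2L, 3L)

// Same rule as sendMessage: offer dst's distances plus this edge to src,
// but only if that improves what src already knows.
def sendMessage(src: VertexId, dst: VertexId, miles: Int,
                g: Map[VertexId, busMap]): Option[(VertexId, busMap)] = {
  val newAttr = incrementDistance(g(dst), miles)
  if (g(src) != addRoute(newAttr, g(src))) Some(src -> newAttr) else None
}

// One round of messages, merged per receiving city with addRoute (mergeMsg).
def computeMessages(g: Map[VertexId, busMap]): Map[VertexId, busMap] = {
  val sent = roads.flatMap { case (s, d, miles) => sendMessage(s, d, miles, g) }
  sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(addRoute(_, _)) }
}

// As in run(): each landmark starts 0 miles from itself. Applying vertexProgram
// with the empty initialMessage would not change these maps, so it is skipped.
var g: Map[VertexId, busMap] = landmarks.map(v => v -> createMap(v -> 0)).toMap
var messages = computeMessages(g)
while (messages.nonEmpty) {
  // vertexProgram = addRoute(attr, msg), only for cities that received a message
  g = g.map { case (id, attr) => id -> messages.get(id).fold(attr)(addRoute(attr, _)) }
  messages = computeMessages(g)
}

for ((from, dists) <- g.toSeq.sortBy(_._1); (to, miles) <- dists.toSeq.sortBy(_._1) if from != to)
  println(s"From VertexId $from to $to shortest distance is $miles Miles")
```

For example, city 0 reaches city 3 in 180 miles via 0 -> 1 -> 2 -> 3, beating the 230-mile route through the direct 0 -> 2 highway. City 2 reaches city 1 only by going around the loop through 3 and 0, which is 200 miles.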

Google's Pregel in Spark GraphX is not an easy API to use. Getting a Pregel call right takes time and some trial and error, but it is worth the effort.

As always, the code I wrote for this article is in my GitHub repo.
