The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.
BSP computer
A BSP computer consists of:
components capable of processing and/or local memory transactions (i.e., processors)
a network that routes messages between pairs of such components
a hardware facility that allows for the synchronization of all or a subset of components.
Pregel from Google
Pregel, from Google, is one of many graph-parallel computing frameworks. It is based on the BSP (bulk synchronous parallel) model.
In BSP, a computation consists of a series of global super steps, and each super step is composed of three phases:
Concurrent computation
Each vertex runs its calculation concurrently, limited by the CPU cores available on each machine.
Communication
Each vertex receives the messages sent to it in the prior super step and sends messages to the vertices to be processed in the next super step.
Synchronization
Vertices can decide when to cease sending messages.
Iteration stops when there are no more messages to be sent.
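The three phases can be sketched in plain Scala, without Spark. This is only an illustration on a tiny in-memory directed graph; `BspSketch`, `run`, `compute`, `send` and `merge` are hypothetical names chosen for this sketch and are not part of Pregel or GraphX.

```scala
// Minimal BSP sketch: each pass through the while loop is one super step.
// All names are illustrative; nothing below is GraphX API.
object BspSketch {
  type VertexId = Long

  def run[V, M](
      vertices: Map[VertexId, V],
      edges: Seq[(VertexId, VertexId)], // directed (src, dst) pairs
      initialMsg: M,
      maxIterations: Int)(
      compute: (V, M) => V,             // local computation on one vertex
      send: (V, V) => Option[M],        // optional message from src to dst
      merge: (M, M) => M): (Map[VertexId, V], Int) = {

    // Communication: every edge may emit one message to its destination;
    // messages for the same destination are combined with `merge`.
    def exchange(state: Map[VertexId, V]): Map[VertexId, M] =
      edges
        .flatMap { case (src, dst) => send(state(src), state(dst)).map(dst -> _) }
        .groupBy(_._1)
        .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(merge) }

    // Super step 0: every vertex receives the initial message.
    var state = vertices.map { case (id, v) => id -> compute(v, initialMsg) }
    var inbox = exchange(state)
    var steps = 0

    // Synchronization: a super step starts only after all messages of the
    // previous one were collected. Stop when no messages remain.
    while (inbox.nonEmpty && steps < maxIterations) {
      state = state.map { case (id, v) => id -> inbox.get(id).map(compute(v, _)).getOrElse(v) }
      inbox = exchange(state)
      steps += 1
    }
    (state, steps)
  }
}

// Usage: spread the maximum value over a 3-vertex chain with edges in both directions.
val (result, steps) = BspSketch.run(
  Map(1L -> 3.0, 2L -> 6.0, 3L -> 2.0),
  Seq((1L, 2L), (2L, 1L), (2L, 3L), (3L, 2L)),
  Double.NegativeInfinity,
  maxIterations = 10)(
  (v, m) => math.max(v, m),
  (src, dst) => if (src > dst) Some(src) else None,
  (a, b) => math.max(a, b))
```

In this run the vertex holding 6.0 is the only one with something to send, so the loop needs a single pass before the inbox is empty.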
Google's Pregel implementation in Apache Spark GraphX
Unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and the message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within GraphX.
The pregel method can be called on any Graph (through the implicit conversion from Graph to GraphOps). In summary form, its signature is:
abstract class Graph[VD: ClassTag, ED: ClassTag] {
def pregel[A: ClassTag]
(initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)
(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A
)
: Graph[VD, ED]
}
The pregel API is implemented in class GraphOps:
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam A the Pregel message type
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
}
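The scaladoc above requires `mergeMsg` to be commutative and associative. One way to read that requirement: messages for the same vertex may be combined in any order and grouping, so the result must not depend on either. The plain Scala check below is a sketch of that idea over a few sample values; `isCommutative`, `isAssociative`, `maxMerge` and `minusMerge` are hypothetical helpers, not GraphX API.

```scala
// Brute-force check of both properties over a small sample of values.
def isCommutative[A](f: (A, A) => A, samples: Seq[A]): Boolean =
  samples.forall(a => samples.forall(b => f(a, b) == f(b, a)))

def isAssociative[A](f: (A, A) => A, samples: Seq[A]): Boolean =
  samples.forall(a => samples.forall(b => samples.forall(c =>
    f(f(a, b), c) == f(a, f(b, c)))))

val samples = Seq(-2, 0, 1, 7)

// max gives the same answer for any order or grouping: a safe merge function.
val maxMerge: (Int, Int) => Int = (a, b) => math.max(a, b)

// Subtraction depends on order and grouping: unsuitable as a merge function.
val minusMerge: (Int, Int) => Int = (a, b) => a - b

val maxIsSafe = isCommutative(maxMerge, samples) && isAssociative(maxMerge, samples)
val minusIsSafe = isCommutative(minusMerge, samples) && isAssociative(minusMerge, samples)
```

A sample-based check like this can only reject a candidate function; passing it is not a proof.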
The detailed implementation is in the Pregel object:
/**
* Implements a Pregel-like bulk-synchronous message-passing API.
*
* Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
* edges, enables the message sending computation to read both vertex attributes, and constrains
* messages to the graph structure. These changes allow for substantially more efficient
* distributed execution while also exposing greater flexibility for graph-based computation.
*
* @example We can use the Pregel abstraction to implement PageRank:
* {{{
* val pagerankGraph: Graph[Double, Double] = graph
* // Associate the degree with each vertex
* .outerJoinVertices(graph.outDegrees) {
* (vid, vdata, deg) => deg.getOrElse(0)
* }
* // Set the weight on the edges based on the degree
* .mapTriplets(e => 1.0 / e.srcAttr)
* // Set the vertex attributes to the initial pagerank values
* .mapVertices((id, attr) => 1.0)
*
* def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
* def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute Pregel for a fixed number of iterations.
* Pregel(pagerankGraph, initialMessage, numIter)(
* vertexProgram, sendMessage, messageCombiner)
* }}}
*
*/
object Pregel extends Logging {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the first
* iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run. The default is
* `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
* in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
* *both* vertices received a message.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
val checkpointInterval = graph.vertices.sparkContext.getConf
.getInt("spark.graphx.pregel.checkpointInterval", -1)
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
checkpointInterval, graph.vertices.sparkContext)
graphCheckpointer.update(g)
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog)
graphCheckpointer.update(g)
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
activeMessages = messages.count()
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist()
prevG.unpersistVertices()
prevG.edges.unpersist()
// count the iteration
i += 1
}
messageCheckpointer.unpersistDataSet()
graphCheckpointer.deleteAllCheckpoints()
messageCheckpointer.deleteAllCheckpoints()
g
} // end of apply
} // end of object Pregel
The super step is implemented by this while loop. Iteration stops either when there are no more messages to be sent or when the iteration counter reaches the maximum iteration limit.
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog)
graphCheckpointer.update(g)
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
activeMessages = messages.count()
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist()
prevG.unpersistVertices()
prevG.edges.unpersist()
// count the iteration
i += 1
}
GraphX's implementation of Google's Pregel model is part of the package org.apache.spark.graphx.
While the underlying logic is complicated, invoking the GraphX Pregel API is not too difficult, although it is not trivial either.
Recall the pregel definition in class GraphOps:
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
Arguments needed for invoking the GraphX pregel API
Calling pregel is a matter of supplying all of its arguments. What are they?
First, you need to provide the value arguments:
initialMsg: A
initialMsg can be of any type, which is why the method takes the type parameter A (with a ClassTag)
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory.
Second, you need to define and provide three function arguments:
vprog:
The vertex program, a user-defined function that runs in each super step on every vertex that received a message and computes the new vertex attribute
sendMsg:
A function applied to an edge triplet that sends messages to adjacent vertices (it returns an Iterator of (VertexId, message) pairs, so it can send zero, one, or several messages)
mergeMsg:
A function that combines two messages destined for the same vertex into one; the merged message is then passed to vprog, which updates the vertex attribute
Once all of these arguments are ready, you can invoke the pregel method, which returns a new Graph whose vertices hold the attribute values computed by the vertex program vprog.
Example: Populate vertices in graph with maximum attribute of a vertex
Assume a graph with 4 vertices whose attributes, marked inside the circles, are 3, 6, 2 and 1 respectively.
The goal is to make every vertex hold the maximum vertex attribute in the graph.
How it works:
In super step 0, each vertex sends its attribute as a message to its adjacent vertices (those it shares a direct edge with).
In each subsequent super step:
Each vertex receives one or more messages containing attribute values. It ignores the received values if none is greater than its own attribute; otherwise it replaces its attribute with the maximum of the received values.
Each vertex checks the attributes of its adjacent vertices; if they are all >= its own attribute, it stops sending messages.
Iteration exits once all vertices have stopped sending messages.
A vertex shown in grey has stopped sending messages. It can still receive messages, which are combined by the mergeMsg function argument of the Pregel API and applied by vprog.
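The rules above can be traced in plain Scala. The topology is an assumption, since the figure is not reproduced here: the sketch treats the four vertices as a chain 3 - 6 - 2 - 1 with edges in both directions. `neighbors`, `superstep` and `trace` are hypothetical names used only for this illustration.

```scala
// Vertex attributes from the example, indexed 0..3.
val initial = Vector(3, 6, 2, 1)

// Assumed topology: a chain 0 - 1 - 2 - 3, edges usable in both directions.
val neighbors = Vector(Seq(1), Seq(0, 2), Seq(1, 3), Seq(2))

// One super step: a vertex only hears from neighbors holding a strictly larger
// value (smaller ones have nothing worth sending) and keeps the maximum.
def superstep(values: Vector[Int]): Vector[Int] =
  values.indices.toVector.map { i =>
    (values(i) +: neighbors(i).map(values).filter(_ > values(i))).max
  }

// Keep stepping until a super step changes nothing, i.e. nobody sends anymore.
val changes = Iterator.iterate(initial)(superstep)
  .sliding(2)
  .takeWhile { case Seq(prev, next) => prev != next }
  .map(_.last)
  .toVector

val trace = initial +: changes
trace.zipWithIndex.foreach { case (values, step) =>
  println(s"after super step $step: ${values.mkString(", ")}")
}
```

Under the assumed chain, the value 6 reaches its direct neighbors first and the far end of the chain one super step later.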
Now write the Scala code that invokes the Spark GraphX pregel API to implement the specification above.
Create a random graph with 10 vertices:
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._
import org.apache.spark.sql._
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices( (id, _) => id.toDouble ).partitionBy(PartitionStrategy.EdgePartition2D,4)
graph.vertices.collect.foreach(println)
/*
Notice that the largest vertex attribute is 9.0
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)
*/
Prepare the arguments to invoke the pregel API in GraphX
//define 3 functions as arguments of method pregel
val vProg = { (id: VertexId, attr: Double, msg: Double) => math.max(attr,msg) }
val sendMsg = { (triplet: EdgeTriplet[Double, Int]) =>
if (triplet.srcAttr > triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr))
} else {
Iterator.empty
}
}
val reduceMsg = { (a: Double, b: Double) => math.max(a,b) }
Here are the required arguments. I use this list as a template every time I prepare to invoke the Pregel API:
initialMsg: A
initialMsg can be of any type, which is why the method takes the type parameter A (with a ClassTag)
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory.
vprog:
The vertex program, a user-defined function that runs in each super step on every vertex that received a message
sendMsg:
A function that sends messages from a vertex to adjacent vertices (it returns an Iterator of (VertexId, message) pairs)
mergeMsg:
A function that combines the messages destined for the same vertex into one, which vprog then uses to update the vertex attribute
In this example, I set initialMsg to Double.NegativeInfinity, the smallest possible Double value. The goal is to find the maximum vertex attribute and populate every other vertex in the graph with it, and taking math.max of any attribute and this initial message leaves the attribute unchanged.
Invoke the pregel API
graph.pregel(Double.NegativeInfinity)(
vProg,
sendMsg,
reduceMsg
).vertices.collect.foreach(println)
/*
Output: Populate every Vertex attribute with largest attribute of a Vertex, which is 9.0 in the example
(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)
*/
The same call also works with anonymous functions passed inline in place of the named values.
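A sketch of that inline form is shown here. It assumes Spark with GraphX on the classpath and creates its own local SparkContext; the master URL `local[2]` and the application name are placeholders. Because the random graph is directed and messages only travel from source to destination, the sketch does not assume that every vertex ends up at 9.0.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Placeholder local context; in spark-shell the predefined `sc` can be used instead.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("pregel-inline-sketch").setMaster("local[2]"))

// Random graph with 10 vertices; each vertex attribute is its own id as a Double.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)

// Same logic as vProg, sendMsg and reduceMsg, written as anonymous functions.
val result = graph.pregel(Double.NegativeInfinity)(
  (id, attr, msg) => math.max(attr, msg),
  triplet =>
    if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
    else Iterator.empty,
  (a, b) => math.max(a, b))

val maxByVertex = result.vertices.collect().toMap
maxByVertex.toSeq.sortBy(_._1).foreach(println)
```

The message type is inferred from `Double.NegativeInfinity` in the first parameter list, which is why the lambdas in the second list need no type annotations.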
Here is a real-world example. Suppose you run a long-haul trucking company that serves a region with a number of cities.
As part of the cost analysis, you want to find the shortest distance from any city to any other city on your service map.
Here is an imaginary graph: vertices are cities and edges are the highways that connect them. Each edge has an attribute denoting the number of miles between the two cities it connects.
Pregel Implementation:
The Pregel API call returns a new graph, so design the vertices of the returned graph to look like this:
VertexId, Map(k -> v)
The attribute of each vertex is a Map of key-value (k, v) pairs
k = destination VertexId
v = number of miles to the destination vertex
Define helper methods:
Define the Map type used as the vertex attribute, and a helper that creates a Map from key-value pairs (an empty Map when called with no arguments)
type busMap = Map[VertexId, Int]
def createMap(x: (VertexId, Int)*) = Map(x: _*)
Add the length of the next edge to every distance already known at the destination vertex
def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }
Merge two route maps, keeping the shorter distance for each destination. This function serves as the mergeMsg function argument in the Pregel call; see the code for details
def addRoute(busmap1: busMap, busmap2: busMap): busMap =
(busmap1.keySet ++ busmap2.keySet).map {
k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
}.toMap
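To see what the two helpers compute, here is a small plain Scala run with made-up numbers. The helper bodies are the ones above; the `VertexId` alias is redeclared so the snippet runs without Spark, and the cities and mileages are invented for illustration.

```scala
type VertexId = Long             // same underlying type GraphX uses for VertexId
type busMap = Map[VertexId, Int] // destination vertex -> miles

def incrementDistance(busmap: busMap, attr: Int): busMap =
  busmap.map { case (v, d) => v -> (d + attr) }

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map { k =>
    k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// City 2 knows it is 0 miles from itself and 40 miles from city 3.
val fromCity2: busMap = Map(2L -> 0, 3L -> 40)

// City 1 is 25 miles from city 2, so every route through city 2 costs 25 more.
val viaCity2 = incrementDistance(fromCity2, 25)

// City 1 already knows a direct 50-mile route to city 3; keep the shorter one.
val merged = addRoute(Map(3L -> 50), viaCity2)
```

`Int.MaxValue` only acts as a stand-in for "no route known" inside `math.min`; it is never added to, so it cannot overflow here.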
Define initialMsg required by Pregel
//Define initialMessage needed by Pregel as argument
val initialMessage = createMap()
Define vProg, also known as vertexProgram, the function argument that holds the logic executed in each super step
def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
addRoute(attr, msg)
}
Define sendMsg, the function argument that decides which messages to send
def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
val newAttr = incrementDistance(edge.dstAttr,edge.attr.toString.toInt)
if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
Iterator((edge.srcId, newAttr))
else
// Vertices can decide not to send message with empty iterator
Iterator.empty
}
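The send decision can be exercised without Spark. In the sketch below, `Triplet` is a hypothetical stand-in for GraphX's `EdgeTriplet` that carries only the fields the function reads, and the mileages are made up. The point it illustrates: a message flows from the destination back to the source only when it would improve what the source already knows.

```scala
type VertexId = Long
type busMap = Map[VertexId, Int]

// Hypothetical stand-in for EdgeTriplet, for illustration only.
final case class Triplet(
    srcId: VertexId, srcAttr: busMap,
    dstId: VertexId, dstAttr: busMap,
    attr: Int) // miles along this edge

def incrementDistance(m: busMap, miles: Int): busMap =
  m.map { case (v, d) => v -> (d + miles) }

def addRoute(a: busMap, b: busMap): busMap =
  (a.keySet ++ b.keySet).map { k =>
    k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))
  }.toMap

// Same decision as sendMessage above, written against the stand-in type.
def sendMessage(edge: Triplet): Iterator[(VertexId, busMap)] = {
  val newAttr = incrementDistance(edge.dstAttr, edge.attr)
  if (edge.srcAttr != addRoute(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
  else Iterator.empty // nothing new to report, so stay silent
}

// City 1 does not know any route to city 2 yet: a message is sent.
val improves = Triplet(1L, Map(1L -> 0), 2L, Map(2L -> 0), attr = 25)

// City 1 already knows the 25-mile route: no message.
val noGain = Triplet(1L, Map(1L -> 0, 2L -> 25), 2L, Map(2L -> 0), attr = 25)

val sent = sendMessage(improves).toList
val silent = sendMessage(noGain).toList
```

Returning `Iterator.empty` is what eventually drains the message count to zero and ends the Pregel loop.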
The complete code follows:
import scala.reflect.ClassTag
import org.apache.spark.graphx._
import org.apache.spark.graphx.util._
//define Map as data type
type busMap = Map[VertexId, Int]
def createMap(x: (VertexId, Int)*) = Map(x: _*)
def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }
def addRoute(busmap1: busMap, busmap2: busMap): busMap =
(busmap1.keySet ++ busmap2.keySet).map {
k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
}.toMap
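def run[VD, ED: ClassTag](graph: Graph[VD, ED]): Graph[busMap, ED] = {
// Assumption: every vertex is treated as a landmark, so each vertex ends up
// with a distance to every vertex it can reach (all pairs).
val landmarks = graph.vertices.map(_._1).collect().toSet
val sssp = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid))
createMap(vid -> 0)
else
createMap()
}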
//Define initialMessage needed by Pregel as argument
val initialMessage = createMap()
//Define vProg, aka, vertexProgram needed by Pregel as functional argument, that is the logic executed in the super steps
def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
addRoute(attr, msg)
}
// Define sendMsg as needed by Pregel as functional argument
def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
val newAttr = incrementDistance(edge.dstAttr,edge.attr.toString.toInt)
if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
Iterator((edge.srcId, newAttr))
else
// Vertices can decide not to send message with empty iterator
Iterator.empty
}
//Invoke Pregel
Pregel(sssp, initialMessage)(vertexProgram, sendMessage, addRoute)
}
//Need random generator to generate random miles for each edge
val r = scala.util.Random
//Create a random graph with 5 vertices for simplicity in the writing
//Attribute of edge of this graph is to store miles length of edge
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 5)
.mapVertices( (id,attr) => (id.toDouble))
.mapEdges(e=>(math.abs(r.nextInt(990)+10)))
// Invoke the run function defined above. It returns a graph whose vertex attribute
// is a Map from destination VertexId to the miles needed to reach it.
for {
(src, distances) <- run(graph).vertices.collect.sortBy(_._1)
(dst, miles) <- distances
if src != dst
} println(s"From VertexId $src to $dst shortest distance is $miles Miles")
/*
Output
From VertexId 0 to 1 shortest distance is 243 Miles
From VertexId 0 to 2 shortest distance is 422 Miles
From VertexId 0 to 3 shortest distance is 1102 Miles
From VertexId 0 to 4 shortest distance is 787 Miles
From VertexId 1 to 0 shortest distance is 874 Miles
From VertexId 1 to 2 shortest distance is 1296 Miles
From VertexId 1 to 3 shortest distance is 859 Miles
From VertexId 1 to 4 shortest distance is 1661 Miles
From VertexId 2 to 0 shortest distance is 1746 Miles
From VertexId 2 to 1 shortest distance is 872 Miles
From VertexId 2 to 3 shortest distance is 813 Miles
From VertexId 2 to 4 shortest distance is 374 Miles
From VertexId 3 to 0 shortest distance is 933 Miles
From VertexId 3 to 1 shortest distance is 59 Miles
From VertexId 3 to 2 shortest distance is 682 Miles
From VertexId 3 to 4 shortest distance is 1056 Miles
From VertexId 4 to 0 shortest distance is 1372 Miles
From VertexId 4 to 1 shortest distance is 498 Miles
From VertexId 4 to 2 shortest distance is 1121 Miles
From VertexId 4 to 3 shortest distance is 439 Miles
*/
Google's Pregel in Spark GraphX is not an easy API to use. Making a Pregel API call correctly takes some time and many rounds of trial and error, but it is worth the effort.
As always, the code I wrote for this article is in my GitHub repo.