Pregel Object
object Pregel extends Logging
Implements a Pregel-like bulk-synchronous message-passing API.
Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, lets the message-sending computation read the attributes of both the source and destination vertices, and constrains messages to the graph structure, so messages travel only along edges. These changes allow substantially more efficient distributed execution while also providing greater flexibility for graph-based computation.
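The superstep loop described above can be modeled without Spark. The sketch below is a simplified, single-machine approximation for illustration only; `LocalEdge`, `LocalTriplet`, and `runPregel` are hypothetical names, not GraphX classes. It mirrors the paragraph's points: `sendMsg` runs once per edge, sees both endpoint attributes through the triplet, and may only address the edge's source or destination. Unlike GraphX, which by default only re-runs `sendMsg` on edges next to vertices that received a message in the previous round, this sketch re-evaluates every edge in each superstep.

```scala
// Hypothetical, Spark-free stand-ins for GraphX's Edge and EdgeTriplet.
final case class LocalEdge[ED](srcId: Long, dstId: Long, attr: ED)
final case class LocalTriplet[VD, ED](srcId: Long, dstId: Long, srcAttr: VD, dstAttr: VD, attr: ED)

// Simplified bulk-synchronous loop (illustrative only, not the GraphX implementation).
def runPregel[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[LocalEdge[ED]],
    initialMsg: A,
    maxIterations: Int)(
    vprog: (Long, VD, A) => VD,
    sendMsg: LocalTriplet[VD, ED] => Iterator[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, VD] = {

  // Run sendMsg once per edge, then combine messages addressed to the same vertex.
  def messagesFor(vs: Map[Long, VD]): Map[Long, A] =
    edges.iterator
      .flatMap { e =>
        val t = LocalTriplet(e.srcId, e.dstId, vs(e.srcId), vs(e.dstId), e.attr)
        sendMsg(t).map { case (id, msg) =>
          // Messages are constrained to the graph structure: endpoints only.
          require(id == t.srcId || id == t.dstId, s"vertex $id is not an endpoint of this edge")
          (id, msg)
        }
      }
      .foldLeft(Map.empty[Long, A]) { case (acc, (id, msg)) =>
        acc.updated(id, acc.get(id).fold(msg)(prev => mergeMsg(prev, msg)))
      }

  // Initial superstep: every vertex runs the vertex program on initialMsg.
  var current = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
  var messages = messagesFor(current)
  var i = 0
  // Stop when no messages are pending or the iteration cap is reached.
  while (messages.nonEmpty && i < maxIterations) {
    // Only vertices that received a message run the vertex program.
    current = current.map { case (id, attr) =>
      id -> messages.get(id).fold(attr)(msg => vprog(id, attr, msg))
    }
    messages = messagesFor(current)
    i += 1
  }
  current
}

// Demo: propagate the maximum value around the cycle 1 -> 2 -> 3 -> 4 -> 1.
val demoVertices = Map(1L -> 3, 2L -> 6, 3L -> 2, 4L -> 1)
val demoEdges = Seq(
  LocalEdge(1L, 2L, ()), LocalEdge(2L, 3L, ()),
  LocalEdge(3L, 4L, ()), LocalEdge(4L, 1L, ()))

val maxes = runPregel(demoVertices, demoEdges, Int.MinValue, 10)(
  (_, attr, msg) => math.max(attr, msg),                                              // vertex program
  t => if (t.srcAttr > t.dstAttr) Iterator(t.dstId -> t.srcAttr) else Iterator.empty, // send message
  (a, b) => math.max(a, b))                                                           // merge messages

println(maxes)
```

The send function reads both `srcAttr` and `dstAttr` to decide whether a message is needed at all, which is what lets the loop terminate: after three supersteps following the initial one, every vertex holds 6 and no edge produces a message.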
Example: using the Pregel abstraction to implement PageRank:
```scala
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}

// `graph` is an existing Graph (e.g. in spark-shell); it is not defined in this snippet.
// Assumed settings: the snippet uses resetProb and numIter without defining them.
val resetProb = 0.15
val numIter = 10

val pagerankGraph: Graph[Double, Double] = graph
  // Associate the out-degree with each vertex
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  // Weight each edge by 1 / out-degree of its source vertex
  .mapTriplets(e => 1.0 / e.srcAttr)
  // Set the vertex attributes to the initial PageRank values
  .mapVertices((id, attr) => 1.0)

def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

// In the Pregel API, sendMsg takes a single EdgeTriplet argument
def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b

val initialMessage = 0.0

// Execute Pregel for a fixed number of iterations.
val ranks: Graph[Double, Double] = Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
```
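To see what the three functions compute, the loop can be replayed on a tiny in-memory graph. This is a hedged, Spark-free sketch: the edge list, `resetProb = 0.15`, and the names `rankUpdate` and `localPageRank` are assumptions for illustration. It follows the example's semantics as written, including the initial superstep in which every vertex applies `vertexProgram` to `initialMessage = 0.0`, so each rank starts at `resetProb` rather than 1.0.

```scala
// Hypothetical toy graph given as (srcId, dstId) pairs; every vertex has an out-edge.
val edgeList: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))
val resetProb = 0.15 // assumed value; the example does not define it

// Same preprocessing as pagerankGraph: edge weight = 1.0 / out-degree of the source.
val outDegree: Map[Long, Int] = edgeList.groupBy(_._1).map { case (src, es) => src -> es.size }
val weightedEdges: Seq[(Long, Long, Double)] =
  edgeList.map { case (src, dst) => (src, dst, 1.0 / outDegree(src)) }
val vertexIds: Seq[Long] = edgeList.flatMap { case (src, dst) => Seq(src, dst) }.distinct

// Same update rule as vertexProgram (the previous attribute is ignored).
def rankUpdate(msgSum: Double): Double = resetProb + (1.0 - resetProb) * msgSum

def localPageRank(numIter: Int): Map[Long, Double] = {
  // Initial superstep: each vertex applies the update to initialMessage = 0.0.
  var ranks: Map[Long, Double] = vertexIds.map(id => id -> rankUpdate(0.0)).toMap
  for (_ <- 1 to numIter) {
    // sendMessage (srcAttr * edge weight) followed by messageCombiner (a + b).
    val msgSums: Map[Long, Double] = weightedEdges.groupBy(_._2).map { case (dst, es) =>
      dst -> es.map { case (src, _, w) => ranks(src) * w }.sum
    }
    // Vertices without incoming messages keep their current rank.
    ranks = ranks.map { case (id, r) => id -> msgSums.get(id).fold(r)(rankUpdate(_)) }
  }
  ranks
}

println(localPageRank(1))  // one iteration after the initial superstep
println(localPageRank(50))
```

After one iteration the ranks are approximately 0.2775, 0.21375 and 0.34125 for vertices 1, 2 and 3. Because every vertex here has an outgoing edge, the ranks converge to values that sum to the number of vertices (3), as expected for this unnormalized formulation.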
Example: using Pregel to find which vertices can be reached from a source vertex:
```scala
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
// (assumes an existing SparkContext `sc`, as in spark-shell)
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have canReach = false.
val initialGraph: Graph[Boolean, Double] = graph.mapVertices((id, _) => id == sourceId)
// Despite its name, `sssp` records reachability from sourceId, not shortest-path distances.
val sssp = initialGraph.pregel(false)(
  (id, canReach, newCanReach) => canReach || newCanReach, // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr && !triplet.dstAttr) {
      Iterator((triplet.dstId, true))
    } else {
      Iterator.empty
    }
  },
  (a, b) => a || b // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
```
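The Pregel program above marks every vertex that can be reached from `sourceId` by following directed edges. A plain breadth-first search over an edge list computes the same set, so it can serve as a mental model, or a small-scale cross-check, of what the supersteps do. The sketch below is Spark-free; `toyEdges` and `reachableFrom` are hypothetical names, and the edge list is an assumed stand-in for the random graph.

```scala
import scala.collection.immutable.Queue

// Hypothetical directed edge list (srcId, dstId); stands in for the random graph above.
val toyEdges: Seq[(Long, Long)] = Seq((42L, 1L), (1L, 2L), (2L, 42L), (3L, 42L), (4L, 5L))

// Breadth-first search along out-edges; each BFS level roughly matches one Pregel superstep.
def reachableFrom(source: Long, edges: Seq[(Long, Long)]): Set[Long] = {
  val out: Map[Long, Seq[Long]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

  @annotation.tailrec
  def bfs(frontier: Queue[Long], seen: Set[Long]): Set[Long] =
    frontier.dequeueOption match {
      case None => seen
      case Some((v, rest)) =>
        // Like the send function: only follow edges whose destination is not yet marked.
        val next = out.getOrElse(v, Seq.empty).filterNot(seen).distinct
        bfs(rest ++ next, seen ++ next)
    }

  bfs(Queue(source), Set(source))
}

println(reachableFrom(42L, toyEdges))
```

Here vertex 3 has an edge into 42 but is not reachable from it, because messages only flow from source to destination. On a real graph, the BFS result should match the vertices whose attribute ends up `true`, for example `sssp.vertices.filter(_._2).map(_._1).collect().toSet`.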