Pregel Object

object Pregel extends Logging

Implements a Pregel-like bulk-synchronous message-passing API.

Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, lets the message-sending computation read the attributes of both the source and destination vertex, and constrains messages to the graph structure (a message can only be sent along an edge to one of its endpoints). These changes allow for substantially more efficient distributed execution while also exposing greater flexibility for graph-based computation.
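
The superstep loop described above can be sketched on plain Scala collections, without Spark. This is only a single-machine illustration under simplifying assumptions, not the GraphX implementation: the names LocalPregel, Triplet, run and ReachabilityDemo are hypothetical, every edge is assumed to reference an existing vertex, and an edge is considered for sending whenever either endpoint received a message in the previous round.

```scala
// Single-machine sketch of a Pregel-style superstep loop (NOT the GraphX implementation).
object LocalPregel {
  type VertexId = Long

  // Hypothetical stand-in for an edge triplet: the edge attribute plus both endpoint attributes.
  final case class Triplet[VD, ED](
      srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  def run[VD, ED, A](
      vertices: Map[VertexId, VD],
      edges: Seq[(VertexId, VertexId, ED)],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {

    // Superstep 0: every vertex receives the initial message.
    var state: Map[VertexId, VD] =
      vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var active: Set[VertexId] = state.keySet
    var i = 0

    while (active.nonEmpty && i < maxIterations) {
      // Messages are computed per edge and can read both endpoint attributes.
      val sent = edges.iterator
        .filter { case (s, d, _) => active(s) || active(d) }
        .flatMap { case (s, d, e) => sendMsg(Triplet(s, state(s), d, state(d), e)) }
        .toVector
      // Combine all messages addressed to the same vertex.
      val inbox: Map[VertexId, A] =
        sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
      // Only vertices that received a message run the vertex program.
      state = state ++ inbox.collect {
        case (id, msg) if state.contains(id) => id -> vprog(id, state(id), msg)
      }
      active = inbox.keySet
      i += 1
    }
    state
  }
}

// Usage: propagate a "reachable from vertex 1" flag along directed edges.
object ReachabilityDemo {
  val vertices = Map(1L -> true, 2L -> false, 3L -> false, 4L -> false)
  val edges = Seq((1L, 2L, ()), (2L, 3L, ()))

  val reach: Map[Long, Boolean] = LocalPregel.run(vertices, edges, false)(
    (_, canReach, msg) => canReach || msg,                  // vertex program
    t => if (t.srcAttr && !t.dstAttr) Iterator((t.dstId, true))
         else Iterator.empty,                               // send message
    _ || _                                                  // merge messages
  )
}
```

In this sketch vertices 2 and 3 become reachable over two rounds, vertex 4 (which has no edges) never receives a message, and the loop stops once a round produces no messages.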

Example: using the Pregel abstraction to implement PageRank:

import org.apache.spark.graphx._

// `graph`, `resetProb` and `numIter` are assumed to be defined by the caller.
val pagerankGraph: Graph[Double, Double] = graph
  // Associate the out-degree with each vertex (0 for vertices with no out-edges)
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  // Set the weight on the edges based on the source vertex's out-degree
  .mapTriplets(e => 1.0 / e.srcAttr)
  // Set the vertex attributes to the initial PageRank values
  .mapVertices((id, attr) => 1.0)

// Vertex program: compute the new rank from the combined incoming messages
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum
// Send message: takes only the edge triplet, matching the sendMsg parameter of Pregel
def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))
// Merge message: sum the contributions arriving at the same vertex
def messageCombiner(a: Double, b: Double): Double = a + b
val initialMessage = 0.0
// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
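
To make the arithmetic of one superstep concrete, the following plain-Scala sketch works through the update for a single vertex. The numbers are assumptions chosen for illustration (a reset probability of 0.15, and a vertex with two in-neighbors of rank 1.0 whose out-degrees are 2 and 1), and the object name PageRankStep is hypothetical.

```scala
// Worked example of one PageRank update for a single vertex (illustrative values only).
object PageRankStep {
  val resetProb = 0.15 // assumed value, not taken from the example above

  // (source rank, edge weight) for each in-edge; the edge weight is 1 / out-degree of the source.
  val incoming: Seq[(Double, Double)] = Seq((1.0, 1.0 / 2), (1.0, 1.0 / 1))

  // sendMessage: each in-edge contributes srcAttr * edge.attr;
  // messageCombiner: the contributions are summed.
  val msgSum: Double = incoming.map { case (rank, weight) => rank * weight }.reduce(_ + _)

  // vertexProgram: resetProb + (1 - resetProb) * msgSum, approximately 1.425 here.
  val newRank: Double = resetProb + (1.0 - resetProb) * msgSum
}
```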



Example: computing which vertices are reachable from a source vertex:

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances (unused by this reachability check).
// `sc` is assumed to be an existing SparkContext.
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the source have canReach = false.
val initialGraph: Graph[Boolean, Double] = graph.mapVertices((id, _) => id == sourceId)
val reachable = initialGraph.pregel(false)(
  (id, canReach, newCanReach) => canReach || newCanReach, // Vertex Program
  triplet => {  // Send Message
    // Only notify a destination that is not yet marked reachable
    if (triplet.srcAttr && !triplet.dstAttr) {
      Iterator((triplet.dstId, true))
    } else {
      Iterator.empty
    }
  },
  (a, b) => a || b // Merge Message
)
println(reachable.vertices.collect.mkString("\n"))

Reference:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
