# Pregel Object

object Pregel extends Logging

Implements a Pregel-like bulk-synchronous message-passing API.

Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, enables the message sending computation to read both vertex attributes, and constrains messages to the graph structure. These changes allow for substantially more efficient distributed execution while also exposing greater flexibility for graph-based computation.

Example: Pregel abstraction to implement PageRank:

```
val pagerankGraph: Graph[Double, Double] = graph
  // Associate the degree with each vertex
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  // Set the weight on the edges based on the degree
  .mapTriplets(e => 1.0 / e.srcAttr)
  // Set the vertex attributes to the initial pagerank values
  .mapVertices((id, attr) => 1.0)
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
val initialMessage = 0.0
// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)




```

Example:

```
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

  // A graph with edge attributes containing distances
  val graph: Graph[Long, Double] =
    GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

  val sourceId: VertexId = 42 // The ultimate source
  // Initialize the graph such that all vertices except the root have canReach = false.
  val initialGraph: Graph[Boolean, Double]  = graph.mapVertices((id, _) => id == sourceId)
  val sssp = initialGraph.pregel(false)(
    (id, canReach, newCanReach) => canReach || newCanReach, // Vertex Program
    triplet => {  // Send Message
      if (triplet.srcAttr && !triplet.dstAttr) {
        Iterator((triplet.dstId, true))
      } else {
        Iterator.empty
      }
    },
    (a, b) => a || b // Merge Message
  )
  println(sssp.vertices.collect.mkString("\n"))
```

Reference:

<https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://george-jen.gitbook.io/data-science-and-apache-spark/pregel-object.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
