Spark Graph Computing (Continued)

At present, there are many graph-based parallel computing frameworks, such as:

Pregel from Google

Giraph and HAMA, Apache open-source graph computing frameworks, and GraphLab.

Among them, Pregel, HAMA, and Giraph are very similar, and all are based on the BSP (Bulk Synchronous Parallel) model.

A BSP computer consists of components capable of processing and/or local memory transactions (i.e., processors), a network that routes messages between pairs of such components, and a hardware facility that allows for the synchronization of all or a subset of components.

This is commonly interpreted as a set of processors which may follow different threads of computation, with each processor equipped with fast local memory and interconnected by a communication network. A BSP algorithm relies heavily on the third feature; a computation proceeds in a series of global supersteps, each of which consists of three components:

Concurrent computation: every participating processor may perform local computations, i.e., each process can only make use of values stored in the local fast memory of the processor. The computations occur asynchronously of all the others but may overlap with communication.

Communication: The processes exchange data between themselves to facilitate remote data storage capabilities.

Barrier synchronization: When a process reaches this point (the barrier), it waits until all other processes have reached the same barrier.
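
To make the model concrete, here is a minimal, hypothetical sketch of a BSP superstep loop in Scala. It is illustrative only and not tied to any framework; runBSP and compute are made-up names, with compute standing in for a per-vertex function that consumes inbound messages and emits outbound ones.

// A simplified, illustrative BSP loop: local compute, then message routing,
// then an (implicit) barrier before the next superstep begins.
def runBSP[V, M](initial: Map[Long, V],
                 compute: (V, Seq[M]) => (V, Seq[(Long, M)]),
                 supersteps: Int): Map[Long, V] = {
  var state = initial
  var inbox = Map.empty[Long, Seq[M]].withDefaultValue(Seq.empty[M])
  for (_ <- 1 to supersteps) {
    // Concurrent computation: each vertex sees only its local state and inbox
    val results = state.map { case (id, v) => id -> compute(v, inbox(id)) }
    // Communication: route outgoing messages to their destination vertices
    inbox = results.values.toSeq.flatMap(_._2)
      .groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2) }
      .withDefaultValue(Seq.empty[M])
    // Barrier synchronization: the next superstep starts only after all
    // messages are delivered (implicit here, since steps run sequentially)
    state = results.map { case (id, (v, _)) => id -> v }
  }
  state
}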

Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) in the following way:

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

The following shows the operations defined in the abstract class Graph, with all of its attributes and method signatures.

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.VertexRDDImpl
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.{VertexRDD,EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = 
    StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): 
    Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: 
     PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): 
     Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): 
     Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, 
    Iterator[Edge[ED]]) => Iterator[ED2]): 
      Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => 
    ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: 
    (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => 
      Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => 
        Boolean = (x => true),
      vpred: (VertexId, VD) => 
        Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): 
    Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): 
    Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U]
  (table: RDD[(VertexId, U)])
  (mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2]
    (other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about 
  // adjacent triplets =================================================
  def collectNeighborIds
    (edgeDirection: EdgeDirection): 
      VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: 
    EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A: ClassTag]
    (initialMsg: A, 
      maxIterations: Int, 
        activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => 
      Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank
  (tol: Double, resetProb: Double = 0.15): 
    Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents
    (numIter: Int): Graph[VertexId, ED]
}

The code above only declares the Graph abstract class; the method bodies are not defined there. The definitions of the methods are illustrated below.

The following operations can be performed on a graph:

Operations on property (property operator)

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): 
    Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): 
    Graph[VD, ED2]
  def mapEdges[ED2]
    (map: (PartitionID, Iterator[Edge[ED]]) => 
      Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2]
    (map: EdgeTriplet[VD, ED] => ED2): 
      Graph[VD, ED2]
  def mapTriplets[ED2]
  (map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
}

method mapVertices[VD2]

  /**
   * Transforms each vertex attribute in the graph using the map function.
   *
   * @note The new graph has the same structure. As a consequence the underlying index structures
   * can be reused.
   *
   * @param map the function from a vertex object to a new vertex value
   *
   * @tparam VD2 the new vertex data type
   *
   * @example We might use this operation to change the vertex values
   * from one type to another to initialize an algorithm.
   * {{{
   * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
   * val root = 42
   * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
   * }}}
   */
def mapVertices[VD2: ClassTag]
  (map: (VertexId, VD) => VD2): Graph[VD2, ED]
  
//Example:
import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices((id, _) => id.toDouble)

method mapEdges[ED2]

  /**
   * Transforms each edge attribute in the graph using the map function. The map function is not
   * passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
   * use `mapTriplets`.
   *
   * @note This graph is not changed, and the new graph has the same structure. As a consequence
   * the underlying index structures can be reused.
   *
   * @param map the function from an edge object to a new edge value.
   *
   * @tparam ED2 the new edge data type
   *
   * @example This function might be used to initialize edge attributes.
   */
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2):
   Graph[VD, ED2] = {
    mapEdges((pid, iter) => iter.map(map))
  }
  
  //Example
  
val edgeTo2 = graph.mapEdges(e => 2)
edgeTo2.edges.take(5).foreach(println)

/*
Edge(0,1,2)
Edge(0,1,2)
Edge(0,3,2)
Edge(0,3,2)
Edge(0,5,2)
*/
  
  /**
   * Transforms each edge attribute using the map function, passing it a whole partition at a
   * time. The map function is given an iterator over edges within a logical partition as well as
   * the partition's ID, and it should return a new iterator over the new values of each edge. The
   * new iterator's elements must correspond one-to-one with the old iterator's elements. If
   * adjacent vertex values are desired, use `mapTriplets`.
   *
   * @note This does not change the structure of the graph or modify the values of this graph.
   * As a consequence the underlying index structures can be reused.
   *
   * @param map a function that takes a partition id and an iterator over all the edges in the
   * partition, and must return an iterator over the new values for each edge in the order of
   * the input iterator
   *
   * @tparam ED2 the new edge data type
   */
  def mapEdges[ED2: ClassTag]
    (map: (PartitionID, Iterator[Edge[ED]]) => 
      Iterator[ED2])
    : Graph[VD, ED2]
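
The text gives no example for this partition-wise variant, so here is a hedged sketch reusing the graph: Graph[Double, Int] built earlier with GraphGenerators; taggedEdges is an illustrative name. It tags every edge with the ID of the partition it lives in.

// Sketch: per-partition mapEdges lets you do setup work once per partition
// (here we simply record the partition id on every edge).
val taggedEdges: Graph[Double, String] =
  graph.mapEdges((pid, iter) => iter.map(_ => s"partition-$pid"))
taggedEdges.edges.take(3).foreach(println)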

method mapTriplets[ED2]

 /**
   * Transforms each edge attribute using the map function, passing it the adjacent vertex
   * attributes as well. If adjacent vertex values are not required, consider using `mapEdges`
   * instead.
   *
   * @note This does not change the structure of the graph or modify the values of this graph.
   * As a consequence the underlying index structures can be reused.
   *
   * @param map the function from an edge object to a new edge value.
   *
   * @tparam ED2 the new edge data type
   *
   * @example This function might be used to initialize edge attributes based on the attributes
   * associated with each vertex.
   * {{{
   * val rawGraph: Graph[Int, Int] = someLoadFunction()
   * val graph = rawGraph.mapTriplets[Int]( edge =>
   *   edge.src.data - edge.dst.data)
   * }}}
   */
  def mapTriplets[ED2: ClassTag]
    (map: EdgeTriplet[VD, ED] => ED2): 
      Graph[VD, ED2] = {
       mapTriplets((pid, iter) => iter.map(map), 
         TripletFields.All)
  }

//Example:
graph.mapTriplets(e => e.attr * 10)

 /**
   * Transforms each edge attribute using the map function, passing it the adjacent vertex
   * attributes as well. If adjacent vertex values are not required, consider using `mapEdges`
   * instead.
   *
   * @note This does not change the structure of the graph or modify the values of this graph.
   * As a consequence the underlying index structures can be reused.
   *
   * @param map the function from an edge object to a new edge value.
   * @param tripletFields which fields should be included in the edge triplet passed to the map
   *   function. If not all fields are needed, specifying this can improve performance.
   *
   * @tparam ED2 the new edge data type
   *
   * @example This function might be used to initialize edge attributes based on the attributes
   * associated with each vertex.
   * {{{
   * val rawGraph: Graph[Int, Int] = someLoadFunction()
   * val graph = rawGraph.mapTriplets[Int]( edge =>
   *   edge.src.data - edge.dst.data)
   * }}}
   */
  def mapTriplets[ED2: ClassTag](
      map: EdgeTriplet[VD, ED] => ED2,
      tripletFields: TripletFields): 
        Graph[VD, ED2] = {
    mapTriplets((pid, iter) => 
      iter.map(map), tripletFields)
  }

  /**
   * Transforms each edge attribute a partition at a time using the map function, passing it the
   * adjacent vertex attributes as well. The map function is given an iterator over edge triplets
   * within a logical partition and should yield a new iterator over the new values of each edge
   * in the order in which they are provided. If adjacent vertex values are not required,
   * consider using `mapEdges` instead.
   *
   * @note This does not change the structure of the graph or modify the values of this graph.
   * As a consequence the underlying index structures can be reused.
   *
   * @param map the iterator transform
   * @param tripletFields which fields should be included in the edge triplet passed to the map
   *   function. If not all fields are needed, specifying this can improve performance.
   *
   * @tparam ED2 the new edge data type
   */
  def mapTriplets[ED2: ClassTag](
      map: 
  (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => 
    Iterator[ED2],
      tripletFields: TripletFields):
       Graph[VD, ED2]
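
No example is given for the tripletFields variants, so here is a hedged sketch on the earlier graph: Graph[Double, Int]; srcPlusWeight is an illustrative name. Only the source attribute is read, so TripletFields.Src is passed, which lets GraphX avoid shipping destination attributes to the edge partitions.

// Sketch: only srcAttr is read, so TripletFields.Src avoids moving dstAttr.
val srcPlusWeight: Graph[Double, Double] =
  graph.mapTriplets(t => t.srcAttr + t.attr, TripletFields.Src)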

Operations on structure (structural operator)

// Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]

  def subgraph(
      epred: 
    EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: 
    (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): 
    Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): 
    Graph[VD, ED]

method reverse

/**
   * Reverses all edges in the graph. If this graph contains an edge from a to b then the
   * returned graph contains an edge from b to a.
   */
  def reverse: Graph[VD, ED]
  
 // example:
 graph.edges.collect.foreach(println)
 /*
Edge(0,1,1)
Edge(0,1,1)
Edge(0,4,1)
Edge(1,1,1)
Edge(1,3,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,3,1)
Edge(3,0,1)
Edge(3,0,1)
Edge(3,2,1)
Edge(4,0,1)
Edge(4,0,1)
Edge(4,1,1)
Edge(4,2,1)
 */

graph.reverse.edges.collect.foreach(println)

/*
Edge(1,0,1)
Edge(1,0,1)
Edge(1,1,1)
Edge(3,1,1)
Edge(4,0,1)
Edge(0,3,1)
Edge(0,3,1)
Edge(0,4,1)
Edge(0,4,1)
Edge(1,4,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,4,1)
Edge(3,2,1)
Edge(3,2,1)
*/
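
Since reverse only flips edge direction and reuses the underlying structure, a quick sanity check (a hedged sketch) is that a graph's in-degrees equal the out-degrees of its reverse:

// Cheap consistency check: reversing the graph swaps in- and out-degrees.
assert(graph.inDegrees.collect.toSet == graph.reverse.outDegrees.collect.toSet)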

method subgraph

  /**
   * Restricts the graph to only the vertices and edges satisfying the predicates. The resulting
   * subgraph satisfies
   *
   * {{{
   * V' = {v : for all v in V where vpred(v)}
   * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
   * }}}
   *
   * @param epred the edge predicate, which takes a triplet and
   * evaluates to true if the edge is to remain in the subgraph. Note
   * that only edges where both vertices satisfy the vertex
   * predicate are considered.
   *
   * @param vpred the vertex predicate, which takes a vertex object and
   * evaluates to true if the vertex is to be included in the subgraph
   *
   * @return the subgraph containing only the vertices and edges that
   * satisfy the predicates
   */
  def subgraph(
      epred: 
    EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: 
    (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]

//Example:
graph.subgraph(e => e.srcId == 3, (vid, _) => vid == 0)
  .vertices.foreach(println)
  
/*
(0,3)
*/
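
Another hedged sketch: subgraph is often called with a named vpred argument, for example keeping only even-numbered vertices (and, implicitly, only the edges whose two endpoints both survive the predicate):

val evenGraph = graph.subgraph(vpred = (id, _) => id % 2 == 0)
evenGraph.triplets.collect.foreach(println)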

method mask[VD2,ED2]

  /**
   * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
   * attributes from this graph.
   * @param other the graph to project this graph onto
   * @return a graph with vertices and edges that exist in both the current graph and `other`,
   * with vertex and edge data from the current graph
   */
  def mask[VD2: ClassTag, ED2: ClassTag]
  (other: Graph[VD2, ED2]): Graph[VD, ED]

//Example

graph.mask(graph).vertices.collect.foreach(println)

/*
(4,4)
(0,3)
(2,3)
(1,2)
(3,3)
*/
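
A more realistic use of mask, adapted from the GraphX programming guide: run connectedComponents on the full graph, then restrict the answer to a subgraph of interest while keeping the component IDs. The vertex predicate below is a stand-in for a real validity check.

// Sketch: restrict connected-components results to a valid subgraph.
val ccGraph = graph.connectedComponents()
val validGraph = graph.subgraph(vpred = (id, _) => id != 0L)
val validCCGraph = ccGraph.mask(validGraph)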

method groupEdges

  /**
   * Merges multiple edges between two vertices into a single edge. For correct results, the
   * graph must have been partitioned using `partitionBy`.
   *
   * @param merge the user-supplied commutative associative function to merge edge attributes
   *              for duplicate edges.
   *
   * @return The resulting graph with a single edge for each (source, dest) vertex pair.
   */
  def groupEdges(merge: (ED, ED) => ED): 
  Graph[VD, ED]
  
  // Example: merge duplicate edges by summing their attributes (a reduce-like op).
  // For correct results the graph must first be partitioned with partitionBy,
  // so that duplicate edges are co-located.
  graph.partitionBy(PartitionStrategy.RandomVertexCut)
    .groupEdges(_ + _).edges.collect.foreach(println)
  
 

Join operation (Join operator)

// Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, U) => VD): 
      Graph[VD, ED]
  def outerJoinVertices[U, VD2]
    (other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]

method joinVertices

  /**
   * Join the vertices with an RDD and then apply a function from the
   * vertex and RDD entry to a new vertex value. The input table
   * should contain at most one entry for each vertex. If no entry is
   * provided the map function is skipped and the old value is used.
   *
   * @tparam U the type of entry in the table of updates
   * @param table the table to join with the vertices in the graph.
   * The table should contain at most one entry for each vertex.
   * @param mapFunc the function used to compute the new vertex
   * values. The map function is invoked only for vertices with a
   * corresponding entry in the table; otherwise the old vertex value
   * is used.
   *
   * @example This function is used to update the vertices with new
   * values based on external data. For example we could add the out-degree
   * to each vertex record:
   *
   * {{{
   * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
   *   .mapVertices((_, _) => 0)
   * val outDeg = rawGraph.outDegrees
   * val graph = rawGraph.joinVertices[Int](outDeg)
   *   ((_, _, outDeg) => outDeg)
   * }}}
   */
  def joinVertices[U: ClassTag]
 (table: RDD[(VertexId, U)])
  (mapFunc: (VertexId, VD, U) => VD)
    : Graph[VD, ED] = {
    val uf = (id: VertexId, data: VD, o: Option[U]) 
      => {
      o match {
        case Some(u) => mapFunc(id, data, u)
        case None => data
      }
    }
    graph.outerJoinVertices(table)(uf)
  }

//example

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder
  .appName("graphx")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
val sc = spark.sparkContext
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((1L, ("jack", "owner")), (2L, ("george", "clerk")),
    (3L, ("mary", "sales")), (4L, ("sherry", "owner wife"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(1L, 2L, "boss"), Edge(1L, 3L, "boss"),
    Edge(2L, 3L, "coworker"), Edge(4L, 1L, "boss")))
val defaultUser = ("", "Missing")
val graph = Graph(users, relationships, defaultUser)

val userWithAge: RDD[(VertexId, Int)] =
      sc.parallelize(Array(
        (1L, 25), (2L, 30), (3L, 23), (4L, 30)
      ))
      
graph.joinVertices(userWithAge) { (id, prop, age) =>
  (prop._1, prop._2 + " Age is " + age)
}.vertices.collect.foreach(println)

/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
*/



method outerJoinVertices[U: ClassTag, VD2: ClassTag]

  /**
   * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
   * The input table should contain at most one entry for each vertex. If no entry in `other` is
   * provided for a particular vertex in the graph, the map function receives `None`.
   *
   * @tparam U the type of entry in the table of updates
   * @tparam VD2 the new vertex value type
   *
   * @param other the table to join with the vertices in the graph.
   *              The table should contain at most one entry for each vertex.
   * @param mapFunc the function used to compute the new vertex values.
   *                The map function is invoked for all vertices, even those
   *                that do not have a corresponding entry in the table.
   *
   * @example This function is used to update the vertices with new values based on external data.
   *          For example we could add the out-degree to each vertex record:
   *
   * {{{
   * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
   * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
   * val graph = rawGraph.outerJoinVertices(outDeg) {
   *   (vid, data, optDeg) => optDeg.getOrElse(0)
   * }
   * }}}
   */
  def outerJoinVertices[U: ClassTag, VD2: ClassTag]
  (other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
        (implicit eq: VD =:= VD2 = null)
    : Graph[VD2, ED]

//Example:
graph.outerJoinVertices(userWithAge) { (id, prop, age) =>
  age match {
    case Some(age) => (prop._1, prop._2 + " Age is " + age)
    case None => (prop._1, prop._2 + " Age unknown")
  }
}.vertices.collect.foreach(println)
    
/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
 */

Neighborhood Aggregation

  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds
    (edgeDirection: EdgeDirection): 
       VertexRDD[Array[VertexId]]
  def collectNeighbors
      (edgeDirection: EdgeDirection): 
          VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]

method collectNeighborIds(edgeDirection: EdgeDirection)

  /**
   * Collect the neighbor vertex ids for each vertex.
   *
   * @param edgeDirection the direction along which to collect neighboring vertices
   *
   * @return the set of neighboring ids for each vertex
   */
  def collectNeighborIds
     (edgeDirection: EdgeDirection): 
        VertexRDD[Array[VertexId]] = {
    val nbrs =
      if (edgeDirection == EdgeDirection.Either) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => { ctx.sendToSrc(Array(ctx.dstId)); 
            ctx.sendToDst(Array(ctx.srcId)) },
          _ ++ _, TripletFields.None)
      } else if (edgeDirection == EdgeDirection.Out) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => ctx.sendToSrc(Array(ctx.dstId)),
          _ ++ _, TripletFields.None)
      } else if (edgeDirection == EdgeDirection.In) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => ctx.sendToDst(Array(ctx.srcId)),
          _ ++ _, TripletFields.None)
      } else {
        throw new SparkException
("It doesn't make sense to collect neighbor ids without a " +
          "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
      }
    graph.vertices.leftZipJoin(nbrs) 
      { (vid, vdata, nbrsOpt) =>
      nbrsOpt.getOrElse(Array.empty[VertexId])
    }
  } // end of collectNeighborIds


//Example
graph.collectNeighborIds(EdgeDirection.Either)
  .take(5).map(e => e._1)

// res10: Array[org.apache.spark.graphx.VertexId] = Array(4, 2, 1, 3)

method collectNeighbors(edgeDirection: EdgeDirection)

  /**
   * Collect the neighbor vertex attributes for each vertex.
   *
   * @note This function could be highly inefficient on power-law
   * graphs where high degree vertices may force a large amount of
   * information to be collected to a single location.
   *
   * @param edgeDirection the direction along which to collect neighboring vertices
   *
   * @return the vertex set of neighboring vertex attributes for each vertex
   */
  def collectNeighbors
    (edgeDirection: EdgeDirection): 
      VertexRDD[Array[(VertexId, VD)]] = {
    val nbrs = edgeDirection match {
      case EdgeDirection.Either =>
        graph.aggregateMessages
           [Array[(VertexId, VD)]](
          ctx => {
            ctx.sendToSrc
              (Array((ctx.dstId, ctx.dstAttr)))
            ctx.sendToDst
              (Array((ctx.srcId, ctx.srcAttr)))
          },
          (a, b) => a ++ b, TripletFields.All)
      case EdgeDirection.In =>
        graph.aggregateMessages
           [Array[(VertexId, VD)]](
          ctx => ctx.sendToDst
            (Array((ctx.srcId, ctx.srcAttr))),
          (a, b) => a ++ b, TripletFields.Src)
      case EdgeDirection.Out =>
        graph.aggregateMessages
           [Array[(VertexId, VD)]](
          ctx => ctx.sendToSrc
            (Array((ctx.dstId, ctx.dstAttr))),
          (a, b) => a ++ b, TripletFields.Dst)
      case EdgeDirection.Both =>
        throw new SparkException
("collectEdges does not support EdgeDirection.Both. Use" +
          "EdgeDirection.Either instead.")
    }
    graph.vertices.leftJoin(nbrs) 
       { (vid, vdata, nbrsOpt) =>
      nbrsOpt.getOrElse
        (Array.empty[(VertexId, VD)])
    }
  } // end of collectNeighbor

//Example

graph.collectNeighbors(EdgeDirection.Either)
  .take(5)
  
  
/*
res11: Array[(org.apache.spark.graphx.VertexId, Array[(org.apache.spark.graphx.VertexId, (String, String))])] =
  Array((4,Array((1,(jack,owner)))),
        (2,Array((1,(jack,owner)), (3,(mary,sales)))),
        (1,Array((2,(george,clerk)), (3,(mary,sales)), (4,(sherry,owner wife)))),
        (3,Array((1,(jack,owner)), (2,(george,clerk)))))
*/

method aggregateMessages[A: ClassTag]

  /**
   * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
   * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
   * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all
   * messages destined to the same vertex.
   *
   * @tparam A the type of message to be sent to each vertex
   *
   * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
   *   [[EdgeContext]].
   * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
   *   combiner should be commutative and associative.
   * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
   *   `sendMsg` function. If not all fields are needed, specifying this can improve performance.
   *
   * @example We can use this function to compute the in-degree of each vertex
   * {{{
   * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
   * val inDeg: RDD[(VertexId, Int)] =
   *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
   * }}}
   *
   * @note By expressing computation at the edge level we achieve
   * maximum parallelism. This is one of the core functions in the
   * Graph API that enables neighborhood level computation. For
   * example this function can be used to count neighbors satisfying a
   * predicate or implement PageRank.
   */
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: 
         TripletFields = TripletFields.All)
    : VertexRDD[A] = {
    aggregateMessagesWithActiveSet
       (sendMsg, mergeMsg, tripletFields, None)
  }

//Example

// Using the same users/relationships graph and SparkSession built in the
// joinVertices example above.

// In-degree count for each vertex: every edge sends 1 to its destination
val degreeToDst: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToDst(1), (a, b) => a + b)

// Out-degree count for each vertex: every edge sends 1 to its source
val degreeToSrc: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToSrc(1), (a, b) => a + b)
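
A richer aggregateMessages example, adapted from the GraphX programming guide (the age graph below is generated with GraphGenerators and is separate from the running users example): count and sum the ages of followers that are older than each vertex, then derive the average.

import org.apache.spark.graphx.util.GraphGenerators

// A random graph whose vertex attribute is treated as an age.
val ageGraph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100)
    .mapVertices((id, _) => id.toDouble)

val olderFollowers: VertexRDD[(Int, Double)] =
  ageGraph.aggregateMessages[(Int, Double)](
    triplet => // send (count, age) to dst only when the follower is older
      if (triplet.srcAttr > triplet.dstAttr)
        triplet.sendToDst((1, triplet.srcAttr)),
    (a, b) => (a._1 + b._1, a._2 + b._2) // sum the counts and the ages
  )

val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) => value match {
    case (count, totalAge) => totalAge / count
  })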

Computing Degree Information (these are attributes of the Graph class)

 val inDegrees: VertexRDD[Int]
 val outDegrees: VertexRDD[Int]
 val degrees: VertexRDD[Int]
 
//example

graph.inDegrees.collect
graph.outDegrees.collect
graph.degrees.collect

Implementation

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Caching and Uncaching

// Functions for caching graphs 
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  
def cache(): Graph[VD, ED]
  
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

method persist

    /**
   * Mark this RDD for persisting using the specified level.
   *
   * @param newLevel the target storage level
   * @param allowOverride whether to override any existing level with the new one
   */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet. Local checkpointing is an exception.
   */
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

//Example
graph.persist()
graph.unpersist()
graph.unpersistVertices(true)
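
In iterative computations, a common pattern (a hedged sketch; the update step below is just a placeholder) is to cache the new graph, force its materialization, and only then unpersist the previous iteration:

var g = graph.cache()
for (_ <- 1 to 10) {
  // Placeholder update step; a real algorithm would transform the graph here.
  val next = g.mapVertices((id, attr) => attr).cache()
  next.edges.count() // materialize `next` before dropping the old graph
  g.unpersistVertices(blocking = false)
  g.edges.unpersist(blocking = false)
  g = next
}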

method cache()

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

//Example

graph.cache()

Pregel API

  // Iterative graph-parallel computation
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int,
      activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]

Detailed implementation:

  /**
   * @tparam A the Pregel message type
   *
   * @param initialMsg the message each vertex will receive on the first iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value. On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message. On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A. ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   */
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

Included graph algorithms:

def pageRank(tol: Double, resetProb: Double = 0.15):
    Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): 
    Graph[VertexId, ED]

method pageRank:

  /**
   * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
   * PageRank and edge attributes containing the normalized edge weight.
   *
   * @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergence]]
   */
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
    PageRank.runUntilConvergence(graph, tol, resetProb)
  }

For the detailed implementation, see:

https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala

method connectedComponents

 /**
   * Compute the connected component membership of each vertex and return a graph with the vertex
   * value containing the lowest vertex id in the connected component containing that vertex.
   *
   * @see `org.apache.spark.graphx.lib.ConnectedComponents.run`
   */
  def connectedComponents(): Graph[VertexId, ED] = {
    ConnectedComponents.run(graph)
  }

  /**
   * Compute the connected component membership of each vertex and return a graph with the vertex
   * value containing the lowest vertex id in the connected component containing that vertex.
   *
   * @see `org.apache.spark.graphx.lib.ConnectedComponents.run`
   */
  def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
    ConnectedComponents.run(graph, maxIterations)
  }

method triangleCount

  /**
   * Compute the number of triangles passing through each vertex.
   *
   * @see [[org.apache.spark.graphx.lib.TriangleCount$#run]]
   */
  def triangleCount(): Graph[Int, ED] = {
    TriangleCount.run(graph)
  }

method stronglyConnectedComponents

  /**
   * Compute the strongly connected component (SCC) of each vertex and return a graph with the
   * vertex value containing the lowest vertex id in the SCC containing that vertex.
   *
   * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
   */
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
    StronglyConnectedComponents.run(graph, numIter)
  }

Example:

//build graph
val followerGraph = GraphLoader.edgeListFile(
  sc, "file:///opt/spark/spark/data/graphx/followers.txt")

//pageRank, sorted by pageRank in descending order
followerGraph.pageRank(0.001)
   .vertices.sortBy(-_._2)
   .collect.foreach(println)
   
/*
(1,2.435707093904704)
(2,2.3600348087805516)
(3,0.6140453120556082)
(6,0.5045131359115431)
(7,0.5045131359115431)
(4,0.2905932567180246)
(5,0.2905932567180246)
*/

//connectedComponents
followerGraph.connectedComponents
   .vertices.foreach(println)
   
/*
(4,1)
(6,1)
(2,1)
(1,1)
(3,1)
(7,1)
(5,1)
*/

//triangleCount

followerGraph.triangleCount
   .vertices.foreach(println)
   
/*
(1,0)
(3,1)
(7,1)
(5,0)
(4,0)
(6,1)
(2,0)
*/

followerGraph.stronglyConnectedComponents(3)
   .vertices.foreach(println)
   
/*
(4,4)
(1,1)
(3,3)
(7,3)
(5,5)
(6,3)
(2,1)
*/

Pregel -- Iterative Graph Level Computation

def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int,
    activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]

Parameters:

graph - the input graph (supplied implicitly when pregel is invoked as a method on a graph)

initialMsg - the message each vertex will receive at the first iteration

maxIterations - the maximum number of iterations to run for

activeDirection - the direction of edges incident to a vertex that received a message in the previous round on which to run sendMsg. For example, if this is EdgeDirection.Out, only out-edges of vertices that received a message in the previous round will run. The default is EdgeDirection.Either, which will run sendMsg on edges where either side received a message in the previous round. If this is EdgeDirection.Both, sendMsg will only run on edges where both vertices received a message.

vprog - the user-defined vertex program which runs on each vertex and receives the inbound message and computes a new vertex value. On the first iteration the vertex program is invoked on all vertices and is passed the default message. On subsequent iterations the vertex program is only invoked on those vertices that receive messages.

sendMsg - a user supplied function that is applied to out edges of vertices that received messages in the current iteration

mergeMsg - a user supplied function that takes two incoming messages of type A and merges them into a single message of type A. "This function must be commutative and associative and ideally the size of A should not increase."

Example:

The Pregel operator can be used to express computations such as single-source shortest paths:

val sourceId: VertexId = 3 // The ultimate source
// Initialize the graph such that all vertices 
// except the root have distance infinity.
val initialGraph = 
  followerGraph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 
    else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => 
    math.min(dist, newDist), // Vertex Program
  
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

/*
Shortest distance from VertexId 3:
(4,1.0)
(6,1.0)
(2,1.0)
(1,2.0)
(3,0.0)
(7,1.0)
(5,1.0)

*/
