Spark Graph Computing (Continued)

There are currently many graph-based parallel computing frameworks. Examples include Pregel from Google, the Apache open-source frameworks Giraph and HAMA, and GraphLab.

Of these, Pregel, HAMA, and Giraph are very similar. All three are based on the BSP (Bulk Synchronous Parallel) model.

A BSP computer consists of: components capable of processing and/or local memory transactions (i.e., processors); a network that routes messages between pairs of such components; and a hardware facility that allows for the synchronization of all or a subset of components.

This is commonly interpreted as a set of processors that may follow different threads of computation. Each processor has fast local memory, and all processors are connected by a communication network. A BSP algorithm relies heavily on the third feature. A computation proceeds in a series of global supersteps, and each superstep consists of three components:

Concurrent computation: every participating processor may perform local computations, i.e., each process can only use values stored in the processor's fast local memory. These computations run asynchronously with respect to each other but may overlap with communication.

Communication: The processes exchange data between themselves to facilitate remote data storage capabilities.

Barrier synchronization: When a process reaches this point (the barrier), it waits until all other processes have reached the same barrier.
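The following sketch makes the three phases concrete. It is a minimal, single-process simulation of BSP supersteps in Scala, built on several hypothetical choices made only for illustration:

- the object name `BspSketch`;
- the ring topology;
- the "propagate the global maximum" task.

The barrier is modeled by holding messages back until every processor has sent its messages. Only then are they delivered to the inboxes for the next superstep.

```scala
// Hypothetical, sequential simulation of BSP supersteps (not a real BSP runtime).
object BspSketch {
  // Each "processor" owns one value; after enough supersteps every processor holds the global maximum.
  def maxOnRing(initial: Vector[Int]): Vector[Int] = {
    val n = initial.length
    var local = initial                          // each processor's local memory
    var inbox = Vector.fill(n)(List.empty[Int])  // messages delivered at the previous barrier
    for (_ <- 0 until n) {                       // n supersteps are enough on a ring of n processors
      // 1. Concurrent computation: each processor reads only its own memory and inbox.
      local = Vector.tabulate(n)(i => (local(i) :: inbox(i)).max)
      // 2. Communication: each processor sends its current value to its right-hand neighbour.
      val outgoing = Vector.tabulate(n)(i => ((i + 1) % n, local(i)))
      // 3. Barrier synchronization: messages become visible only after every processor has sent.
      inbox = Vector.tabulate(n)(j => outgoing.collect { case (dst, v) if dst == j => v }.toList)
    }
    local
  }

  def main(args: Array[String]): Unit =
    println(maxOnRing(Vector(3, 9, 1, 4)))
}
```

Running `main` prints `Vector(9, 9, 9, 9)`. The largest value needs n - 1 hops to reach every processor, and the loop allows exactly that many supersteps after the first one.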

Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs have a collection of basic operators. These operators take user-defined functions and produce new graphs with transformed properties and structure.

The core operators, which have optimized implementations, are defined in Graph. The convenience operators, which are expressed as compositions of the core operators, are defined in GraphOps. Thanks to Scala implicits, however, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (an operator defined in GraphOps) as follows:


import org.apache.spark.graphx.{Graph, VertexRDD}

val graph: Graph[(String, String), String] = ??? // assume an existing property graph
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

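The following summarizes the operations available on the abstract class Graph, including its attributes and methods. For simplicity, operators from GraphOps are listed as members of Graph, and some signatures are simplified.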

import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Simplified summary of the Graph API (members of Graph plus the GraphOps operators
// that Scala implicits make available on Graph); not the literal Spark source.
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Information about the graph ====================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections ==============================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ===================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic ==============================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ===========================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure =====================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph =======================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets ==================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ===========================================
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms =========================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

The code above declares the abstract class Graph. Its methods are only declared here, not implemented.

The operations you can perform on a graph fall into the following groups:

Operations on properties (property operators)

method mapVertices[VD2]

method mapEdges[ED2]

method mapTriplets[ED2]
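The sketch below shows the three property operators on a small toy graph, modeled on the one in the GraphX programming guide. It makes these assumptions:

- Spark GraphX is on the classpath and runs with a local master.
- `PropertyOperatorsSketch`, its `run` helper, and the data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object PropertyOperatorsSketch {
  def run(sc: SparkContext): (Map[VertexId, String], List[Int], Set[String]) = {
    // Hypothetical toy data: (username, occupation) vertices and relationship-label edges.
    val users = sc.parallelize(Seq[(VertexId, (String, String))](
      (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
      (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    val relationships = sc.parallelize(Seq(
      Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
    val graph: Graph[(String, String), String] = Graph(users, relationships)

    // mapVertices: new vertex attributes; the edges are left untouched.
    val names: Graph[String, String] = graph.mapVertices((_, attr) => attr._1.toUpperCase)
    // mapEdges: new edge attributes computed from each edge alone.
    val labelLengths: Graph[(String, String), Int] = graph.mapEdges(e => e.attr.length)
    // mapTriplets: new edge attributes that can also read both endpoint attributes.
    val facts: Graph[(String, String), String] =
      graph.mapTriplets(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")

    (names.vertices.collect().toMap,
      labelLengths.edges.map(_.attr).collect().sorted.toList,
      facts.edges.map(_.attr).collect().toSet)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("property-operators").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Prefer mapVertices over rebuilding the graph from a transformed vertex RDD. Because the structure is unchanged, GraphX can reuse the existing indices.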

Operations on structure (structural operators)

method reverse

method subgraph

method mask[VD2,ED2]

method groupEdges
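The four structural operators can be exercised as in the sketch below. It makes these assumptions:

- Spark GraphX is on the classpath.
- The object name and data are hypothetical.
- `PartitionStrategy.RandomVertexCut` is used because it co-locates parallel edges with the same direction. groupEdges only merges edges that sit in the same partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object StructuralOperatorsSketch {
  def run(sc: SparkContext)
      : (Set[(VertexId, VertexId)], Set[VertexId], Set[VertexId], Set[(VertexId, VertexId, Int)]) = {
    // Hypothetical toy graph: vertex 4 is marked "Missing" and the edge 1 -> 2 appears twice.
    val vertices = sc.parallelize(Seq[(VertexId, String)]((1L, "a"), (2L, "b"), (3L, "c"), (4L, "Missing")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1)))
    val graph = Graph(vertices, edges)

    // reverse: flips every edge; vertex and edge attributes are unchanged.
    val reversed = graph.reverse.edges.map(e => (e.srcId, e.dstId)).collect().toSet
    // subgraph: keeps vertices satisfying vpred; edges touching removed vertices are dropped too.
    val valid = graph.subgraph(vpred = (_, attr) => attr != "Missing")
    // mask: restricts `graph` to the vertices and edges that also exist in `valid`.
    val masked = graph.mask(valid)
    // groupEdges: merges parallel edges; they must first be co-located, here with partitionBy.
    val merged = graph.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges((a, b) => a + b)

    (reversed,
      valid.vertices.keys.collect().toSet,
      masked.vertices.keys.collect().toSet,
      merged.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSet)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("structural-operators").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```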

Join operations (join operators)

method joinVertices

method outerJoinVertices[U: ClassTag, VD2: ClassTag]
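The two join operators differ in how they treat vertices that have no match in the joined table. The sketch below illustrates this. It assumes Spark GraphX on the classpath and uses a hypothetical graph and "bonus" table.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object JoinOperatorsSketch {
  def run(sc: SparkContext): (Map[VertexId, Int], Map[VertexId, Int]) = {
    // Hypothetical graph whose vertex attributes all start at 0.
    val graph = Graph(
      sc.parallelize(Seq[(VertexId, Int)]((1L, 0), (2L, 0), (3L, 0))),
      sc.parallelize(Seq(Edge(1L, 2L, "x"), Edge(2L, 3L, "y"), Edge(1L, 3L, "z"))))
    // Hypothetical external table: a bonus for vertices 1 and 2 only (no entry for 3).
    val bonus = sc.parallelize(Seq[(VertexId, Int)]((1L, 10), (2L, 20)))

    // joinVertices: matched vertices get mapFunc's result, unmatched ones keep their value;
    // the vertex attribute type cannot change.
    val joined: Graph[Int, String] = graph.joinVertices(bonus)((_, old, b) => old + b)
    // outerJoinVertices: mapFunc runs on every vertex with an Option and may change the type.
    val withOutDegree: Graph[Int, String] =
      graph.outerJoinVertices(graph.outDegrees)((_, _, deg) => deg.getOrElse(0))

    (joined.vertices.collect().toMap, withOutDegree.vertices.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-operators").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Vertex 3 has no out-edges, so it is absent from outDegrees. The getOrElse(0) gives it a degree of 0 instead of dropping it.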

Neighborhood Aggregation

method collectNeighborIds(edgeDirection: EdgeDirection)

method collectNeighbors(edgeDirection: EdgeDirection)

method aggregateMessages[Msg: ClassTag]
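The sketch below shows neighborhood aggregation with aggregateMessages and collectNeighborIds. It computes the average age of each vertex's followers and lists the followers of each vertex. It assumes Spark GraphX on the classpath; the "follows" graph and the object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object NeighborhoodAggregationSketch {
  def run(sc: SparkContext): (Map[VertexId, Double], Map[VertexId, Set[VertexId]]) = {
    // Hypothetical "follows" graph: the vertex attribute is an age, and src follows dst.
    val graph = Graph(
      sc.parallelize(Seq[(VertexId, Int)]((1L, 20), (2L, 30), (3L, 40))),
      sc.parallelize(Seq(Edge(1L, 3L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1))))

    // aggregateMessages: sendMsg runs once per edge, mergeMsg combines a vertex's messages.
    val sumAndCount: VertexRDD[(Int, Int)] = graph.aggregateMessages[(Int, Int)](
      ctx => ctx.sendToDst((ctx.srcAttr, 1)), // each follower sends (age, 1) to the followed vertex
      (a, b) => (a._1 + b._1, a._2 + b._2),    // commutative and associative
      TripletFields.Src)                       // sendMsg reads only the source attribute
    // Vertices that receive no message (here vertex 2) are absent from the result.
    val avgFollowerAge =
      sumAndCount.collect().map { case (id, (sum, n)) => id -> sum.toDouble / n }.toMap

    // collectNeighborIds: ids of adjacent vertices; vertices without neighbours get an empty array.
    val followers = graph.collectNeighborIds(EdgeDirection.In).collect()
      .map { case (id, nbrs) => id -> nbrs.toSet }.toMap

    (avgFollowerAge, followers)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("neighborhood-aggregation").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Passing TripletFields.Src tells GraphX that only the source attribute is needed, so it can avoid shipping destination attributes.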

Computing Degree Information (inDegrees, outDegrees, and degrees are attributes of the Graph class)

Implementation
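The sketch below uses the degree attributes to find the vertex with the largest in-degree, out-degree, and total degree. It reduces each VertexRDD[Int] with a pairwise max, following the max-degree pattern shown in the GraphX programming guide. It assumes Spark GraphX on the classpath; `DegreeSketch`, `maxByDegree`, and the edge list are hypothetical. The edges are chosen so that each maximum is unique.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object DegreeSketch {
  // Keeps the (vertexId, degree) pair with the larger degree.
  def maxByDegree(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
    if (a._2 > b._2) a else b

  def run(sc: SparkContext): ((VertexId, Int), (VertexId, Int), (VertexId, Int)) = {
    // Hypothetical edge list; every vertex attribute defaults to 0.
    val graph = Graph.fromEdges(sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(1L, 4L, 0),
      Edge(2L, 4L, 0), Edge(3L, 4L, 0), Edge(4L, 2L, 0))), defaultValue = 0)
    // Each degree attribute is a VertexRDD[Int]; vertices with degree 0 are omitted.
    (graph.inDegrees.reduce(maxByDegree),
      graph.outDegrees.reduce(maxByDegree),
      graph.degrees.reduce(maxByDegree))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("degrees").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```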

Caching and Uncaching

method persist

method cache()
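The sketch below shows caching and uncaching in an iterative setting. It makes these assumptions:

- Spark GraphX is on the classpath.
- The object name and data are hypothetical.

The storage level is chosen through the storage-level arguments of Graph.fromEdges rather than by calling persist with a different level afterwards. The reason is that Spark refuses to change the storage level of an RDD that already has one, and graph construction already assigns a level.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

object CachingSketch {
  def run(sc: SparkContext): (Long, Long) = {
    // The target storage level is chosen when the graph is built (MEMORY_AND_DISK here).
    val level = StorageLevel.MEMORY_AND_DISK
    val base = Graph.fromEdges(
      sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0))), 0.0, level, level)
    // cache() persists the vertices and edges at that previously specified target level.
    base.cache()
    // A derived graph, e.g. the next state of an iterative loop: cache it, then materialize it.
    val next = base.mapVertices((id, _) => id.toDouble).cache()
    val counts = (next.numVertices, next.numEdges)
    // Release what is no longer needed. unpersistVertices drops only base's vertices and
    // keeps its edges, which `next` was derived from.
    base.unpersistVertices(blocking = false)
    next.unpersist(blocking = false)
    counts
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("caching").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```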

Pregel API

detailed implementation:
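The GraphX source is not reproduced here. As a rough, single-machine illustration of the loop Pregel runs, the sketch below uses plain Scala collections instead of RDDs. The following parts are hypothetical:

- `PregelSketch` and its `Edge` case class;
- the min-label-propagation usage.

It also simplifies message sending. Only out-edges of vertices that received a message are used (similar to EdgeDirection.Out), whereas GraphX defaults to EdgeDirection.Either.

```scala
// Hypothetical single-machine sketch of the Pregel loop; NOT GraphX's implementation.
object PregelSketch {
  final case class Edge[E](src: Long, dst: Long, attr: E)

  def pregel[V, E, A](vertices: Map[Long, V], edges: Seq[Edge[E]], initialMsg: A, maxIterations: Int)(
      vprog: (Long, V, A) => V,
      sendMsg: (Edge[E], V, V) => Iterator[(Long, A)], // (edge, srcAttr, dstAttr) => messages
      mergeMsg: (A, A) => A): Map[Long, V] = {
    // Superstep 0: every vertex runs vprog with the initial message.
    var verts: Map[Long, V] = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    var active: Set[Long] = verts.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // Communication: edges whose source is active may emit messages.
      val sent: List[(Long, A)] = edges.toList
        .filter(e => active.contains(e.src))
        .flatMap(e => sendMsg(e, verts(e.src), verts(e.dst)))
      // Messages addressed to the same vertex are combined with mergeMsg.
      val merged: Map[Long, A] =
        sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
      // After the barrier, only vertices that received a message run vprog.
      verts = verts ++ merged.map { case (id, m) => id -> vprog(id, verts(id), m) }
      active = merged.keySet
      i += 1
    }
    verts
  }

  def main(args: Array[String]): Unit = {
    // Usage: propagate the smallest vertex id along directed edges.
    val vertices = Map(1L -> 1L, 2L -> 2L, 3L -> 3L, 4L -> 4L, 5L -> 5L)
    val edges = Seq(Edge(1L, 2L, ()), Edge(2L, 3L, ()), Edge(3L, 1L, ()), Edge(4L, 5L, ()))
    val labels = pregel(vertices, edges, Long.MaxValue, maxIterations = 10)(
      (_, v, msg) => math.min(v, msg),
      (e, src, dst) => if (src < dst) Iterator(e.dst -> src) else Iterator.empty,
      (a, b) => math.min(a, b))
    println(labels.toSeq.sorted.mkString(", "))
  }
}
```

The loop stops either when no messages are sent or when maxIterations is reached, matching the two termination conditions described for Pregel.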

Included graph algorithms:

method pageRank:

For the detailed implementation, see:

https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala

method connectedComponents

method triangleCount

method stronglyConnectedComponents

Example:
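The sketch below runs the four built-in algorithms on a small hypothetical graph: a directed cycle 1 -> 2 -> 3 -> 1 plus a separate edge 4 -> 5. It assumes Spark GraphX on the classpath. The object name, tolerance, and numIter value are illustrative choices, not recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object AlgorithmsSketch {
  def run(sc: SparkContext)
      : (Map[VertexId, Double], Map[VertexId, VertexId], Map[VertexId, Int], Map[VertexId, VertexId]) = {
    // Hypothetical graph: a directed cycle 1 -> 2 -> 3 -> 1 plus a separate edge 4 -> 5.
    val graph = Graph.fromEdges(sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(4L, 5L, 1))), defaultValue = 0)

    val ranks = graph.pageRank(tol = 0.0001).vertices   // iterate until ranks change by less than tol
    val cc = graph.connectedComponents().vertices        // lowest vertex id in each component
    val triangles = graph.triangleCount().vertices       // number of triangles through each vertex
    val scc = graph.stronglyConnectedComponents(numIter = 5).vertices // lowest vertex id per SCC

    (ranks.collect().toMap, cc.collect().toMap, triangles.collect().toMap, scc.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("graph-algorithms").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

connectedComponents ignores edge direction, so vertices 4 and 5 form one component. stronglyConnectedComponents respects direction, so 4 and 5 each end up in their own SCC.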

Pregel -- Iterative Graph Level Computation

Parameters:

graph - the input graph.

initialMsg - the message each vertex will receive at the first iteration

maxIterations - the maximum number of iterations to run for

activeDirection - the direction of edges incident to a vertex that received a message in the previous round on which to run sendMsg. For example, if this is EdgeDirection.Out, only out-edges of vertices that received a message in the previous round will run. The default is EdgeDirection.Either, which will run sendMsg on edges where either side received a message in the previous round. If this is EdgeDirection.Both, sendMsg will only run on edges where both vertices received a message.

vprog - the user-defined vertex program. It runs on each vertex, receives the inbound message, and computes a new vertex value. On the first iteration, the vertex program is invoked on all vertices and is passed initialMsg. On subsequent iterations, it is invoked only on vertices that receive messages.

sendMsg - a user-supplied function that is applied to the edges (selected by activeDirection) of vertices that received messages in the current iteration

mergeMsg - a user-supplied function that takes two incoming messages of type A and merges them into a single message of type A. This function must be commutative and associative, and ideally the size of A should not increase.

Example:

The Pregel operator can express computations such as single-source shortest paths.
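The sketch below follows the single-source shortest path example from the GraphX programming guide. The differences are that it uses a small hypothetical weighted graph instead of a generated one and wraps the logic in a hypothetical `SsspSketch.run` helper. It assumes Spark GraphX on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object SsspSketch {
  def run(sc: SparkContext, sourceId: VertexId): Map[VertexId, Double] = {
    // Hypothetical weighted graph; the edge attribute is the distance.
    val graph: Graph[Long, Double] = Graph.fromEdges(sc.parallelize(Seq(
      Edge(1L, 2L, 4.0), Edge(1L, 3L, 1.0), Edge(3L, 2L, 2.0), Edge(2L, 4L, 1.0))), defaultValue = 0L)
    // Every vertex except the source starts at distance infinity.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (_, dist, newDist) => math.min(dist, newDist),      // vprog: keep the shorter distance
      triplet =>                                          // sendMsg: relax the edge if it helps
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b))                           // mergeMsg: commutative and associative
    sssp.vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sssp").setMaster("local[*]"))
    try println(run(sc, 1L)) finally sc.stop()
  }
}
```

Starting from vertex 1, the direct edge to vertex 2 costs 4.0, but the path through vertex 3 costs 1.0 + 2.0. Pregel therefore settles vertex 2 at 3.0 and vertex 4 at 4.0. Iteration stops once no edge can be relaxed further, because then no messages are sent.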
