Spark Graph Computing (Continued)
At present, there are many graph-based parallel computing frameworks, such as:
Pregel from Google
Giraph and HAMA, open-source graph computing frameworks from Apache
GraphLab
Among them, Pregel, HAMA and Giraph are very similar: all three are based on the BSP (Bulk Synchronous Parallel) model.
A BSP computer consists of components capable of processing and/or local memory transactions (i.e., processors), a network that routes messages between pairs of such components, and a hardware facility that allows for the synchronization of all or a subset of components.
This is commonly interpreted as a set of processors which may follow different threads of computation, with each processor equipped with fast local memory and interconnected by a communication network. A BSP algorithm relies heavily on the third feature: a computation proceeds in a series of global supersteps, each of which consists of three components:
Concurrent computation: every participating processor may perform local computations, i.e., each process can only make use of values stored in the local fast memory of the processor. The computations occur asynchronously of all the others but may overlap with communication.
Communication: The processes exchange data between themselves to facilitate remote data storage capabilities.
Barrier synchronization: When a process reaches this point (the barrier), it waits until all other processes have reached the same barrier.
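To make the superstep structure concrete, here is a minimal single-machine sketch in plain Scala (no Spark involved). The `Processor` case class, `runBsp` and the ring topology are hypothetical and exist only for illustration. Each processor holds a number, and after enough supersteps every processor holds the global maximum. All messages of a superstep are produced before any processor consumes them, which plays the role of the barrier.

```scala
// Hypothetical processor: an id, its local value, and the ids it sends messages to.
final case class Processor(id: Int, value: Int, neighbours: List[Int])

// Runs supersteps until no value changes (or maxSteps is reached);
// returns the final processors and the number of supersteps executed.
def runBsp(initial: Vector[Processor], maxSteps: Int): (Vector[Processor], Int) = {
  var procs = initial
  var steps = 0
  var changed = true
  while (changed && steps < maxSteps) {
    // Communication: every processor sends its current local value to its neighbours.
    val outbox: Vector[(Int, Int)] = procs.flatMap(p => p.neighbours.map(n => (n, p.value)))
    // Barrier: messages are grouped only after every processor has sent its messages.
    val inbox: Map[Int, Vector[Int]] =
      outbox.groupBy(_._1).map { case (target, ms) => target -> ms.map(_._2) }
    // Local computation: each processor combines its own value with the messages it received.
    val next = procs.map(p => p.copy(value = (p.value +: inbox.getOrElse(p.id, Vector.empty)).max))
    changed = next != procs
    procs = next
    steps += 1
  }
  (procs, steps)
}

// A 4-processor ring: 0 -> 1 -> 2 -> 3 -> 0.
val ring = Vector(
  Processor(0, 3, List(1)),
  Processor(1, 7, List(2)),
  Processor(2, 1, List(3)),
  Processor(3, 5, List(0)))
val (result, supersteps) = runBsp(ring, maxSteps = 10)
println(result.map(_.value)) // Vector(7, 7, 7, 7)
```

The maximum travels one hop per superstep. Three supersteps spread it around the ring, and a fourth superstep confirms that nothing changes.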
Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenience operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) as follows:
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
The following is a summary of the functionality defined in both Graph and GraphOps, presented as members of the abstract class Graph for simplicity and grouped by purpose.
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Edge, EdgeContext, EdgeDirection, EdgeRDD, EdgeTriplet,
  PartitionID, PartitionStrategy, TripletFields, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Signature-only summary. Names are imported explicitly (not with graphx._) so that this
// local Graph does not clash with org.apache.spark.graphx.Graph.
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A: ClassTag](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
The code above only declares the Graph abstract class; none of its methods are implemented here. In Spark, the core methods are implemented in GraphImpl and the convenience methods in GraphOps. Their use is illustrated later in this section.
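Because Graph is abstract, graphs are usually built with the `Graph(vertices, edges)` factory on its companion object. The following is a minimal sketch, assuming spark-core and spark-graphx are on the classpath and that a local master is acceptable. The object name is illustrative, and the small user/relationship data set is modelled on the example in the GraphX programming guide. The sketch exercises the numVertices and numEdges attributes and the triplets view.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object GraphConstructionSketch {
  // Returns (numVertices, numEdges, one sentence per triplet).
  def run(): (Long, Long, Seq[String]) = {
    val sc = new SparkContext("local[*]", "graph-construction-sketch")
    try {
      // Vertex data: (id, (username, role)).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
        (3L, ("rxin", "student")),
        (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")),
        (2L, ("istoica", "prof"))))
      // Edge data: Edge(srcId, dstId, relationship).
      val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
        Edge(3L, 7L, "collab"),
        Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"),
        Edge(5L, 7L, "pi")))
      val graph: Graph[(String, String), String] = Graph(users, relationships)

      // Each triplet joins an edge with the attributes of both of its endpoints.
      val facts = graph.triplets
        .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
        .collect()
        .toSeq
      (graph.numVertices, graph.numEdges, facts)
    } finally {
      sc.stop()
    }
  }
}
```

Calling `GraphConstructionSketch.run()` returns 4 vertices, 4 edges and sentences such as "franklin is the advisor of rxin". Keeping the Spark work inside an object's method keeps the closures free of script-level state.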
The following operations can be performed on a graph:
Operations on properties (property operators)
method mapVertices[VD2]
method mapEdges[ED2]
method mapTriplets[ED2]
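The three property operators differ only in what the user function can see. mapVertices sees a vertex id and attribute. mapEdges sees a single edge. mapTriplets also sees both endpoint attributes. The following is a sketch under the same assumptions: local Spark with GraphX on the classpath, with an illustrative object name and data.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object PropertyOperatorsSketch {
  // Returns (upper-cased vertex names, doubled edge weights, triplet labels).
  def run(): (Map[VertexId, String], Seq[Int], Seq[String]) = {
    val sc = new SparkContext("local[*]", "property-operators-sketch")
    try {
      // Example graph: vertex attribute = name, edge attribute = weight.
      val graph: Graph[String, Int] = Graph(
        sc.parallelize(Seq[(VertexId, String)]((1L, "a"), (2L, "b"), (3L, "c"))),
        sc.parallelize(Seq(Edge(1L, 2L, 10), Edge(2L, 3L, 20))))

      // mapVertices: new vertex attributes; the graph structure is unchanged.
      val upper = graph.mapVertices((id, name) => name.toUpperCase)
      // mapEdges: the new edge attribute depends only on the edge itself.
      val doubled = graph.mapEdges(e => e.attr * 2)
      // mapTriplets: the new edge attribute may also use both endpoint attributes.
      val labelled = graph.mapTriplets(t => s"${t.srcAttr}->${t.dstAttr}:${t.attr}")

      (upper.vertices.collect().toMap,
        doubled.edges.map(_.attr).collect().toSeq.sorted,
        labelled.edges.map(_.attr).collect().toSeq.sorted)
    } finally {
      sc.stop()
    }
  }
}
```

These operators leave the graph structure untouched, so the resulting graphs can reuse the structural indices of the original graph. This is why they are preferred over rebuilding a graph from a transformed `vertices` RDD.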
Operations on structure (structural operators)
method reverse
method subgraph
method mask[VD2,ED2]
method groupEdges
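The structural operators return graphs with a different set of vertices or edges. Note that groupEdges only merges parallel edges that sit in the same partition, so the graph is partitioned with partitionBy first. The following is a minimal sketch, assuming local Spark with GraphX; the object name and data are illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}

object StructuralOperatorsSketch {
  // Returns (reversed edge pairs, vertex count after subgraph,
  //          vertex count after mask, merged parallel edges).
  def run(): (Set[(VertexId, VertexId)], Long, Long, Map[(VertexId, VertexId), Int]) = {
    val sc = new SparkContext("local[*]", "structural-operators-sketch")
    try {
      // Example graph with two parallel 1 -> 2 edges and a vertex marked "missing".
      val graph: Graph[String, Int] = Graph(
        sc.parallelize(Seq[(VertexId, String)](
          (1L, "ok"), (2L, "ok"), (3L, "ok"), (4L, "missing"))),
        sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 5), Edge(2L, 3L, 2), Edge(3L, 4L, 7))))

      // reverse: every edge direction is flipped; attributes are kept.
      val reversed = graph.reverse.edges.map(e => (e.srcId, e.dstId)).collect().toSet

      // subgraph: drop vertices failing vpred, together with every edge touching them.
      val valid = graph.subgraph(vpred = (id, attr) => attr != "missing")

      // mask: keep only the vertices and edges of `graph` that also appear in `valid`.
      val masked = graph.mask(valid)

      // groupEdges: merge parallel edges after co-locating them with partitionBy.
      val merged = graph
        .partitionBy(PartitionStrategy.RandomVertexCut)
        .groupEdges((a, b) => a + b)
        .edges.map(e => ((e.srcId, e.dstId), e.attr)).collect().toMap

      (reversed, valid.vertices.count(), masked.vertices.count(), merged)
    } finally {
      sc.stop()
    }
  }
}
```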
Join operations (join operators)
method joinVertices
method outerJoinVertices[U: ClassTag, VD2: ClassTag]
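The two join operators differ in how they treat vertices that have no matching entry in the RDD and in whether the vertex type can change. The following is a minimal sketch, assuming local Spark with GraphX; the object name and data are illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object JoinOperatorsSketch {
  // Returns (scores after joinVertices, descriptions after outerJoinVertices).
  def run(): (Map[VertexId, Int], Map[VertexId, String]) = {
    val sc = new SparkContext("local[*]", "join-operators-sketch")
    try {
      // Example graph whose vertex attribute is a score.
      val graph: Graph[Int, String] = Graph(
        sc.parallelize(Seq[(VertexId, Int)]((1L, 10), (2L, 20), (3L, 30))),
        sc.parallelize(Seq(Edge(1L, 2L, "x"), Edge(2L, 3L, "y"))))

      // Extra per-vertex data; vertex 3 has no entry.
      val bonus: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))

      // joinVertices: the vertex type stays Int; unmatched vertices keep their old value.
      val boosted = graph.joinVertices(bonus)((id, score, extra) => score + extra)

      // outerJoinVertices: every vertex sees an Option and the vertex type may change.
      val described: Graph[String, String] = graph.outerJoinVertices(bonus) {
        (id, score, extraOpt) => extraOpt match {
          case Some(extra) => s"$score+$extra"
          case None        => s"$score (no bonus)"
        }
      }

      (boosted.vertices.collect().toMap, described.vertices.collect().toMap)
    } finally {
      sc.stop()
    }
  }
}
```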
Neighborhood Aggregation
method collectNeighborIds(edgeDirection: EdgeDirection)
method collectNeighbors(edgeDirection: EdgeDirection)
method aggregateMessages[Msg: ClassTag]
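aggregateMessages is the core neighbourhood-aggregation primitive. A sendMsg function runs on each edge and can send messages to either endpoint through an EdgeContext. A mergeMsg function then combines the messages that reach the same vertex. The sketch below computes the average age of each vertex's followers, in the spirit of the "older followers" example in the GraphX guide. It assumes local Spark with GraphX; the object name and data are illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}

object AggregateMessagesSketch {
  // Average age of each vertex's followers; vertices with no followers are absent.
  def run(): Map[VertexId, Double] = {
    val sc = new SparkContext("local[*]", "aggregate-messages-sketch")
    try {
      // Example "follows" graph: edge src follows dst; vertex attribute = age.
      val graph: Graph[Int, Int] = Graph(
        sc.parallelize(Seq[(VertexId, Int)]((1L, 30), (2L, 40), (3L, 20), (4L, 50))),
        sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(3L, 2L, 0), Edge(4L, 2L, 0), Edge(2L, 1L, 0))))

      // sendMsg runs once per edge and sends (1, follower age) to the followed vertex;
      // mergeMsg adds up the messages for the same vertex (commutative and associative).
      val stats: VertexRDD[(Int, Int)] = graph.aggregateMessages[(Int, Int)](
        ctx => ctx.sendToDst((1, ctx.srcAttr)),
        (a, b) => (a._1 + b._1, a._2 + b._2),
        TripletFields.Src) // only srcAttr is read, so only source attributes are shipped

      stats.mapValues((id, s) => s._2.toDouble / s._1).collect().toMap
    } finally {
      sc.stop()
    }
  }
}
```

Vertices 3 and 4 receive no messages, so they are missing from the result rather than mapped to a default value.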
Computing degree information (inDegrees, outDegrees and degrees are exposed as attributes of the Graph class)
Implementation
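The degree attributes are VertexRDDs that can be collected or reduced like any other RDD. The following is a minimal sketch, assuming local Spark with GraphX. The object name and edges are illustrative, and Graph.fromEdges fills the unused vertex attributes with a default value.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object DegreeSketch {
  // Returns (in-degrees, out-degrees, vertex with the highest total degree).
  def run(): (Map[VertexId, Int], Map[VertexId, Int], (VertexId, Int)) = {
    val sc = new SparkContext("local[*]", "degree-sketch")
    try {
      val graph: Graph[Int, Int] = Graph.fromEdges(
        sc.parallelize(Seq(
          Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0), Edge(3L, 2L, 0))),
        defaultValue = 0)

      // Vertices whose degree in a given direction is 0 do not appear in these VertexRDDs.
      val inDeg = graph.inDegrees.collect().toMap
      val outDeg = graph.outDegrees.collect().toMap
      // degrees = in-degree + out-degree; keep the vertex with the largest value.
      val maxDegree = graph.degrees.reduce((a, b) => if (a._2 >= b._2) a else b)

      (inDeg, outDeg, maxDegree)
    } finally {
      sc.stop()
    }
  }
}
```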
Caching and Uncaching
method persist
method cache()
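Like RDDs, graphs are recomputed from their lineage on every action unless they are persisted. The following is a minimal sketch, assuming local Spark with GraphX; the object name and data are illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

object CachingSketch {
  // Returns (numVertices, numEdges) computed on a persisted graph.
  def run(): (Long, Long) = {
    val sc = new SparkContext("local[*]", "caching-sketch")
    try {
      val graph = Graph.fromEdges(
        sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0))), defaultValue = 0.0)
        .persist(StorageLevel.MEMORY_ONLY) // the same level that cache() uses

      // Both actions reuse the persisted vertices and edges instead of recomputing them.
      val counts = (graph.numVertices, graph.numEdges)

      // Release the cached data once the graph is no longer needed.
      graph.unpersist(blocking = false)
      counts
    } finally {
      sc.stop()
    }
  }
}
```

In iterative computations, intermediate graphs should also be uncached once they are no longer needed. The Pregel API takes care of this for its own intermediate results.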
Pregel API
Detailed implementation:
Included graph algorithms:
method pageRank:
For the detailed implementation, see:
method connectedComponents
method triangleCount
method stronglyConnectedComponents
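The built-in algorithms are called directly on a graph and return a new graph whose vertex attributes hold the result. The following is a minimal sketch, assuming local Spark with GraphX; the object name and the tiny graph are illustrative. In recent Spark releases, triangleCount canonicalizes the edges itself. Older releases required srcId < dstId and a prior partitionBy call.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object GraphAlgorithmsSketch {
  // Returns (PageRank, connected-component label, triangle count, SCC label) per vertex.
  def run(): (Map[VertexId, Double], Map[VertexId, VertexId],
              Map[VertexId, Int], Map[VertexId, VertexId]) = {
    val sc = new SparkContext("local[*]", "graph-algorithms-sketch")
    try {
      // Example graph: a directed cycle 1 -> 2 -> 3 -> 1 plus a separate edge 4 -> 5.
      val graph: Graph[Int, Int] = Graph.fromEdges(
        sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0), Edge(4L, 5L, 0))),
        defaultValue = 0)

      // Dynamic PageRank: iterate until the ranks change by less than tol.
      val ranks = graph.pageRank(tol = 0.0001).vertices.collect().toMap
      // Each vertex is labelled with the lowest vertex id in its connected component.
      val components = graph.connectedComponents().vertices.collect().toMap
      // Number of triangles each vertex belongs to (edge direction is ignored).
      val triangles = graph.triangleCount().vertices.collect().toMap
      // Strongly connected components, running at most numIter iterations.
      val scc = graph.stronglyConnectedComponents(numIter = 10).vertices.collect().toMap

      (ranks, components, triangles, scc)
    } finally {
      sc.stop()
    }
  }
}
```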
Example:
Pregel: Iterative Graph-Parallel Computation
Parameters:
graph - the input graph.
initialMsg - the message each vertex will receive at the first iteration
maxIterations - the maximum number of iterations to run for
activeDirection - the direction of edges incident to a vertex that received a message in the previous round on which to run sendMsg. For example, if this is EdgeDirection.Out, only out-edges of vertices that received a message in the previous round will run. The default is EdgeDirection.Either, which will run sendMsg on edges where either side received a message in the previous round. If this is EdgeDirection.Both, sendMsg will only run on edges where both vertices received a message.
vprog - the user-defined vertex program which runs on each vertex and receives the inbound message and computes a new vertex value. On the first iteration the vertex program is invoked on all vertices and is passed the default message. On subsequent iterations the vertex program is only invoked on those vertices that receive messages.
sendMsg - a user-supplied function that is applied to the out-edges of vertices that received messages in the current iteration
mergeMsg - a user-supplied function that takes two incoming messages of type A and merges them into a single message of type A. This function must be commutative and associative, and ideally the size of A should not increase.
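The roles of vprog, sendMsg and mergeMsg are easiest to see in a single-machine model of the loop. The sketch below is plain Scala (no Spark) and is not GraphX's implementation. It ignores partitioning, fixes activeDirection to the EdgeDirection.Out behaviour described above, and passes the edge and both endpoint values to sendMsg instead of an EdgeTriplet. `pregelLocal` and `LocalEdge` are hypothetical names.

```scala
// Hypothetical minimal edge type used only by this model.
final case class LocalEdge[E](src: Long, dst: Long, attr: E)

// Single-machine model of the Pregel loop, assuming activeDirection = Out.
def pregelLocal[V, E, A](
    vertices: Map[Long, V],
    edges: Seq[LocalEdge[E]],
    initialMsg: A,
    maxIterations: Int)(
    vprog: (Long, V, A) => V,
    sendMsg: (LocalEdge[E], V, V) => Iterator[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, V] = {

  // First superstep: every vertex runs vprog with the initial message.
  var state: Map[Long, V] = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }

  // Run sendMsg on the selected edges and combine messages per destination with mergeMsg.
  def gather(selected: LocalEdge[E] => Boolean): Map[Long, A] =
    edges.filter(selected)
      .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

  var messages = gather(_ => true) // initially every edge may send
  var i = 0
  while (messages.nonEmpty && i < maxIterations) {
    // Only vertices that received a message run vprog.
    state = state.map { case (id, v) => id -> messages.get(id).fold(v)(m => vprog(id, v, m)) }
    // Only out-edges of those vertices run sendMsg in the next round.
    val received = messages.keySet
    messages = gather(e => received.contains(e.src))
    i += 1
  }
  state
}

// Usage: single-source shortest path from vertex 1.
val inf = Double.PositiveInfinity
val dist = pregelLocal(
  vertices = Map(1L -> 0.0, 2L -> inf, 3L -> inf, 4L -> inf),
  edges = Seq(LocalEdge(1L, 2L, 4.0), LocalEdge(1L, 3L, 1.0),
              LocalEdge(3L, 2L, 2.0), LocalEdge(2L, 4L, 5.0)),
  initialMsg = inf,
  maxIterations = 10)(
  vprog = (id, d, msg) => math.min(d, msg),
  sendMsg = (e, srcD, dstD) =>
    if (srcD + e.attr < dstD) Iterator((e.dst, srcD + e.attr)) else Iterator.empty,
  mergeMsg = (a, b) => math.min(a, b))
println(dist)
```

After the first round, only the out-edges of vertices that received a message are examined again. The loop therefore stops as soon as no vertex improves, well before maxIterations.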
Example:
The pregel operator can express computations such as single-source shortest path (SSSP).
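With the real GraphX API, single-source shortest path follows the pattern of the pregel example in the GraphX programming guide:

- each vertex holds the best known distance;
- vprog keeps the minimum;
- sendMsg proposes a shorter distance along an edge;
- mergeMsg keeps the smallest proposal.

The following is a minimal sketch, assuming local Spark with GraphX. The object name, the four-edge graph and the choice of EdgeDirection.Out are illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

object SsspSketch {
  // Shortest distance from sourceId to every vertex (infinity if unreachable).
  def run(sourceId: VertexId): Map[VertexId, Double] = {
    val sc = new SparkContext("local[*]", "sssp-pregel-sketch")
    try {
      // Example graph: edge attribute = distance; the vertex attribute is unused.
      val graph: Graph[Int, Double] = Graph.fromEdges(
        sc.parallelize(Seq(
          Edge(1L, 2L, 4.0), Edge(1L, 3L, 1.0), Edge(3L, 2L, 2.0), Edge(2L, 4L, 5.0))),
        defaultValue = 0)

      // Every vertex except the source starts at distance infinity.
      val initialGraph: Graph[Double, Double] =
        graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

      val sssp = initialGraph.pregel(
        Double.PositiveInfinity, // initialMsg: min(d, +inf) leaves every distance unchanged
        Int.MaxValue,            // maxIterations: stop only when no messages are sent
        EdgeDirection.Out)(      // only out-edges of vertices that received a message send
        (id, dist, newDist) => math.min(dist, newDist), // vprog
        triplet =>                                      // sendMsg
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          else
            Iterator.empty,
        (a, b) => math.min(a, b))                       // mergeMsg

      sssp.vertices.collect().toMap
    } finally {
      sc.stop()
    }
  }
}
```

`SsspSketch.run(1L)` gives distances 0.0, 3.0, 1.0 and 8.0 for vertices 1 to 4. The direct edge 1 -> 2 (4.0) loses to the path 1 -> 3 -> 2 (1.0 + 2.0).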