Spark Graph Computing (Continued)
At present there are many graph-oriented parallel computing frameworks, such as Google's Pregel, the Apache open-source frameworks Giraph and HAMA, and GraphLab. Among them, Pregel, HAMA, and Giraph are very similar: all are based on the BSP (Bulk Synchronous Parallel) model.
A BSP computer consists of components capable of processing and/or local memory transactions (i.e., processors), a network that routes messages between pairs of such components, and a hardware facility that allows for the synchronization of all or a subset of components.
This is commonly interpreted as a set of processors which may follow different threads of computation, with each processor equipped with fast local memory and interconnected by a communication network. A BSP algorithm relies heavily on the third feature; a computation proceeds in a series of global supersteps, each of which consists of three components (a small Scala sketch follows the list):
Concurrent computation: every participating processor may perform local computations, i.e., each process can only make use of values stored in the local fast memory of the processor. The computations occur asynchronously of all the others but may overlap with communication.
Communication: The processes exchange data between themselves to facilitate remote data storage capabilities.
Barrier synchronization: When a process reaches this point (the barrier), it waits until all other processes have reached the same barrier.
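To make the superstep structure concrete, here is a minimal BSP-style loop in Scala. This is an illustrative sketch only: Message, superstepLoop, and compute are hypothetical names, not part of any framework.
case class Message(to: Long, value: Double)
// Each superstep: (1) local compute per vertex, (2) route messages, (3) barrier.
def superstepLoop(
    compute: (Long, Seq[Message]) => Seq[Message],  // local computation per vertex
    vertices: Seq[Long],
    maxSupersteps: Int): Unit = {
  var inbox: Map[Long, Seq[Message]] =
    Map.empty[Long, Seq[Message]].withDefaultValue(Nil)
  for (step <- 0 until maxSupersteps) {
    // 1. Concurrent computation: each vertex uses only its local state and inbox.
    val outgoing = vertices.flatMap(v => compute(v, inbox(v)))
    // 2. Communication: route messages to their destination vertices.
    inbox = outgoing.groupBy(_.to).withDefaultValue(Nil)
    // 3. Barrier synchronization: implicit here, since the next superstep starts
    //    only after every message from this one has been delivered.
  }
}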
Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) as follows:
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
The following is a summary of the operations defined in the abstract class Graph, listing all of its attributes and methods:
import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.VertexRDDImpl
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.{VertexRDD,EdgeRDD}
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic =============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int,
      activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
The code above declares the abstract class Graph; the method bodies are not defined here. Their implementations are illustrated below.
You can perform the following kinds of operations on a graph:
Operations on properties (property operators)
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]
}
method mapVertices[VD2]
/**
 * Transforms each vertex attribute in the graph using the map function.
 *
 * @note The new graph has the same structure. As a consequence the underlying index structures
 * can be reused.
 *
 * @param map the function from a vertex object to a new vertex value
 *
 * @tparam VD2 the new vertex data type
 *
 * @example We might use this operation to change the vertex values
 * from one type to another to initialize an algorithm.
 * {{{
 * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
 * val root = 42
 * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
 * }}}
 *
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
//Example:
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 12)
.mapVertices( (id, _) => id.toDouble )
method mapEdges[ED2]
/**
 * Transforms each edge attribute in the graph using the map function. The map function is not
 * passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
 * use `mapTriplets`.
 *
 * @note This graph is not changed; the new graph has the same structure. As a consequence the
 * underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge attributes.
 *
 */
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
  mapEdges((pid, iter) => iter.map(map))
}
//Example
val edgeTo2 = graph.mapEdges(e => 2)
edgeTo2.edges.take(5).foreach(println)
/*
Edge(0,1,2)
Edge(0,1,2)
Edge(0,3,2)
Edge(0,3,2)
Edge(0,5,2)
*/
/**
 * Transforms each edge attribute using the map function, passing it a whole partition at a
 * time. The map function is given an iterator over edges within a logical partition as well as
 * the partition's ID, and it should return a new iterator over the new values of each edge. The
 * new iterator's elements must correspond one-to-one with the old iterator's elements. If
 * adjacent vertex values are desired, use `mapTriplets`.
 *
 * @note This does not change the structure of the graph or modify the values of this graph.
 * As a consequence the underlying index structures can be reused.
 *
 * @param map a function that takes a partition id and an iterator
 * over all the edges in the partition, and must return an iterator over
 * the new values for each edge in the order of the input iterator
 *
 * @tparam ED2 the new edge data type
 *
 */
def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
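As a small illustration of the per-partition variant (a sketch reusing the graph built above; the tagging scheme is only illustrative), each edge can be tagged with the ID of the partition it lives in:
// Tag every edge with its partition ID; iter.map preserves the required
// one-to-one correspondence between old and new iterators.
val taggedEdges = graph.mapEdges((pid, iter) => iter.map(e => (pid, e.attr)))
taggedEdges.edges.take(3).foreach(println)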
method mapTriplets[ED2]
/**
 * Transforms each edge attribute using the map function, passing it the adjacent vertex
 * attributes as well. If adjacent vertex values are not required,
 * consider using `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values of this graph.
 * As a consequence the underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge
 * attributes based on the attributes associated with each vertex.
 * {{{
 * val rawGraph: Graph[Int, Int] = someLoadFunction()
 * val graph = rawGraph.mapTriplets[Int]( edge =>
 *   edge.src.data - edge.dst.data)
 * }}}
 *
 */
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}
//Example:
graph.mapTriplets(e => e.attr * 10)
/**
 * Transforms each edge attribute using the map function, passing it the adjacent vertex
 * attributes as well. If adjacent vertex values are not required,
 * consider using `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values of this graph.
 * As a consequence the underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 * @param tripletFields which fields should be included in the edge triplet passed to the map
 * function. If not all fields are needed, specifying this can improve performance.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge
 * attributes based on the attributes associated with each vertex.
 * {{{
 * val rawGraph: Graph[Int, Int] = someLoadFunction()
 * val graph = rawGraph.mapTriplets[Int]( edge =>
 *   edge.src.data - edge.dst.data)
 * }}}
 *
 */
def mapTriplets[ED2: ClassTag](
    map: EdgeTriplet[VD, ED] => ED2,
    tripletFields: TripletFields): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), tripletFields)
}
/**
 * Transforms each edge attribute a partition at a time using the map function, passing it the
 * adjacent vertex attributes as well. The map function is given an iterator over edge triplets
 * within a logical partition and should yield a new iterator over the new values of each edge in
 * the order in which they are provided. If adjacent vertex values are not required, consider
 * using `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values of this graph.
 * As a consequence the underlying index structures can be reused.
 *
 * @param map the iterator transform
 * @param tripletFields which fields should be included in the edge triplet passed to the map
 * function. If not all fields are needed, specifying this can improve performance.
 *
 * @tparam ED2 the new edge data type
 *
 */
def mapTriplets[ED2: ClassTag](
    map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
    tripletFields: TripletFields): Graph[VD, ED2]
Operations on structure (structural operators)
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
method reverse
/**
 * Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
 * graph contains an edge from b to a.
 */
def reverse: Graph[VD, ED]
// example:
graph.edges.collect.foreach(println)
/*
Edge(0,1,1)
Edge(0,1,1)
Edge(0,4,1)
Edge(1,1,1)
Edge(1,3,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,3,1)
Edge(3,0,1)
Edge(3,0,1)
Edge(3,2,1)
Edge(4,0,1)
Edge(4,0,1)
Edge(4,1,1)
Edge(4,2,1)
*/
graph.reverse.edges.collect.foreach(println)
/*
Edge(1,0,1)
Edge(1,0,1)
Edge(1,1,1)
Edge(3,1,1)
Edge(4,0,1)
Edge(0,3,1)
Edge(0,3,1)
Edge(0,4,1)
Edge(0,4,1)
Edge(1,4,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,4,1)
Edge(3,2,1)
Edge(3,2,1)
*/
method subgraph
/**
 * Restricts the graph to only the vertices and edges satisfying the predicates. The resulting
 * subgraph satisfies
 *
 * {{{
 * V' = {v : for all v in V where vpred(v)}
 * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
 * }}}
 *
 * @param epred the edge predicate, which takes a triplet and
 * evaluates to true if the edge is to remain in the subgraph. Note
 * that only edges where both vertices satisfy the vertex
 * predicate are considered.
 *
 * @param vpred the vertex predicate, which takes a vertex object and
 * evaluates to true if the vertex is to be included in the subgraph
 *
 * @return the subgraph containing only the vertices and edges that
 * satisfy the predicates
 */
def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
//Example:
graph.subgraph(e => e.srcId == 3, (vid, _) => vid == 0)
  .vertices.foreach(println)
/*
(0,3)
*/
method mask[VD2,ED2]
/**
 * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
 * attributes from this graph.
 * @param other the graph to project this graph onto
 * @return a graph with vertices and edges that exist in both the current graph and `other`,
 * with vertex and edge data from the current graph
 */
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
//Example
graph.mask(graph).vertices.collect.foreach(println)
/*
(4,4)
(0,3)
(2,3)
(1,2)
(3,3)
*/
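Masking the graph with itself, as above, is a no-op. A more typical use, following the GraphX programming guide, is to run an algorithm on the full graph and then project the result onto a cleaned-up subgraph. The sketch below assumes vertex attributes are (name, role) pairs with "Missing" marking absent users, as in the users graph constructed later in this section:
// Run connected components on the full graph, then restrict the answer to
// the subgraph of vertices whose role is not "Missing".
val ccGraph = graph.connectedComponents()
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
val validCCGraph = ccGraph.mask(validGraph)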
method groupEdges
/**
 * Merges multiple edges between two vertices into a single edge. For correct results, the graph
 * must have been partitioned using `partitionBy`.
 *
 * @param merge the user-supplied commutative associative function to merge edge attributes
 * for duplicate edges.
 *
 * @return The resulting graph with a single edge for each (source, dest) vertex pair.
 */
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Example: merge duplicate edges with a reduce-like operation
graph.groupEdges(_ + _).edges.collect.foreach(println)
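Note the caveat in the doc comment: groupEdges only merges duplicate edges that live in the same partition, so the graph should first be repartitioned with partitionBy to co-locate identical edges. A minimal sketch:
// Co-locate identical (src, dst) edges, then merge their attributes.
val merged = graph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)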
Join operations (join operators)
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
method joinVertices
/**
 * Join the vertices with an RDD and then apply a function from the
 * vertex and RDD entry to a new vertex value. The input table
 * should contain at most one entry for each vertex. If no entry is
 * provided the map function is skipped and the old value is used.
 *
 * @tparam U the type of entry in the table of updates
 * @param table the table to join with the vertices in the graph.
 * The table should contain at most one entry for each vertex.
 * @param mapFunc the function used to compute the new vertex
 * values. The map function is invoked only for vertices with a
 * corresponding entry in the table otherwise the old vertex value
 * is used.
 *
 * @example This function is used to update the vertices with new
 * values based on external data. For example we could add the out
 * degree to each vertex record
 *
 * {{{
 * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
 *   .mapVertices((_, _) => 0)
 * val outDeg = rawGraph.outDegrees
 * val graph = rawGraph.joinVertices[Int](outDeg)((_, _, outDeg) => outDeg)
 * }}}
 *
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}
//example
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("graphx")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///tmp")
.getOrCreate()
val sc=spark.sparkContext
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((1L, ("jack", "owner")), (2L, ("george", "clerk")),
(3L, ("mary", "sales")), (4L, ("sherry", "owner wife"))))
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(1L, 2L, "boss"), Edge(1L, 3L, "boss"),
Edge(2L, 3L, "coworker"), Edge(4L, 1L, "boss")))
val defaultUser = ("", "Missing")
val graph = Graph(users, relationships, defaultUser)
val userWithAge: RDD[(VertexId, Int)] =
sc.parallelize(Array(
(1L, 25), (2L, 30), (3L, 23), (4L, 30)
))
graph.joinVertices(userWithAge) { (id, prop, age) => {
(prop._1 + "", prop._2 + " Age is " + age.toString)
}}.vertices.collect.foreach(println)
/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
*/
method outerJoinVertices[U: ClassTag, VD2: ClassTag]
/**
 * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
 * The input table should contain at most one entry for each vertex. If no entry in `other` is
 * provided for a particular vertex in the graph, the map function receives `None`.
 *
 * @tparam U the type of entry in the table of updates
 * @tparam VD2 the new vertex value type
 *
 * @param other the table to join with the vertices in the graph.
 * The table should contain at most one entry for each vertex.
 * @param mapFunc the function used to compute the new vertex values.
 * The map function is invoked for all vertices, even those
 * that do not have a corresponding entry in the table.
 *
 * @example This function is used to update the vertices with new values based on external data.
 * For example we could add the out-degree to each vertex record:
 *
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
 * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
 * val graph = rawGraph.outerJoinVertices(outDeg) {
 *   (vid, data, optDeg) => optDeg.getOrElse(0)
 * }
 * }}}
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
  : Graph[VD2, ED]
//Example:
graph.outerJoinVertices(userWithAge) { (id, prop, age) =>
age match {
case Some(age) => (prop._1 + "", prop._2 + " Age is " + age.toString)
case None => "0"
}
}.vertices.collect.foreach(println)
/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
*/
Neighborhood Aggregation
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]
method collectNeighborIds(edgeDirection: EdgeDirection)
/**
 * Collect the neighbor vertex ids for each vertex.
 *
 * @param edgeDirection the direction along which to collect neighboring vertices
 *
 * @return the set of neighboring ids for each vertex
 */
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _, TripletFields.None)
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
        "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[VertexId])
  }
} // end of collectNeighborIds
//Example
graph.collectNeighborIds(EdgeDirection.Either)
.take(5).map(e=>e._1)
res10: Array[org.apache.spark.graphx.VertexId] = Array(4, 2, 1, 3)
method collectNeighbors(edgeDirection: EdgeDirection)
/**
 * Collect the neighbor vertex attributes for each vertex.
 *
 * @note This function could be highly inefficient on power-law
 * graphs where high degree vertices may force a large amount of
 * information to be collected to a single location.
 *
 * @param edgeDirection the direction along which to collect neighboring vertices
 *
 * @return the vertex set of neighboring vertex attributes for each vertex
 */
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
        (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support EdgeDirection.Both. Use " +
        "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
} // end of collectNeighbors
//Example
graph.collectNeighbors(EdgeDirection.Either)
.take(5)
/*
res11: Array[(org.apache.spark.graphx.VertexId,
Array[(org.apache.spark.graphx.VertexId
, (String, String))])] = Array
((4,Array((1,(jack,owner)))), (2,Array((1,(jack,owner)), (3,(mary,sales)))), (1,Array((2,(george,clerk)), (3,(mary,sales)), (4,(sherry,owner wife)))), (3,Array((1,(jack,owner)), (2,(george,clerk)))))
*/
method aggregateMessages[A: ClassTag]
/**
 * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
 * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
 * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
 * destined to the same vertex.
 *
 * @tparam A the type of message to be sent to each vertex
 *
 * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
 * [[EdgeContext]].
 * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
 * combiner should be commutative and associative.
 * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
 * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
 *
 * @example We can use this function to compute the in-degree of each vertex
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 *
 * @note By expressing computation at the edge level we achieve
 * maximum parallelism. This is one of the core functions in the
 * Graph API that enables neighborhood level computation. For
 * example this function can be used to count neighbors satisfying a
 * predicate or implement PageRank.
 *
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
//Example
// Reusing the SparkSession and the users/relationships graph built in the joinVertices example above.
// In-degree count for each vertex: each edge sends 1 to its destination
val degreeToDst: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToDst(1), (a, b) => a + b)
// Out-degree count for each vertex: each edge sends 1 to its source
val degreeToSrc: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToSrc(1), (a, b) => a + b)
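As a quick sanity check (a sketch; modulo ordering, the counts should line up), the message-based counts match the built-in degree views:
// degreeToDst should equal graph.inDegrees; degreeToSrc should equal graph.outDegrees.
degreeToDst.collect.sortBy(_._1).foreach(println)
graph.inDegrees.collect.sortBy(_._1).foreach(println)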
Computing Degree Information (these are attributes of the Graph class)
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
//example
graph.inDegrees.collect
graph.outDegrees.collect
graph.degrees.collect
Implementation
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
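For the four-user graph built above (assuming the users/relationships graph from the joinVertices example is still in scope), the expected results are:
// Edges: 1->2, 1->3, 2->3, 4->1
// maxInDegree  = (3,2)  -- vertex 3 has two incoming edges
// maxOutDegree = (1,2)  -- vertex 1 has two outgoing edges
// maxDegrees   = (1,3)  -- vertex 1 touches three edges in total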
Caching and Uncaching
// Functions for caching graphs
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
method persist
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
//Example
graph.persist()
graph.unpersist()
graph.unpersistVertices(true)
method cache()
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
//Example
graph.cache()
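In iterative computations, a common pattern (a sketch, assuming each iteration derives a new graph from the previous one; the identity mapVertices is just a stand-in for real work) is to cache the new graph, force it to materialize, and then unpersist the previous one to free storage:
var g = graph.cache()
for (i <- 1 to 10) {
  val next = g.mapVertices((id, attr) => attr).cache()  // derive the next graph
  next.edges.foreachPartition(_ => ())                  // force materialization
  // Release the previous iteration's storage before moving on.
  g.unpersistVertices(blocking = false)
  g.edges.unpersist(blocking = false)
  g = next
}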
Pregel API
// Iterative graph-parallel computation
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int,
    activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
Detailed implementation:
/**
 * @tparam A the Pregel message type
 *
 * @param initialMsg the message each vertex will receive
 * on the first iteration
 *
 * @param maxIterations the maximum number of iterations to run for
 *
 * @param activeDirection the direction of edges incident to a vertex that received a message in
 * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
 * out-edges of vertices that received a message in the previous round will run.
 *
 * @param vprog the user-defined vertex program which runs on each
 * vertex and receives the inbound message and computes a new vertex
 * value. On the first iteration the vertex program is invoked on
 * all vertices and is passed the default message. On subsequent
 * iterations the vertex program is only invoked on those vertices
 * that receive messages.
 *
 * @param sendMsg a user supplied function that is applied to out
 * edges of vertices that received messages in the current
 * iteration
 *
 * @param mergeMsg a user supplied function that takes two incoming
 * messages of type A and merges them into a single message of type
 * A. ''This function must be commutative and associative and
 * ideally the size of A should not increase.''
 *
 * @return the resulting graph at the end of the computation
 *
 */
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {
  Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
Included graph algorithms:
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
method pageRank:
/**
 * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
 * PageRank and edge attributes containing the normalized edge weight.
 *
 * @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergence]]
 */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.runUntilConvergence(graph, tol, resetProb)
}
For the detailed implementation, see org.apache.spark.graphx.lib.PageRank.
method connectedComponents
/**
 * Compute the connected component membership of each vertex and return a graph with the vertex
 * value containing the lowest vertex id in the connected component containing that vertex.
 *
 * @see `org.apache.spark.graphx.lib.ConnectedComponents.run`
 */
def connectedComponents(): Graph[VertexId, ED] = {
  ConnectedComponents.run(graph)
}
/**
 * Compute the connected component membership of each vertex and return a graph with the vertex
 * value containing the lowest vertex id in the connected component containing that vertex.
 *
 * @see `org.apache.spark.graphx.lib.ConnectedComponents.run`
 */
def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
  ConnectedComponents.run(graph, maxIterations)
}
method triangleCount
/**
 * Compute the number of triangles passing through each vertex.
 *
 * @see [[org.apache.spark.graphx.lib.TriangleCount$#run]]
 */
def triangleCount(): Graph[Int, ED] = {
  TriangleCount.run(graph)
}
method stronglyConnectedComponents
/**
 * Compute the strongly connected component (SCC) of each vertex and return a graph with the
 * vertex value containing the lowest vertex id in the SCC containing that vertex.
 *
 * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
 */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
  StronglyConnectedComponents.run(graph, numIter)
}
Example:
// Build the graph
val followerGraph = GraphLoader.edgeListFile(
  sc, "file:///opt/spark/spark/data/graphx/followers.txt")
// PageRank, sorted by rank in descending order
followerGraph.pageRank(0.001)
  .vertices.sortBy(-_._2)
  .collect.foreach(println)
/*
(1,2.435707093904704)
(2,2.3600348087805516)
(3,0.6140453120556082)
(6,0.5045131359115431)
(7,0.5045131359115431)
(4,0.2905932567180246)
(5,0.2905932567180246)
*/
//connectedComponents
followerGraph.connectedComponents
.vertices.foreach(println)
/*
(4,1)
(6,1)
(2,1)
(1,1)
(3,1)
(7,1)
(5,1)
*/
//triangleCount
followerGraph.triangleCount
.vertices.foreach(println)
/*
(1,0)
(3,1)
(7,1)
(5,0)
(4,0)
(6,1)
(2,0)
*/
followerGraph.stronglyConnectedComponents(3)
.vertices.foreach(println)
/*
(4,4)
(1,1)
(3,3)
(7,3)
(5,5)
(6,3)
(2,1)
*/
Pregel -- Iterative Graph Level Computation
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int,
    activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
Parameters:
graph - the input graph (passed implicitly when pregel is invoked as a method on Graph)
initialMsg - the message each vertex will receive at the first iteration
maxIterations - the maximum number of iterations to run for
activeDirection - the direction of edges incident to a vertex that received a message in the previous round on which to run sendMsg. For example, if this is EdgeDirection.Out, only out-edges of vertices that received a message in the previous round will run. The default is EdgeDirection.Either, which will run sendMsg on edges where either side received a message in the previous round. If this is EdgeDirection.Both, sendMsg will only run on edges where both vertices received a message.
vprog - the user-defined vertex program which runs on each vertex and receives the inbound message and computes a new vertex value. On the first iteration the vertex program is invoked on all vertices and is passed the default message. On subsequent iterations the vertex program is only invoked on those vertices that receive messages.
sendMsg - a user supplied function that is applied to out edges of vertices that received messages in the current iteration
mergeMsg - a user supplied function that takes two incoming messages of type A and merges them into a single message of type A. "This function must be commutative and associative and ideally the size of A should not increase."
Example: using the Pregel operator to express a computation such as single-source shortest path.
val sourceId: VertexId = 3 // The ultimate source
// Initialize the graph such that all vertices
// except the root have distance infinity.
val initialGraph =
followerGraph.mapVertices((id, _) =>
if (id == sourceId) 0.0
else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) =>
math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
/*
Shortest distance from VertexId 3:
(4,1.0)
(6,1.0)
(2,1.0)
(1,2.0)
(3,0.0)
(7,1.0)
(5,1.0)
*/