Spark Graph Computing (Continued)
At present there are several graph-parallel computing frameworks, such as Google's Pregel, the Apache open-source frameworks Giraph and HAMA, and GraphLab. Among them, Pregel, HAMA, and Giraph are very similar: all are based on the BSP (Bulk Synchronous Parallel) model.
A BSP computer consists of components capable of processing and/or local memory transactions (i.e., processors), a network that routes messages between pairs of such components, and a hardware facility that allows for the synchronization of all or a subset of components.
This is commonly interpreted as a set of processors that may follow different threads of computation, where each processor is equipped with fast local memory and interconnected by a communication network. A BSP algorithm relies heavily on the third feature: a computation proceeds in a series of global supersteps, each of which consists of three components:
Concurrent computation: every participating processor may perform local computations, i.e., each process can only make use of values stored in the local fast memory of the processor. The computations occur asynchronously of each other but may overlap with communication.
Communication: the processes exchange data to facilitate remote data storage.
Barrier synchronization: When a process reaches this point (the barrier), it waits until all other processes have reached the same barrier.
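To make the three phases concrete, here is a minimal local sketch (illustrative only, not part of any framework): four threads each compute on local state, write a message into a neighbor's mailbox, then meet at a barrier before reading, mirroring one BSP superstep. GraphX's Pregel operator, covered at the end of this page, runs this same loop over a distributed graph.

import java.util.concurrent.CyclicBarrier

object BspSuperstepSketch extends App {
  val workers = 4
  val barrier = new CyclicBarrier(workers)  // the synchronization facility
  val inbox = new Array[Int](workers)       // a stand-in for the network

  val threads = (0 until workers).map { id =>
    new Thread(new Runnable {
      def run(): Unit = {
        val local = id * 10                 // 1. concurrent local computation
        inbox((id + 1) % workers) = local   // 2. communication to a neighbor
        barrier.await()                     // 3. barrier synchronization
        // Safe to read now: every worker has finished sending
        println(s"worker $id received ${inbox(id)}")
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
}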
Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) as follows:
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
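The snippet above only declares graph. A self-contained version — a sketch assuming a spark-shell session where sc is already defined — could build a small property graph first:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Two vertices carrying (name, role) and one "boss" edge between them
val vertexArray: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((1L, ("jack", "owner")), (2L, ("george", "clerk"))))
val edgeArray: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(1L, 2L, "boss")))
val graph: Graph[(String, String), String] = Graph(vertexArray, edgeArray)

// inDegrees is defined in GraphOps but usable directly thanks to the implicit
val inDegrees: VertexRDD[Int] = graph.inDegrees
inDegrees.collect.foreach(println)   // prints (2,1): vertex 2 has one in-edge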
The following are the graph operations declared in the abstract class Graph, which lists all of its attributes and method signatures.
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.VertexRDDImpl
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.{VertexRDD, EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic =============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U]
      (table: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2]
      (other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A: ClassTag]
      (initialMsg: A,
       maxIterations: Int,
       activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
The code above only declares the Graph abstract class; the method bodies are not yet defined. Their definitions are illustrated below.
You can perform the following kinds of operations on a graph:
Operations on properties (property operators)
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
}
method mapVertices[VD2]
/**
 * Transforms each vertex attribute in the graph using the map function.
 *
 * @note The new graph has the same structure. As a consequence the underlying
 * index structures can be reused.
 *
 * @param map the function from a vertex object to a new vertex value
 *
 * @tparam VD2 the new vertex data type
 *
 * @example We might use this operation to change the vertex values
 * from one type to another to initialize an algorithm.
 * {{{
 * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
 * val root = 42
 * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) =>
 *   if (vid == root) 0 else Math.MaxValue)
 * }}}
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

// Example:
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices((id, _) => id.toDouble)
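A note on why preserving structure matters: you could get the same logical result by rebuilding the graph from a transformed vertex RDD, but then GraphX cannot reuse its internal indices. A sketch of the contrast, where mapUdf stands for any user-defined function:

// Rebuilding the graph: logically equivalent, but the indices are discarded
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

// Preferred: mapVertices preserves the structure, so the indices are reused
val newGraphPreferred = graph.mapVertices((id, attr) => mapUdf(id, attr))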
method mapEdges[ED2]
/**
 * Transforms each edge attribute in the graph using the map function. The map
 * function is not passed the vertex value for the vertices adjacent to the edge.
 * If vertex values are desired, use `mapTriplets`.
 *
 * @note This graph is not changed, and the new graph has the same structure.
 * As a consequence the underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge attributes.
 */
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
  mapEdges((pid, iter) => iter.map(map))
}

// Example
val edgeTo2 = graph.mapEdges(e => 2)
edgeTo2.edges.take(5).foreach(println)
/*
Edge(0,1,2)
Edge(0,1,2)
Edge(0,3,2)
Edge(0,3,2)
Edge(0,5,2)
*/

/**
 * Transforms each edge attribute using the map function, passing it a whole
 * partition at a time. The map function is given an iterator over edges within
 * a logical partition as well as the partition's ID, and it should return a new
 * iterator over the new values of each edge. The new iterator's elements must
 * correspond one-to-one with the old iterator's elements. If adjacent vertex
 * values are desired, use `mapTriplets`.
 *
 * @note This does not change the structure of the graph or modify the values
 * of this graph. As a consequence the underlying index structures can be reused.
 *
 * @param map a function that takes a partition id and an iterator over all the
 * edges in the partition, and must return an iterator over the new values for
 * each edge in the order of the input iterator
 *
 * @tparam ED2 the new edge data type
 */
def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
  : Graph[VD, ED2]
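A short sketch of the partition-at-a-time overload; it is handy when some setup should happen once per partition instead of once per edge (continuing with the Graph[Double, Int] built earlier):

// Touch each edge partition's iterator once; any per-partition setup
// (e.g. an expensive helper object) would be created before iter.map
val doubled: Graph[Double, Int] =
  graph.mapEdges((pid, iter) => iter.map(e => e.attr * 2))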
method mapTriplets[ED2]
/**
 * Transforms each edge attribute using the map function, passing it the
 * adjacent vertex attributes as well. If adjacent vertex values are not
 * required, consider using `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values
 * of this graph. As a consequence the underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge attributes based
 * on the attributes associated with each vertex.
 * {{{
 * val rawGraph: Graph[Int, Int] = someLoadFunction()
 * val graph = rawGraph.mapTriplets[Int](edge =>
 *   edge.src.data - edge.dst.data)
 * }}}
 */
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}

// Example:
graph.mapTriplets(e => e.attr * 10)

/**
 * Transforms each edge attribute using the map function, passing it the
 * adjacent vertex attributes as well. If adjacent vertex values are not
 * required, consider using `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values
 * of this graph. As a consequence the underlying index structures can be reused.
 *
 * @param map the function from an edge object to a new edge value.
 * @param tripletFields which fields should be included in the edge triplet
 * passed to the map function. If not all fields are needed, specifying this
 * can improve performance.
 *
 * @tparam ED2 the new edge data type
 *
 * @example This function might be used to initialize edge attributes based
 * on the attributes associated with each vertex.
 * {{{
 * val rawGraph: Graph[Int, Int] = someLoadFunction()
 * val graph = rawGraph.mapTriplets[Int](edge =>
 *   edge.src.data - edge.dst.data)
 * }}}
 */
def mapTriplets[ED2: ClassTag](
    map: EdgeTriplet[VD, ED] => ED2,
    tripletFields: TripletFields): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), tripletFields)
}

/**
 * Transforms each edge attribute a partition at a time using the map function,
 * passing it the adjacent vertex attributes as well. The map function is given
 * an iterator over edge triplets within a logical partition and should yield a
 * new iterator over the new values of each edge in the order in which they are
 * provided. If adjacent vertex values are not required, consider using
 * `mapEdges` instead.
 *
 * @note This does not change the structure of the graph or modify the values
 * of this graph. As a consequence the underlying index structures can be reused.
 *
 * @param map the iterator transform
 * @param tripletFields which fields should be included in the edge triplet
 * passed to the map function. If not all fields are needed, specifying this
 * can improve performance.
 *
 * @tparam ED2 the new edge data type
 */
def mapTriplets[ED2: ClassTag](
    map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
    tripletFields: TripletFields): Graph[VD, ED2]
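When the map function reads only part of the triplet, passing an explicit TripletFields lets GraphX avoid shipping the unused vertex attributes. A small sketch, again on the Graph[Double, Int] from earlier; only the source attribute is read, so only it needs to be shipped:

// Only srcAttr is used, so ship just the source attributes to the edge partitions
val weighted: Graph[Double, Double] =
  graph.mapTriplets(t => t.srcAttr + t.attr, TripletFields.Src)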
Operations on structure (structural operators)
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
method reverse
/**
 * Reverses all edges in the graph. If this graph contains an edge from a to b
 * then the returned graph contains an edge from b to a.
 */
def reverse: Graph[VD, ED]

// Example:
graph.edges.collect.foreach(println)
/*
Edge(0,1,1)
Edge(0,1,1)
Edge(0,4,1)
Edge(1,1,1)
Edge(1,3,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,3,1)
Edge(3,0,1)
Edge(3,0,1)
Edge(3,2,1)
Edge(4,0,1)
Edge(4,0,1)
Edge(4,1,1)
Edge(4,2,1)
*/

graph.reverse.edges.collect.foreach(println)
/*
Edge(1,0,1)
Edge(1,0,1)
Edge(1,1,1)
Edge(3,1,1)
Edge(4,0,1)
Edge(0,3,1)
Edge(0,3,1)
Edge(0,4,1)
Edge(0,4,1)
Edge(1,4,1)
Edge(2,2,1)
Edge(2,3,1)
Edge(2,4,1)
Edge(3,2,1)
Edge(3,2,1)
*/
method subgraph
/**
 * Restricts the graph to only the vertices and edges satisfying the
 * predicates. The resulting subgraph satisfies
 *
 * {{{
 * V' = {v : for all v in V where vpred(v)}
 * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
 * }}}
 *
 * @param epred the edge predicate, which takes a triplet and
 * evaluates to true if the edge is to remain in the subgraph. Note
 * that only edges where both vertices satisfy the vertex
 * predicate are considered.
 *
 * @param vpred the vertex predicate, which takes a vertex object and
 * evaluates to true if the vertex is to be included in the subgraph
 *
 * @return the subgraph containing only the vertices and edges that
 * satisfy the predicates
 */
def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]

// Example:
graph.subgraph(e => e.srcId == 3, (vid, _) => vid == 0)
  .vertices.foreach(println)
/*
(0,3)
*/
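A more typical use of subgraph, adapted from the GraphX programming guide, drops placeholder vertices together with their edges. The sketch below is self-contained (assuming sc from a spark-shell); vertex 5L is deliberately given a "Missing" attribute:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  (1L, ("jack", "owner")), (2L, ("george", "clerk")), (5L, ("", "Missing"))))
val rels: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "boss"), Edge(1L, 5L, "knows")))
val g = Graph(users, rels)

// Keep only valid users; the edge 1 -> 5 disappears because vertex 5 is dropped
val validGraph = g.subgraph(vpred = (id, attr) => attr._2 != "Missing")
validGraph.vertices.collect.foreach(println)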
method mask[VD2,ED2]
/**
 * Restricts the graph to only the vertices and edges that are also in `other`,
 * but keeps the attributes from this graph.
 *
 * @param other the graph to project this graph onto
 *
 * @return a graph with vertices and edges that exist in both the current
 * graph and `other`, with vertex and edge data from the current graph
 */
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

// Example
graph.mask(graph).vertices.collect.foreach(println)
/*
(4,4)
(0,3)
(2,3)
(1,2)
(3,3)
*/
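The identity mask(graph) above is just a demonstration. The common pattern, from the GraphX programming guide, runs an algorithm on the full graph and then uses mask to restrict the answer to a valid subgraph (assuming a users-style graph with a role attribute, as in the subgraph sketch above):

// Run connected components on the full graph
val ccGraph = graph.connectedComponents()
// Remove vertices with missing attribute data
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the components to the valid subgraph, keeping the component labels
val validCCGraph = ccGraph.mask(validGraph)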
method groupEdges
/**
 * Merges multiple edges between two vertices into a single edge. For correct
 * results, the graph must have been partitioned using `partitionBy`.
 *
 * @param merge the user-supplied commutative associative function to merge
 * edge attributes for duplicate edges.
 *
 * @return The resulting graph with a single edge for each (source, dest)
 * vertex pair.
 */
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

// Example, like a reduce op
graph.groupEdges(_ + _).edges.collect.foreach(println)
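Since groupEdges only merges duplicate edges that are co-located in the same partition, call partitionBy first, as the doc comment requires. A sketch (any PartitionStrategy works; EdgePartition2D is one choice):

import org.apache.spark.graphx.PartitionStrategy

// Co-locate identical (src, dst) pairs, then merge their attributes
val merged = graph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((a, b) => a + b)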
Join operations (join operators)
// Join RDDs with the graph ======================================================================
def joinVertices[U]
    (table: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED]
def outerJoinVertices[U, VD2]
    (other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)
  : Graph[VD2, ED]
method joinVertices
/**
 * Join the vertices with an RDD and then apply a function from the vertex and
 * RDD entry to a new vertex value. The input table should contain at most one
 * entry for each vertex. If no entry is provided the map function is skipped
 * and the old value is used.
 *
 * @tparam U the type of entry in the table of updates
 *
 * @param table the table to join with the vertices in the graph.
 * The table should contain at most one entry for each vertex.
 *
 * @param mapFunc the function used to compute the new vertex values.
 * The map function is invoked only for vertices with a corresponding entry in
 * the table; otherwise the old vertex value is used.
 *
 * @example This function is used to update the vertices with new values based
 * on external data. For example we could add the out-degree to each vertex
 * record:
 *
 * {{{
 * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
 *   .mapVertices((_, _) => 0)
 * val outDeg = rawGraph.outDegrees
 * val graph = rawGraph.joinVertices[Int](outDeg)((_, _, outDeg) => outDeg)
 * }}}
 */
def joinVertices[U: ClassTag]
    (table: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}

// Example

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder
  .appName("graphx")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
val sc = spark.sparkContext

val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((1L, ("jack", "owner")), (2L, ("george", "clerk")),
    (3L, ("mary", "sales")), (4L, ("sherry", "owner wife"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(1L, 2L, "boss"), Edge(1L, 3L, "boss"),
    Edge(2L, 3L, "coworker"), Edge(4L, 1L, "boss")))
val defaultUser = ("", "Missing")
val graph = Graph(users, relationships, defaultUser)

val userWithAge: RDD[(VertexId, Int)] =
  sc.parallelize(Array((1L, 25), (2L, 30), (3L, 23), (4L, 30)))

graph.joinVertices(userWithAge) { (id, prop, age) =>
  (prop._1, prop._2 + " Age is " + age.toString)
}.vertices.collect.foreach(println)

/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
*/
method outerJoinVertices[U: ClassTag, VD2: ClassTag]
/**
 * Joins the vertices with entries in the `table` RDD and merges the results
 * using `mapFunc`. The input table should contain at most one entry for each
 * vertex. If no entry in `other` is provided for a particular vertex in the
 * graph, the map function receives `None`.
 *
 * @tparam U the type of entry in the table of updates
 * @tparam VD2 the new vertex value type
 *
 * @param other the table to join with the vertices in the graph.
 * The table should contain at most one entry for each vertex.
 *
 * @param mapFunc the function used to compute the new vertex values.
 * The map function is invoked for all vertices, even those that do not have a
 * corresponding entry in the table.
 *
 * @example This function is used to update the vertices with new values based
 * on external data. For example we could add the out-degree to each vertex
 * record:
 *
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
 * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
 * val graph = rawGraph.outerJoinVertices(outDeg) {
 *   (vid, data, optDeg) => optDeg.getOrElse(0)
 * }
 * }}}
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag]
    (other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)
    (implicit eq: VD =:= VD2 = null)
  : Graph[VD2, ED]

// Example:
graph.outerJoinVertices(userWithAge) { (id, prop, age) =>
  age match {
    case Some(age) => (prop._1, prop._2 + " Age is " + age.toString)
    case None => "0"
  }
}.vertices.collect.foreach(println)

/*
(4,(sherry,owner wife Age is 30))
(2,(george,clerk Age is 30))
(1,(jack,owner Age is 25))
(3,(mary,sales Age is 23))
*/
Neighborhood Aggregation
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[Msg]
method collectNeighborIds(edgeDirection: EdgeDirection)
/**
 * Collect the neighbor vertex ids for each vertex.
 *
 * @param edgeDirection the direction along which to collect neighboring
 * vertices
 *
 * @return the set of neighboring ids for each vertex
 */
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _, TripletFields.None)
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
        "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[VertexId])
  }
} // end of collectNeighborIds

// Example
graph.collectNeighborIds(EdgeDirection.Either).take(5).map(e => e._1)
// res10: Array[org.apache.spark.graphx.VertexId] = Array(4, 2, 1, 3)
method collectNeighbors(edgeDirection: EdgeDirection)
/**
 * Collect the neighbor vertex attributes for each vertex.
 *
 * @note This function could be highly inefficient on power-law graphs where
 * high degree vertices may force a large amount of information to be collected
 * to a single location.
 *
 * @param edgeDirection the direction along which to collect neighboring
 * vertices
 *
 * @return the vertex set of neighboring vertex attributes for each vertex
 */
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
        (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support EdgeDirection.Both. Use " +
        "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
} // end of collectNeighbors

// Example
graph.collectNeighbors(EdgeDirection.Either).take(5)

/*
res11: Array[(org.apache.spark.graphx.VertexId,
  Array[(org.apache.spark.graphx.VertexId, (String, String))])] =
Array((4,Array((1,(jack,owner)))),
      (2,Array((1,(jack,owner)), (3,(mary,sales)))),
      (1,Array((2,(george,clerk)), (3,(mary,sales)), (4,(sherry,owner wife)))),
      (3,Array((1,(jack,owner)), (2,(george,clerk)))))
*/
method aggregateMessages[A: ClassTag]
/**
 * Aggregates values from the neighboring edges and vertices of each vertex.
 * The user-supplied `sendMsg` function is invoked on each edge of the graph,
 * generating 0 or more messages to be sent to either vertex in the edge. The
 * `mergeMsg` function is then used to combine all messages destined to the
 * same vertex.
 *
 * @tparam A the type of message to be sent to each vertex
 *
 * @param sendMsg runs on each edge, sending messages to neighboring vertices
 * using the [[EdgeContext]].
 * @param mergeMsg used to combine messages from `sendMsg` destined to the same
 * vertex. This combiner should be commutative and associative.
 * @param tripletFields which fields should be included in the [[EdgeContext]]
 * passed to the `sendMsg` function. If not all fields are needed, specifying
 * this can improve performance.
 *
 * @example We can use this function to compute the in-degree of each vertex
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 *
 * @note By expressing computation at the edge level we achieve maximum
 * parallelism. This is one of the core functions in the Graph API that
 * enables neighborhood level computation. For example this function can be
 * used to count neighbors satisfying a predicate or implement PageRank.
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

// Example

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.log4j._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
  .builder
  .appName("graphx")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
val sc = spark.sparkContext

val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((1L, ("jack", "owner")), (2L, ("george", "clerk")),
    (3L, ("mary", "sales")), (4L, ("sherry", "owner wife"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(1L, 2L, "boss"), Edge(1L, 3L, "boss"),
    Edge(2L, 3L, "coworker"), Edge(4L, 1L, "boss")))
val defaultUser = ("", "Missing")
val graph = Graph(users, relationships, defaultUser)

// In-degree count for each vertex: every edge sends 1 to its destination
val degreeToDst: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToDst(1), (a, b) => a + b)

// Out-degree count for each vertex: every edge sends 1 to its source
val degreeToSrc: VertexRDD[Int] =
  graph.aggregateMessages(x => x.sendToSrc(1), (a, b) => a + b)
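As a second illustration, here is the canonical aggregateMessages example from the GraphX programming guide, which computes the average "age" of the older followers of each user (the ages here are synthetic, derived from vertex ids):

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// A graph where the vertex attribute plays the role of an age
val ageGraph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100)
    .mapVertices((id, _) => id.toDouble)

// For each user, count the followers older than them and sum those ages
val olderFollowers: VertexRDD[(Int, Double)] =
  ageGraph.aggregateMessages[(Int, Double)](
    triplet => {
      if (triplet.srcAttr > triplet.dstAttr) {
        // Send a (counter, age) message to the destination vertex
        triplet.sendToDst((1, triplet.srcAttr))
      }
    },
    // Add counters and ages
    (a, b) => (a._1 + b._1, a._2 + b._2)
  )

// Divide the total age by the number of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) =>
    value match { case (count, totalAge) => totalAge / count })
avgAgeOfOlderFollowers.collect.foreach(println)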
Computing Degree Information (these are attributes of the Graph class)
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]

// Example
graph.inDegrees.collect
graph.outDegrees.collect
graph.degrees.collect
Implementation
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
Caching and Uncaching
// Functions for caching graphs
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
method persist
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

// Example
graph.persist()
graph.unpersist()
graph.unpersistVertices(true)
method cache()
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

// Example
graph.cache()
Pregel API
// Iterative graph-parallel computation
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int,
    activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
detailed implementation:
/**
 * @tparam A the Pregel message type
 *
 * @param initialMsg the message each vertex will receive at the
 * first iteration
 *
 * @param maxIterations the maximum number of iterations to run for
 *
 * @param activeDirection the direction of edges incident to a vertex that received a message in
 * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
 * out-edges of vertices that received a message in the previous round will run.
 *
 * @param vprog the user-defined vertex program which runs on each
 * vertex and receives the inbound message and computes a new vertex
 * value. On the first iteration the vertex program is invoked on
 * all vertices and is passed the default message. On subsequent
 * iterations the vertex program is only invoked on those vertices
 * that receive messages.
 *
 * @param sendMsg a user supplied function that is applied to out
 * edges of vertices that received messages in the current
 * iteration
 *
 * @param mergeMsg a user supplied function that takes two incoming
 * messages of type A and merges them into a single message of type
 * A. ''This function must be commutative and associative and
 * ideally the size of A should not increase.''
 *
 * @return the resulting graph at the end of the computation
 */
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
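To round out the section, here is the single-source shortest-paths example from the GraphX programming guide, showing how vprog, sendMsg, and mergeMsg fit together (a sketch assuming a spark-shell session where sc is defined):

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val distGraph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100)
    .mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // the ultimate source

// Initialize: every vertex except the root starts at infinity
val initialGraph = distGraph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // vertex program: keep the shorter distance
  triplet => {                                    // send message along improving edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                        // merge message: take the minimum
)
println(sssp.vertices.collect.mkString("\n"))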