VertexRDD Class
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
Extends RDD[(VertexId, VD)] by ensuring that there is only one entry for each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be joined efficiently. All operations except reindex preserve the index. To construct a VertexRDD, use the VertexRDD object.
Additionally, stores routing information to enable joining the vertex attributes with an EdgeRDD.
VD: the type of the vertex attribute associated with each vertex in the set.
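The claim that two VertexRDDs with the same index can be joined efficiently can be illustrated with a small sketch. It assumes a local Spark installation with GraphX on the classpath; the object name `SharedIndexJoin` and the sample data are hypothetical and exist only for this illustration. `mapValues` preserves the index, so the derived set can be zip-joined with the original one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of GraphX.
object SharedIndexJoin {
  // Builds two VertexRDDs that share one index and zip-joins them.
  def run(sc: SparkContext): Map[VertexId, (String, Int)] = {
    val names: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "bb"), (3L, "ccc")))
    val vset: VertexRDD[String] = VertexRDD(names)

    // mapValues keeps the index of vset, so lengths shares that index.
    val lengths: VertexRDD[Int] = vset.mapValues((name: String) => name.length)

    // innerZipJoin expects both sides to have the same index.
    val joined: VertexRDD[(String, Int)] =
      vset.innerZipJoin(lengths) { (vid, name, len) => (name, len) }

    joined.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SharedIndexJoin")
    val sc = new SparkContext(conf)
    try run(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

When the two sets might not share an index, `innerJoin` and `leftJoin` are the more general choice, since they accept any RDD of vertex-attribute pairs.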
Example:
Construct a VertexRDD from a plain RDD:
// Construct an initial vertex set
val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
val vset = VertexRDD(someData)
// If someData contains duplicate vertex IDs, merge them with a reduceFunc first
// (VertexRDD(someData) alone keeps an arbitrary entry for each duplicate)
val vset2 = VertexRDD(someData.reduceByKey(reduceFunc))
// Finally we can use the VertexRDD to index another dataset
val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
// Now we can construct very fast joins between the two sets;
// leftJoin yields None for vertices of vset that are missing from vset3
val vset4: VertexRDD[(SomeType, Option[OtherType])] =
  vset.leftJoin(vset3) { (vid, a, b) => (a, b) }
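The example above relies on placeholders (`SomeType`, `OtherType`, `loadData`). A concrete variant might look like the following sketch, which assumes a local Spark installation with GraphX on the classpath. The object name `DuplicateMerge` and the sample data are hypothetical. Duplicates are merged with `reduceByKey` before the VertexRDD is built, and `leftJoin` wraps the right-hand value in an `Option`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of GraphX.
object DuplicateMerge {
  def run(sc: SparkContext): (Map[VertexId, Int], Map[VertexId, (Int, Option[String])]) = {
    // Vertex 1 appears twice; its values are summed before indexing.
    val raw: RDD[(VertexId, Int)] =
      sc.parallelize(Seq((1L, 10), (1L, 5), (2L, 7), (3L, 1)))
    val vset: VertexRDD[Int] = VertexRDD(raw.reduceByKey(_ + _))

    // A second dataset that covers only some of the vertices.
    val labels: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "x"), (3L, "z")))

    // leftJoin keeps every vertex of vset; missing labels become None.
    val joined: VertexRDD[(Int, Option[String])] =
      vset.leftJoin(labels) { (vid, total, label) => (total, label) }

    (vset.collect().toMap, joined.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("DuplicateMerge")
    val sc = new SparkContext(conf)
    try {
      val (totals, joined) = run(sc)
      totals.toSeq.sortBy(_._1).foreach(println)
      joined.toSeq.sortBy(_._1).foreach(println)
    } finally sc.stop()
  }
}
```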
Example:
Construct an EdgeRDD from an edge list, then a VertexRDD from those edges:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes sc is an existing SparkContext (for example, in spark-shell)
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

// Build an EdgeRDD, replacing every edge attribute with the string "EdgeRDD"
val edges = EdgeRDD.fromEdges(relationships.map(x => Edge(x.srcId, x.dstId, "EdgeRDD")))

edges.take(5).foreach(println)

/*
Output (order may vary with partitioning):
Edge(2,5,EdgeRDD)
Edge(3,7,EdgeRDD)
Edge(5,3,EdgeRDD)
Edge(4,0,EdgeRDD)
Edge(5,0,EdgeRDD)
*/

// Build a VertexRDD with one partition; every vertex gets the default attribute "VD"
val vertices = VertexRDD.fromEdges(edges, 1, "VD")

vertices.take(5).foreach(println)

/*
Output (order may vary with partitioning):
(4,VD)
(0,VD)
(3,VD)
(7,VD)
(5,VD)
*/
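The example above gives every vertex the same attribute. When some vertex attributes are already known, the three-argument `VertexRDD(vertices, edges, defaultVal)` constructor can combine them with an EdgeRDD and fill in a default for any vertex that is referenced only by edges. The sketch below assumes a local Spark installation with GraphX on the classpath; the object name `VerticesWithDefault` and the reduced sample data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeRDD, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of GraphX.
object VerticesWithDefault {
  def run(sc: SparkContext): Map[VertexId, String] = {
    // Known vertex attributes; vertex 0 is deliberately absent.
    val users: RDD[(VertexId, String)] =
      sc.parallelize(Seq((3L, "rxin"), (7L, "jgonzal")))
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(3L, 0L, "student")))

    // Type parameters are the edge and vertex attribute types.
    val edges = EdgeRDD.fromEdges[String, String](relationships)

    // Vertices that appear only in edges receive the default attribute.
    val vertices: VertexRDD[String] = VertexRDD(users, edges, "Missing")

    vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("VerticesWithDefault")
    val sc = new SparkContext(conf)
    try run(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

A VertexRDD built this way also carries the routing information needed to join its attributes with the given EdgeRDD, which the single-argument constructor does not set up.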