abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
Extends RDD[(VertexId, VD)] by ensuring that there is only one entry for each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be joined efficiently. All operations except reindex preserve the index. To construct a VertexRDD, use the VertexRDD object.
Additionally, stores routing information to enable joining the vertex attributes with an EdgeRDD.
VD: the type of the vertex attribute associated with each vertex in the set.
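The indexing idea above can be illustrated with a small, single-machine toy model. This is a hypothetical sketch, not GraphX's implementation. `ToyVertexSet`, `VertexIndex`, and `buildVertexSet` are invented names, and a sorted array with binary search stands in for whatever index structure GraphX uses internally. The sketch shows three things: duplicate IDs collapse to one entry, an index-preserving operation such as `mapValues` lets a later join pair attributes position by position, and sets with different indexes fall back to a per-vertex lookup.

```scala
// Toy single-machine model of the VertexRDD idea; NOT GraphX's implementation.
type VertexId = Long

// Hypothetical index: the sorted, deduplicated vertex IDs of one set.
final class VertexIndex(val ids: Array[VertexId]) {
  // Binary search for a vertex ID; returns its position, or -1 if absent.
  def positionOf(vid: VertexId): Int = {
    val i = java.util.Arrays.binarySearch(ids, vid)
    if (i >= 0) i else -1
  }
}

// Hypothetical vertex set: attrs(i) is the attribute of vertex index.ids(i).
final case class ToyVertexSet[VD](index: VertexIndex, attrs: Vector[VD]) {
  def toMap: Map[VertexId, VD] = index.ids.iterator.zip(attrs.iterator).toMap

  // Index-preserving transform, in the spirit of VertexRDD.mapValues.
  def mapValues[VD2](g: VD => VD2): ToyVertexSet[VD2] = ToyVertexSet(index, attrs.map(g))

  // Inner join with another set.
  def innerJoin[U, VD2](other: ToyVertexSet[U])(f: (VertexId, VD, U) => VD2): Map[VertexId, VD2] =
    if (index eq other.index) {
      // Fast path: same index object, so position i names the same vertex on both sides.
      index.ids.indices.map(i => index.ids(i) -> f(index.ids(i), attrs(i), other.attrs(i))).toMap
    } else {
      // Slow path: look up each vertex ID in the other set's index.
      index.ids.indices.flatMap { i =>
        val j = other.index.positionOf(index.ids(i))
        if (j >= 0) Some(index.ids(i) -> f(index.ids(i), attrs(i), other.attrs(j))) else None
      }.toMap
    }
}

// Build a set with exactly one entry per vertex, merging duplicates with `reduce`.
def buildVertexSet[VD](data: Seq[(VertexId, VD)])(reduce: (VD, VD) => VD): ToyVertexSet[VD] = {
  val merged = data.groupMapReduce(_._1)(_._2)(reduce)
  val ids = merged.keys.toArray.sorted
  ToyVertexSet(new VertexIndex(ids), ids.toVector.map(merged))
}

val a = buildVertexSet(Seq(3L -> 1, 1L -> 2, 3L -> 4))(_ + _)   // vertex 3 merged: 1 + 4
val b = a.mapValues(_ * 10)                                     // shares a's index
val c = buildVertexSet(Seq(3L -> "c", 9L -> "z"))((x, _) => x)  // separate index

val fast = a.innerJoin(b)((_, x, y) => x + y)  // position-by-position join
val slow = a.innerJoin(c)((_, x, s) => s + x)  // lookup-based join
```

The fast path here relies on reference equality of the index object, which mirrors the documented rule that two VertexRDDs with the same index join efficiently and that operations other than reindex keep the index.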
Example:
Construct a VertexRDD from a plain RDD:
// Construct an initial vertex set
val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
val vset = VertexRDD(someData)
// If someData contained duplicate vertex IDs, we would first merge them with a reduceFunc
val vset2 = VertexRDD(someData.reduceByKey(reduceFunc))
// Finally we can use the VertexRDD to index another dataset
val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
// Now we can join the two sets efficiently; vertices of vset missing from vset3 get None
val vset4: VertexRDD[(SomeType, Option[OtherType])] =
  vset.leftJoin(vset3) { (vid, a, bOpt) => (a, bOpt) }
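To make the semantics of these steps concrete, here is a local sketch that replays the same pipeline on plain Scala `Map`s instead of RDDs. It assumes `SomeType = Int` and `OtherType = String` and uses made-up sample data; the helpers `dedupe`, `innerJoin`, and `leftJoin` are hypothetical stand-ins, not GraphX methods. The left join is applied to the deduplicated `vset2` so the result does not depend on which duplicate value survives.

```scala
// Local model of the pipeline above using plain Scala collections (not GraphX).
// Assumption: SomeType = Int, OtherType = String.
type VertexId = Long

// Stand-in for VertexRDD(someData.reduceByKey(reduceFunc)): one entry per vertex.
def dedupe[A](data: Seq[(VertexId, A)])(reduceFunc: (A, A) => A): Map[VertexId, A] =
  data.groupMapReduce(_._1)(_._2)(reduceFunc)

// Stand-in for innerJoin: keep only vertices present on both sides.
def innerJoin[A, B, C](left: Map[VertexId, A], right: Map[VertexId, B])(
    f: (VertexId, A, B) => C): Map[VertexId, C] =
  left.collect { case (vid, a) if right.contains(vid) => vid -> f(vid, a, right(vid)) }

// Stand-in for leftJoin: keep every left vertex; a missing right value becomes None.
def leftJoin[A, B, C](left: Map[VertexId, A], right: Map[VertexId, B])(
    f: (VertexId, A, Option[B]) => C): Map[VertexId, C] =
  left.map { case (vid, a) => vid -> f(vid, a, right.get(vid)) }

val someData  = Seq(1L -> 10, 2L -> 20, 2L -> 5, 3L -> 30)
val otherData = Seq(2L -> "b", 3L -> "c", 4L -> "d")

val vset2 = dedupe(someData)(_ + _)                            // vertex 2 merged: 20 + 5
val vset3 = innerJoin(vset2, otherData.toMap)((_, _, b) => b)  // only vertices 2 and 3
val vset4 = leftJoin(vset2, vset3)((_, a, bOpt) => (a, bOpt))  // vertex 1 pairs with None
```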
Example:
Build an EdgeRDD from an RDD of edges, then derive a VertexRDD from it with VertexRDD.fromEdges:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case some relationships refer to a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Build an EdgeRDD whose edges all carry the attribute "EdgeRDD"
val edges = EdgeRDD.fromEdges(relationships.map(e => Edge(e.srcId, e.dstId, "EdgeRDD")))
edges.take(5).foreach(println)
/*
Sample output (row order depends on partitioning):
Edge(2,5,EdgeRDD)
Edge(3,7,EdgeRDD)
Edge(5,3,EdgeRDD)
Edge(4,0,EdgeRDD)
Edge(5,0,EdgeRDD)
*/
// One vertex per edge endpoint, in 1 partition, each with the default attribute "VD"
val vertices = VertexRDD.fromEdges(edges, 1, "VD")
vertices.take(5).foreach(println)
/*
Sample output (row order depends on partitioning):
(4,VD)
(0,VD)
(3,VD)
(7,VD)
(5,VD)
*/
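The example relies on two behaviors worth spelling out. Vertex IDs that appear only in edges (here vertex 0) receive defaultUser when the Graph is built, and VertexRDD.fromEdges creates one vertex per distinct endpoint ID, each carrying the given default attribute. The sketch below models both with plain Scala collections. The local `Edge` case class and the helpers `endpointIds` and `verticesFromEdges` are hypothetical stand-ins rather than GraphX types, and partitioning is ignored. Because the edges touch six distinct IDs, `take(5)` in the example prints only five of them.

```scala
// Plain-Scala stand-ins (hypothetical, not GraphX types) for the example above.
type VertexId = Long
final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

val users: Map[VertexId, (String, String)] = Map(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
  (4L, ("peter", "student")))
val relationships = Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
  Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))
val defaultUser = ("John Doe", "Missing")

// Every vertex ID that appears as an edge endpoint.
def endpointIds[ED](edges: Seq[Edge[ED]]): Set[VertexId] =
  edges.flatMap(e => Seq(e.srcId, e.dstId)).toSet

// Model of Graph(users, relationships, defaultUser): supplied vertices keep their
// attributes; endpoints with no supplied attribute get defaultUser.
val graphVertices: Map[VertexId, (String, String)] =
  (users.keySet ++ endpointIds(relationships))
    .map(vid => vid -> users.getOrElse(vid, defaultUser))
    .toMap

// Model of VertexRDD.fromEdges(edges, numPartitions, defaultVal): one entry per
// endpoint ID, all carrying defaultVal (numPartitions has no analogue here).
def verticesFromEdges[ED, VD](edges: Seq[Edge[ED]], defaultVal: VD): Map[VertexId, VD] =
  endpointIds(edges).map(vid => vid -> defaultVal).toMap

val edges = relationships.map(e => Edge(e.srcId, e.dstId, "EdgeRDD"))
val vertices = verticesFromEdges(edges, "VD")
println(graphVertices(0L))
println(vertices.keySet.toSeq.sorted)
```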