VertexRDD Class

abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]

Extends RDD[(VertexId, VD)] by ensuring that there is only one entry for each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be joined efficiently. All operations except reindex preserve the index. To construct a VertexRDD, use the VertexRDD object.

Additionally, stores routing information to enable joining the vertex attributes with an EdgeRDD.

VD: the type of the vertex attribute associated with each vertex in the set.
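The index-sharing behavior described above can be sketched as follows. This is a minimal illustration, not the library's own example: the sample data, the application name, and the local master setting are assumptions, and the snippet is written for spark-shell or a Scala worksheet with Spark 2.4.x on the classpath. It uses `mapValues`, `innerZipJoin`, and `aggregateUsingIndex`, all of which return a VertexRDD that reuses the index of the receiver.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Reuses the active context (for example in spark-shell), otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("VertexRDDIndexSketch").setMaster("local[2]"))

// Hypothetical vertex set: id -> score.
val scores: VertexRDD[Double] =
  VertexRDD(sc.parallelize(Seq((1L, 1.0), (2L, 2.0), (3L, 3.0))))

// mapValues changes the attributes but keeps the index of `scores`.
val doubled: VertexRDD[Double] = scores.mapValues((score: Double) => score * 2.0)

// innerZipJoin joins two VertexRDDs that share an index, partition by partition.
val summed: VertexRDD[Double] =
  scores.innerZipJoin(doubled) { (vid, a, b) => a + b }

// aggregateUsingIndex reduces messages per vertex id against the existing index.
// Messages addressed to ids that are not in the index (9 here) are dropped.
val messages: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 1), (1L, 1), (3L, 1), (9L, 1)))
val counts: VertexRDD[Int] =
  scores.aggregateUsingIndex(messages, (a: Int, b: Int) => a + b)

summed.collect().sortBy(_._1).foreach(println)
// (1,3.0)
// (2,6.0)
// (3,9.0)
counts.collect().sortBy(_._1).foreach(println)
// (1,2)
// (3,1)
```

Vertex 2 is missing from `counts` because it received no messages; only vertices that received at least one message appear in the result.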

Example:

Construct a VertexRDD from a plain RDD:

// Construct an initial vertex set
val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
val vset = VertexRDD(someData)
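// If there were redundant values in someData we would merge them first with a reduceFunc
// (the single-argument VertexRDD(...) constructor removes duplicate entries arbitrarily)
val vset2 = VertexRDD(someData.reduceByKey(reduceFunc))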
// Finally we can use the VertexRDD to index another dataset
val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
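// Now we can construct very fast joins between the two sets.
// leftJoin takes a merge function; vertices of vset with no match in vset3 receive None.
val vset4: VertexRDD[(SomeType, Option[OtherType])] =
  vset.leftJoin(vset3) { (vid, a, b) => (a, b) }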
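The snippet above uses placeholders (`loadData`, `SomeType`, `OtherType`, `reduceFunc`). A concrete, runnable variant might look like the sketch below. The sample data, the choice of `Int` and `String` as attribute types, and the use of `reduceByKey` to merge duplicates before construction are assumptions made for illustration; the snippet targets spark-shell or a Scala worksheet with Spark 2.4.x on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Reuses the active context (for example in spark-shell), otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("VertexRDDJoinSketch").setMaster("local[2]"))

// Hypothetical sample data: vertex 1 appears twice.
val someData: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 10), (1L, 5), (2L, 20), (3L, 30)))

// Merge duplicate ids up front so the result does not depend on
// which duplicate the VertexRDD constructor happens to keep.
val vset: VertexRDD[Int] = VertexRDD(someData.reduceByKey(_ + _))

// A second dataset keyed by vertex id; id 4 has no counterpart in vset.
val otherData: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (3L, "c"), (4L, "d")))

// innerJoin keeps only the ids present on both sides (1 and 3).
val vset3: VertexRDD[String] = vset.innerJoin(otherData) { (vid, a, b) => b }

// leftJoin keeps every vertex of vset and wraps the other side in an Option.
val vset4: VertexRDD[(Int, Option[String])] =
  vset.leftJoin(vset3) { (vid, a, bOpt) => (a, bOpt) }

val joined: Map[VertexId, (Int, Option[String])] = vset4.collect().toMap
joined.toSeq.sortBy(_._1).foreach(println)
// (1,(15,Some(a)))
// (2,(20,None))
// (3,(30,Some(c)))
```

Because `vset3` is derived from `vset`, both share the same partitioner, so the final `leftJoin` can proceed partition by partition instead of shuffling `vset3`.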

Example:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships that refer to a missing user
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

// Build an EdgeRDD, replacing every edge attribute with the string "EdgeRDD"
val edges = EdgeRDD.fromEdges(relationships.map(x => Edge(x.srcId, x.dstId, "EdgeRDD")))

edges.take(5).foreach(println)

/*
Output:
Edge(2,5,EdgeRDD)
Edge(3,7,EdgeRDD)
Edge(5,3,EdgeRDD)
Edge(4,0,EdgeRDD)
Edge(5,0,EdgeRDD)
*/

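// Build a VertexRDD holding every vertex referenced by an edge,
// using 1 partition and "VD" as the attribute of each vertex
val vertices = VertexRDD.fromEdges(edges, 1, "VD")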

vertices.take(5).foreach(println)

/*
Output:
(4,VD)
(0,VD)
(3,VD)
(7,VD)
(5,VD)
*/
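`VertexRDD.fromEdges` gives every vertex the same default attribute. To keep the real user attributes and fall back to the default only for vertices that appear in edges but not in `users` (vertex 0 in this data), the three-argument `VertexRDD(vertices, edges, defaultVal)` constructor can be used. The following is a minimal sketch under the same assumptions as the example above (Spark 2.4.x, spark-shell or a Scala worksheet); the application name and the local master setting are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeRDD, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Reuses the active context (for example in spark-shell), otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("VertexRDDWithEdgesSketch").setMaster("local[2]"))

// Same users and relationships as in the example above.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
  (4L, ("peter", "student"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
  Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
val defaultUser = ("John Doe", "Missing")

val edges: EdgeRDD[String] =
  EdgeRDD.fromEdges[String, (String, String)](relationships)

// Vertices mentioned by an edge but absent from `users` are created with defaultUser.
val resolved: VertexRDD[(String, String)] = VertexRDD(users, edges, defaultUser)

resolved.collect().sortBy(_._1).foreach(println)
// (0,(John Doe,Missing))
// (2,(istoica,prof))
// (3,(rxin,student))
// (4,(peter,student))
// (5,(franklin,prof))
// (7,(jgonzal,postdoc))
```

The resulting VertexRDD also carries the routing information needed to join its attributes with `edges`.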

Reference:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
