VertexRDD Class
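VertexRDD[VD] extends RDD[(VertexId, VD)] with the additional constraint that each VertexId occurs only once. It indexes the vertex attributes by id, which lets joins between vertex sets derived from the same index run quickly, without rehashing. The sketch below, adapted from the VertexRDD scaladoc, shows the intended usage pattern; loadData, someFile, otherFile, SomeType, OtherType, and reduceFunc are placeholders, not real functions.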
// Construct an initial vertex set
val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
val vset = VertexRDD(someData)
// If there were redundant values in someData we could merge them with a reduceFunc first
val vset2 = VertexRDD(someData.reduceByKey(reduceFunc))
// Finally we can use the VertexRDD to index another dataset
val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
// Now we can construct very fast joins between the two sets
val vset4: VertexRDD[(SomeType, Option[OtherType])] =
  vset.leftJoin(vset3) { (vid, a, b) => (a, b) }
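A concrete, runnable version of that pattern, as a minimal sketch assuming a spark-shell session (so sc is in scope), with Int and String standing in for SomeType and OtherType:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hypothetical inputs standing in for someData and otherData
val someData: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 10), (2L, 20), (2L, 5)))  // note the duplicate id 2
val otherData: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (2L, "b")))

// Merge the duplicates, then build the indexed vertex set
val vset = VertexRDD(someData.reduceByKey(_ + _))
// Reuse vset's index to join in the other dataset
val vsetOther = vset.innerJoin(otherData) { (vid, a, b) => b }
// Joins between sets sharing an index are fast
val joined: VertexRDD[(Int, Option[String])] =
  vset.leftJoin(vsetOther) { (vid, a, b) => (a, b) }

The walkthrough below builds a small property graph in the shell and then constructs an EdgeRDD and a VertexRDD from it directly.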
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
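The vertices and edges of the graph just built are themselves a VertexRDD and an EdgeRDD, so the indexed operations above are available directly; for instance, counting the postdocs:

// graph.vertices has type VertexRDD[(String, String)]
val postdocCount =
  graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count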
// Rebuild the edges as an EdgeRDD, replacing each attribute with the string "EdgeRDD"
val edges = EdgeRDD.fromEdges(relationships.map(e => Edge(e.srcId, e.dstId, "EdgeRDD")))
edges.take(5).foreach(println)
/*
Output:
Edge(2,5,EdgeRDD)
Edge(3,7,EdgeRDD)
Edge(5,3,EdgeRDD)
Edge(4,0,EdgeRDD)
Edge(5,0,EdgeRDD)
*/
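VertexRDD.fromEdges then derives a vertex set from the EdgeRDD: it gathers every source and destination id that appears in the edges, spreads them over the requested number of partitions (here 1), and gives each vertex the supplied default attribute (here the string "VD").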
// Derive the vertex set referenced by the edges: 1 partition, default attribute "VD"
val vertices = VertexRDD.fromEdges(edges, 1, "VD")
vertices.take(5).foreach(println)
/*
Output:
(4,VD)
(0,VD)
(3,VD)
(7,VD)
(5,VD)
*/
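Finally, a small sketch that ties the two halves together: the derived vertex set can reuse its index to join back the original user attributes. Vertex 0 appears only in the edge list and has no entry in users, so innerJoin drops it.

// Join the derived vertex set back to the user attributes
val named: VertexRDD[(String, String)] =
  vertices.innerJoin(users) { (vid, attr, user) => user }
named.collect.foreach(println)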