# GraphOps Class

class GraphOps[VD, ED] extends Serializable
Instance constructor:
new GraphOps(graph: Graph[VD, ED])(implicit arg0: ClassTag[VD], arg1: ClassTag[ED])
def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]
Returns an RDD that contains for each vertex v its local edges, i.e., the edges that are incident on v, in the user-specified direction.
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
Collect the neighbor vertex ids for each vertex.
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
Collect the neighbor vertex attributes for each vertex.
def connectedComponents(maxIterations: Int): Graph[VertexId, ED]
Compute the connected component membership of each vertex and return a graph with the vertex value containing the lowest vertex id in the connected component containing that vertex.
def connectedComponents(): Graph[VertexId, ED]
Compute the connected component membership of each vertex and return a graph with the vertex value containing the lowest vertex id in the connected component containing that vertex.
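As a rough illustration of the labelling described above, the sketch below runs `connectedComponents()` on a hypothetical five-vertex graph with two components. The graph, the application name, and the local master setting are assumptions made for this example; in spark-shell or a notebook the existing `sc` would normally be reused.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("ConnectedComponentsSketch").setMaster("local[*]"))

// Hypothetical graph with two components: {1, 2, 3} and {4, 5}.
val ccEdges = sc.parallelize(Seq(
  Edge(1L, 2L, "e"), Edge(2L, 3L, "e"), Edge(4L, 5L, "e")))
val ccGraph = Graph.fromEdges(ccEdges, "v")

// Each vertex attribute becomes the lowest vertex id in its component.
val componentOf: Map[VertexId, VertexId] =
  ccGraph.connectedComponents().vertices.collect().toMap

componentOf.toSeq.sortBy(_._1).foreach { case (id, cc) =>
  println(s"vertex $id belongs to component $cc")
}
```

Edge direction is ignored here, so vertices 1, 2 and 3 share the label 1 while vertices 4 and 5 share the label 4.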
def convertToCanonicalEdges(mergeFunc: (ED, ED) ⇒ ED = (e1, e2) => e1): Graph[VD, ED]
Convert bi-directional edges into uni-directional ones.
lazy val degrees: VertexRDD[Int]
The degree of each vertex in the graph.
def filter[VD2, ED2](preprocess: (Graph[VD, ED]) ⇒ Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) ⇒ Boolean = (x: EdgeTriplet[VD2, ED2]) => true, vpred: (VertexId, VD2) ⇒ Boolean = (v: VertexId, d: VD2) => true)(implicit arg0: ClassTag[VD2], arg1: ClassTag[ED2]): Graph[VD, ED]
Filter the graph by computing some values to filter on, and applying the predicates.
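A possible use of `filter`, sketched under assumptions: the preprocessing step attaches each vertex's out-degree, and the vertex predicate keeps only vertices with at least one outgoing edge. The three-vertex graph and all names below are hypothetical, and the SparkContext is created locally just for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("FilterSketch").setMaster("local[*]"))

// Hypothetical chain 1 -> 2 -> 3; vertex 3 has no outgoing edge.
val chain: Graph[String, Int] = Graph(
  sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c"))),
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1))))

// preprocess computes the out-degree per vertex (0 when missing);
// vpred then keeps vertices whose computed out-degree is positive.
val kept: Graph[String, Int] = chain.filter(
  (g: Graph[String, Int]) =>
    g.outerJoinVertices(g.outDegrees)((vid, data, deg) => deg.getOrElse(0)),
  vpred = (vid: VertexId, deg: Int) => deg > 0)

val keptVertices = kept.vertices.collect().toMap
val keptEdgeCount = kept.edges.count()
```

The returned graph keeps the original attribute types; the degree values exist only inside the filtering step. An edge survives only if both of its endpoints survive, so the edge 2 -> 3 is dropped together with vertex 3.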
lazy val inDegrees: VertexRDD[Int]
The in-degree of each vertex in the graph.
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) ⇒ VD)(implicit arg0: ClassTag[U]): Graph[VD, ED]
Join the vertices with an RDD and then apply a function from the vertex and RDD entry to a new vertex value.
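The join described above might look like the following sketch. The toy graph and the `suffixes` table are hypothetical, and the SparkContext is created locally only for the example. Vertices without a matching entry in the table keep their original value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("JoinVerticesSketch").setMaster("local[*]"))

// Hypothetical graph: the vertex attribute is a name, the edge attribute a label.
val people = Graph(
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol"))),
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows"))))

// Hypothetical side table covering vertices 1 and 3 only.
val suffixes = sc.parallelize(Seq((1L, "-admin"), (3L, "-guest")))

// mapFunc runs only for vertices present in the table; vertex 2 is left unchanged.
val renamed = people.joinVertices(suffixes) { (id, name, suffix) => name + suffix }

val names: Map[VertexId, String] = renamed.vertices.collect().toMap
names.toSeq.sortBy(_._1).foreach(println)
```

Because `mapFunc` must return the existing vertex type `VD`, `joinVertices` cannot change the attribute type of the graph.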
lazy val numEdges: Long
The number of edges in the graph.
lazy val numVertices: Long
The number of vertices in the graph.
lazy val outDegrees: VertexRDD[Int]
The out-degree of each vertex in the graph.
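The three degree members (`degrees`, `inDegrees`, `outDegrees`) can be compared on a small graph, as in the sketch below. The triangle-like graph is hypothetical and the SparkContext is created locally for the example. One detail worth seeing in practice: a vertex with no matching edges does not appear in the corresponding `VertexRDD`, so a join is needed when zeros are wanted.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("DegreesSketch").setMaster("local[*]"))

// Hypothetical graph: 1 -> 2, 1 -> 3, 2 -> 3.
val degGraph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1))), "v")

val inDeg = degGraph.inDegrees.collect().toMap    // vertex 1 is absent (no in-edges)
val outDeg = degGraph.outDegrees.collect().toMap  // vertex 3 is absent (no out-edges)
val totalDeg = degGraph.degrees.collect().toMap

// Fill in missing in-degrees with 0 by joining back onto the full vertex set.
val inDegWithZeros = degGraph
  .outerJoinVertices(degGraph.inDegrees)((id, attr, d) => d.getOrElse(0))
  .vertices.collect().toMap

println(s"in: $inDeg, out: $outDeg, total: $totalDeg, in (with zeros): $inDegWithZeros")
```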
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
Run a dynamic version of PageRank returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.
def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
Run personalized PageRank for a given vertex, such that all random walks are started relative to the source node.
def pickRandomVertex(): VertexId
Picks a random vertex from the graph and returns its ID.
def pregel[A](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) ⇒ VD, sendMsg: (EdgeTriplet[VD, ED]) ⇒ Iterator[(VertexId, A)], mergeMsg: (A, A) ⇒ A)(implicit arg0: ClassTag[A]): Graph[VD, ED]
Execute a Pregel-like iterative vertex-parallel abstraction.
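One common way to exercise the `pregel` operator is single-source shortest paths, sketched below. The weighted graph, the source vertex and all names are hypothetical, and the SparkContext is created locally for the example. The three functions correspond to `vprog`, `sendMsg` and `mergeMsg` in the signature above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("PregelSketch").setMaster("local[*]"))

// Hypothetical weighted graph: the edge attribute is a distance.
val weighted = Graph.fromEdges(sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0), Edge(3L, 4L, 1.0))), 0.0)

// The source starts at distance 0, every other vertex at infinity.
val sourceId: VertexId = 1L
val init = weighted.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = init.pregel(Double.PositiveInfinity)(
  // vprog: keep the smaller of the current and the incoming distance
  (id, dist, newDist) => math.min(dist, newDist),
  // sendMsg: notify the destination only when a shorter path is found
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  // mergeMsg: combine messages for one vertex by taking the minimum
  (a, b) => math.min(a, b))

val distances: Map[VertexId, Double] = sssp.vertices.collect().toMap
distances.toSeq.sortBy(_._1).foreach(println)
```

The computation stops once no vertex sends a message, so `maxIterations` can stay at its default for a small graph like this one.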
def removeSelfEdges(): Graph[VD, ED]
Remove self edges.
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
Run PageRank for a fixed number of iterations, returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.
def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int, resetProb: Double = 0.15): Graph[Vector, Double]
Run parallel personalized PageRank for a given array of source vertices, such that all random walks are started relative to the source vertices.
def staticPersonalizedPageRank(src: VertexId, numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
Run personalized PageRank for a fixed number of iterations, with all iterations originating at the source node, returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
Compute the strongly connected component (SCC) of each vertex and return a graph with the vertex value containing the lowest vertex id in the SCC containing that vertex.
def triangleCount(): Graph[Int, ED]
Compute the number of triangles passing through each vertex.
Example:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Create an RDD for the vertices (assumes `sc` is the SparkContext provided by spark-shell or a notebook)
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for the edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial graph
val graph = Graph(users, relationships, defaultUser)
// Wrap the graph explicitly; Graph also converts to GraphOps implicitly
val graph_operation = new GraphOps(graph)
val direction: EdgeDirection = EdgeDirection.Out
// Each value is an Array[VertexId], so println shows the JVM's default array string ([J@...)
graph_operation.collectNeighborIds(direction).take(5).foreach(println)
/*
Output:
(4,[J@646df199)
(0,[J@4641c01)
(2,[J@1f267828)
(3,[J@4a55a32)
(7,[J@fcf9206)
graph_operation: org.apache.spark.graphx.GraphOps[(String, String),String] = org.apache.spark.graphx.GraphOps@2bef03ea
direction: org.apache.spark.graphx.EdgeDirection = EdgeDirection.Out
*/
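The output above shows array references such as `[J@646df199` because each value is an `Array[VertexId]` printed with its default string form. The sketch below is one way to make the neighbor ids readable; it rebuilds the same example graph so that it stands alone. The local SparkContext, the application name and the `outNeighbors` value are assumptions for this example; in spark-shell or a notebook the existing `sc` and `graph` could be reused instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext created only for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("NeighborIdsSketch").setMaster("local[*]"))

// Same vertices and edges as the example above.
val users = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
  (4L, ("peter", "student"))))
val relationships = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
  Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
val exampleGraph = Graph(users, relationships, ("John Doe", "Missing"))

// Graph converts to GraphOps implicitly, so the method is called on the graph directly.
// The arrays are sorted and turned into Seq on the driver to get a stable, printable form.
val outNeighbors: Map[VertexId, Seq[VertexId]] = exampleGraph
  .collectNeighborIds(EdgeDirection.Out)
  .collect()
  .map { case (id, nbrs) => id -> nbrs.sorted.toSeq }
  .toMap

outNeighbors.toSeq.sortBy(_._1).foreach { case (id, nbrs) =>
  println(s"$id -> ${nbrs.mkString("[", ", ", "]")}")
}
```

With `EdgeDirection.Out`, each vertex collects the destination ids of its outgoing edges, so vertices without outgoing edges (here 0 and 7) map to an empty list.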