GraphOps Class

class GraphOps[VD, ED] extends Serializable

Instance constructor:

new GraphOps(graph: Graph[VD, ED])(implicit arg0: ClassTag[VD], arg1: ClassTag[ED])

def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]

Returns an RDD that contains for each vertex v its local edges, i.e., the edges that are incident on v, in the user-specified direction.

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

Collect the neighbor vertex ids for each vertex.

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

Collect the neighbor vertex attributes for each vertex.

def connectedComponents(maxIterations: Int): Graph[VertexId, ED]

Compute the connected component membership of each vertex and return a graph with the vertex value containing the lowest vertex id in the connected component containing that vertex.

def connectedComponents(): Graph[VertexId, ED]

Compute the connected component membership of each vertex and return a graph with the vertex value containing the lowest vertex id in the connected component containing that vertex.
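As a rough illustration of the labelling described above, the sketch below builds a small graph with two disjoint components and runs `connectedComponents()`. The graph data, the application name, and the local `SparkContext` setup are assumptions made for this example; in spark-shell or a notebook, `getOrCreate` simply returns the existing context.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("cc-sketch"))

// Hypothetical data: two disjoint components, {1, 2, 3} and {10, 11}
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(10L, 11L, 1)))
val g = Graph.fromEdges(edges, defaultValue = 0)

// Each vertex attribute becomes the lowest vertex id in its component
val cc: Graph[VertexId, Int] = g.connectedComponents()
val labels = cc.vertices.collect().toMap
// labels: 1 -> 1, 2 -> 1, 3 -> 1, 10 -> 10, 11 -> 10
```

Edge direction is ignored when components are computed, so vertex 3 joins component 1 even though no edge leaves it.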

def convertToCanonicalEdges(mergeFunc: (ED, ED) ⇒ ED = (e1, e2) => e1): Graph[VD, ED]

Convert bi-directional edges into uni-directional ones.
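A minimal sketch of what this conversion might look like in practice, assuming a tiny hypothetical graph and a merge function that sums edge weights (the default merge function keeps the first attribute instead). After the call, every edge points from the smaller vertex id to the larger one, and edges between the same pair of vertices are merged.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("canonical-edges-sketch"))

// Hypothetical data: 1 <-> 2 appears in both directions, 3 -> 2 appears once
val edges = sc.parallelize(Seq(Edge(1L, 2L, 3), Edge(2L, 1L, 4), Edge(3L, 2L, 5)))
val g = Graph.fromEdges(edges, defaultValue = 0)

// Merge the two directions of an edge by summing their weights
val canonical = g.convertToCanonicalEdges(_ + _)
val result = canonical.edges.collect().toSet
// result: Edge(1, 2, 7) and Edge(2, 3, 5)
```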

lazy val degrees: VertexRDD[Int]

The degree of each vertex in the graph.

def filter[VD2, ED2](preprocess: (Graph[VD, ED]) ⇒ Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) ⇒ Boolean = (x: EdgeTriplet[VD2, ED2]) => true, vpred: (VertexId, VD2) ⇒ Boolean = (v: VertexId, d: VD2) => true)(implicit arg0: ClassTag[VD2], arg1: ClassTag[ED2]): Graph[VD, ED]

Filter the graph by computing some values to filter on, and applying the predicates.
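The following sketch shows one way `filter` could be used: the preprocessing step replaces each vertex attribute with its out-degree, and the vertex predicate keeps only vertices with at least one outgoing edge. The graph data is hypothetical. The returned graph keeps the original vertex and edge attributes; the preprocessed values are used only to decide what survives.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("filter-sketch"))

// Hypothetical data: a chain 1 -> 2 -> 3, so vertex 3 has no outgoing edges
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val g: Graph[String, Int] = Graph.fromEdges(edges, defaultValue = "v")

val filtered: Graph[String, Int] = g.filter[Int, Int](
  // Compute the value to filter on: the out-degree of each vertex (0 if none)
  preprocess = graph => {
    val outDeg = graph.outDegrees
    graph.outerJoinVertices(outDeg) { (vid, attr, deg) => deg.getOrElse(0) }
  },
  // Keep only vertices that have at least one outgoing edge
  vpred = (vid: VertexId, deg: Int) => deg > 0)

val keptVertices = filtered.vertices.collect().toSet // (1, "v") and (2, "v")
val keptEdges = filtered.edges.collect().toSet       // Edge(1, 2, 1)
```

The edge 2 -> 3 disappears because one of its endpoints was filtered out.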

lazy val inDegrees: VertexRDD[Int]

The in-degree of each vertex in the graph.

def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) ⇒ VD)(implicit arg0: ClassTag[U]): Graph[VD, ED]

Join the vertices with an RDD and then apply a function from the vertex and RDD entry to a new vertex value.
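To make the join semantics concrete, here is a small sketch with hypothetical data: a table of per-vertex bonuses is joined onto integer vertex attributes. Vertices with no matching entry in the table keep their original value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("join-vertices-sketch"))

// Hypothetical graph with integer vertex attributes
val vertices = sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30)))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(2L, 3L, "b")))
val g = Graph(vertices, edges)

// Hypothetical table covering only vertices 1 and 3
val bonus = sc.parallelize(Seq((1L, 5), (3L, 7)))

// Add the table value to the existing attribute where a match exists
val joined = g.joinVertices(bonus) { (id, attr, extra) => attr + extra }
val updated = joined.vertices.collect().toMap
// updated: 1 -> 15, 2 -> 20 (unchanged), 3 -> 37
```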

lazy val numEdges: Long

The number of edges in the graph.

lazy val numVertices: Long

The number of vertices in the graph.

lazy val outDegrees: VertexRDD[Int]

The out-degree of each vertex in the graph.
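The three degree members (`degrees`, `inDegrees`, `outDegrees`) can be compared side by side on a small hypothetical graph, as sketched below. One detail worth noticing is that a vertex with a degree of zero in the requested direction does not appear in the corresponding result.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("degrees-sketch"))

// Hypothetical data: 1 -> 2, 1 -> 3, 2 -> 3
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
val g = Graph.fromEdges(edges, defaultValue = 0)

val inDeg = g.inDegrees.collect().toMap   // 2 -> 1, 3 -> 2 (vertex 1 is absent)
val outDeg = g.outDegrees.collect().toMap // 1 -> 2, 2 -> 1 (vertex 3 is absent)
val deg = g.degrees.collect().toMap       // 1 -> 2, 2 -> 2, 3 -> 2
```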

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

Run a dynamic version of PageRank returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.
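A short sketch of a dynamic PageRank run, assuming a hypothetical star-shaped graph in which every other vertex links to vertex 1. The tolerance of 0.001 is an arbitrary choice for the example. Exact rank values depend on the Spark version, so only the ordering is inspected here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pagerank-sketch"))

// Hypothetical data: vertices 2, 3 and 4 all point to vertex 1
val edges = sc.parallelize(Seq(Edge(2L, 1L, 1), Edge(3L, 1L, 1), Edge(4L, 1L, 1)))
val g = Graph.fromEdges(edges, defaultValue = 0)

// Iterate until the ranks change by less than the tolerance
val ranks: Graph[Double, Double] = g.pageRank(tol = 0.001)

// Vertex 1 receives rank from every other vertex, so it ends up on top
val topVertex: VertexId = ranks.vertices.collect().maxBy(_._2)._1
```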

def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

Run personalized PageRank for a given vertex, such that all random walks are started relative to the source node.

def pickRandomVertex(): VertexId

Picks a random vertex from the graph and returns its ID.

def pregel[A](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) ⇒ VD, sendMsg: (EdgeTriplet[VD, ED]) ⇒ Iterator[(VertexId, A)], mergeMsg: (A, A) ⇒ A)(implicit arg0: ClassTag[A]): Graph[VD, ED]

Execute a Pregel-like iterative vertex-parallel abstraction.
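The three functions passed to `pregel` are easier to follow with a concrete case. The sketch below computes single-source shortest paths, in the style of the example in the GraphX programming guide, on a small hypothetical weighted graph: `vprog` keeps the smaller of the current and incoming distance, `sendMsg` offers a neighbor a shorter path when one exists, and `mergeMsg` keeps the minimum of competing messages.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pregel-sketch"))

// Hypothetical weighted edges: 1 -> 2 (1.0), 2 -> 3 (2.0), 1 -> 3 (5.0)
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val g = Graph.fromEdges(edges, defaultValue = 0.0)

val sourceId: VertexId = 1L
// Distance 0 at the source, infinity everywhere else
val initial = g.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initial.pregel(Double.PositiveInfinity)(
  // vprog: keep the shorter of the known and the received distance
  (id, dist, newDist) => math.min(dist, newDist),
  // sendMsg: notify the destination only if the path through src is shorter
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  // mergeMsg: of several candidate distances, keep the smallest
  (a, b) => math.min(a, b))

val dist = sssp.vertices.collect().toMap
// dist: 1 -> 0.0, 2 -> 1.0, 3 -> 3.0 (via vertex 2, not the direct 5.0 edge)
```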

def removeSelfEdges(): Graph[VD, ED]

Remove self edges.

def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

Run PageRank for a fixed number of iterations returning a graph with vertex attributes containing the PageRank and edge attributes the normalized edge weight.

def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int, resetProb: Double = 0.15): Graph[Vector, Double]

Run parallel personalized PageRank for a given array of source vertices, such that all random walks are started relative to the source vertices.

def staticPersonalizedPageRank(src: VertexId, numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

Run Personalized PageRank for a fixed number of iterations, with all iterations originating at the source node, returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.

def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

Compute the strongly connected component (SCC) of each vertex and return a graph with the vertex value containing the lowest vertex id in the SCC containing that vertex.

def triangleCount(): Graph[Int, ED]

Compute the number of triangles passing through each vertex.
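As a small illustration, the sketch below counts triangles on a hypothetical graph made of one triangle (1, 2, 3) with an extra vertex 4 hanging off vertex 3. It assumes Spark 2.0 or later, where `triangleCount()` canonicalizes the edges itself; older releases expected edges in canonical orientation and a partitioned graph.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context, or start a local one for experimentation
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("triangle-count-sketch"))

// Hypothetical data: triangle 1-2-3 plus a pendant edge 3 -> 4
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(3L, 4L, 1)))
val g = Graph.fromEdges(edges, defaultValue = 0)

// Each vertex attribute becomes the number of triangles passing through it
val triangles = g.triangleCount().vertices.collect().toMap
// triangles: 1 -> 1, 2 -> 1, 3 -> 1, 4 -> 0
```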

Example:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

val graph_operation = new GraphOps(graph)
val direction: EdgeDirection = EdgeDirection.Out
// An Array[VertexId] prints as an opaque reference such as [J@646df199,
// so format each neighbor array explicitly. Row order can vary between runs.
graph_operation.collectNeighborIds(direction).take(5).foreach {
  case (id, neighbors) => println(s"($id,${neighbors.mkString("[", ",", "]")})")
}

/*
Output:
(4,[0])
(0,[])
(2,[5])
(3,[7])
(7,[])
graph_operation: org.apache.spark.graphx.GraphOps[(String, String),String] = org.apache.spark.graphx.GraphOps@2bef03ea
direction: org.apache.spark.graphx.EdgeDirection = EdgeDirection.Out

*/

Reference:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
