Spark Graph Computing

Introduction:

Spark GraphX ​​is a distributed graph processing framework. It is based on the Spark platform and provides a simple, easy-to-use and rich interface for graph computing and graph mining, which greatly facilitates the demand for distributed graph processing.

It is well known that there are many relationship chains between people or entities in social networks, most notablly in well known social network companies that I do not name. The network structure relationships in the data require graph computations.

GraphX ​​is a new Spark API, which is used for the calculation of graphs and distributed graphs (graph-parallellism). GraphX ​​extends Spark RDD by introducing a Resilient Distributed Property Graph, which is a directed multiple graph with vertices and edges that have attributes.

To support graph calculations, GraphX ​​has developed a basic set of functional operations and an optimized Pregel API. In addition, GraphX ​​also includes a fast-growing collection of graph algorithms and graph builders to simplify graph analysis tasks.

High Level Abstract Diagram of Apach Spark

The abstraction of GraphX is an elastic distributed attribute graph, which is a directed multigraph with user-defined objects connected to each vertex and edge. Multiple parallel edges in a directed multigraph share the same source and destination vertices. The ability to support parallel edges simplifies the modeling scenario, and there may be multiple relationships (such as co-worker and friend) for the same vertex. Each vertex uses a unique 64-bit identifier (VertexID) as the key. Similarly, edges have corresponding source and destination vertex identifiers.

Data-Parallel vs Graph-Parallel computing

Data-parallel computation focuses on records, while graph-parallel focuses on vertices. Data-parallel processes independent data simultaneously, while graph-parallel does parallelism that by partitioning (i.e., slicing) the graph data. More precisely, data-parallel recursively defines the transformation functions on neighbor features, and concurrently in parallel executes these transformation functions.

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

Abstraction of Graphx:

Vertex RDD[(VertexId, VD)]

Edge RDD[Edge(ED)]

Triplet RDD[EdgeTriplet[VD,ED,VD]]

Graph Graph[VD, ED]

Table View is in the form of VertexRDD and EdgeRDD

Graph View is in the form of Graph

In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:

import org.apache.spark.graphx.Graph
class VertexProperty()
case class UserProperty(val name: String) 
    extends VertexProperty
case class 
  ProductProperty(val name: String, val price: Double) 
    extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.

Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.graphx.impl._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.{VertexRDD,EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations.

Key Abstraction of Graphx:

Vertex       RDD[(VertexId, VD)]
Optimized    VertexRDD[VD]
Edge         RDD[Edge(ED)]
Optimzied    EdgeRDD[ED]
Triplet      RDD[EdgeTriplet[VD,ED,VD]]
Graph        Graph[VD, ED]

Table View is in the form of VertexRDD and EdgeRDD

Graph View is in the form of Graph

Source code of VertexRDD and EdgeRDD:

import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext, SparkException}
import org.apache.spark.rdd.RDD

abstract class VertexRDD[VD]
  (sc: SparkContext, deps: Seq[Dependency[_]]) 
  extends RDD[(org.apache.spark.graphx.VertexId, VD)](sc, deps)

abstract class EdgeRDD[ED]
(sc: SparkContext, deps: Seq[Dependency[_]]) 
  extends RDD[org.apache.spark.graphx.Edge[ED]](sc, deps)

Additionally:

All operations on the Graph will be converted to RDD operations of its associated Table view to complete. You can say a graph computation is really a series of RDD conversion processes. Therefore, Graph has what comes with the RDD: Immutable, Distributed, and Fault-Tolerant. This simply means converting from one graph to a new graph is really creating a new graph, because, like RDD is immutable, so is Graph.

Graph Computing:

Example Property Graph

Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:

Resulting Graph should have following signature:

usergraph[(String,String),String]

VD=(String,String)

ED=String

Step 1: Import necessary packages

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx
  .{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

Step 2: Unless you use spark-shell or Jupyter-notebook Spylon Kernel, you are likely needing to create Spark context.

val spark = SparkSession
    .builder
    .appName("graphx app")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///tmp")
.getOrCreate()

import spark.implicits._

//and SparkContext sc
val sc = spark.sparkContext

There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:

 // Assume the SparkContext has already been constructed
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

Step 3: Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.

We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.

// Count all users which are postdocs
val verticeCount=graph.vertices
.filter { case (id, (name, pos)) => pos == "postdoc" }
.count
println(verticeCount)

and

// Count all the edges where src > dst
val edgeCount=graph.edges
.filter(e => e.srcId > e.dstId).count
println(edgeCount)

Please note:

graph.vertices returns a VertexRDD[(String,String)]

graph.vertices
/*
res3: org.apache.spark.graphx.VertexRDD[(String, String)] = VertexRDDImpl[11] at RDD at VertexRDD.scala:57
*/

graph.edges returns an EdgeRDD[String]

graph.edges
/*
res4: org.apache.spark.graphx.EdgeRDD[String] = EdgeRDDImpl[13] at RDD at EdgeRDD.scala:41
*/

Step 4: In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:

-- do it by SQL on PostgreSQL

-- Since attr field of vertices in this example 
-- is (String,String)
-- Define complex type graphx_vertice_type1

CREATE TYPE graphx_vertice_type1 AS (
    name            text,
    title           text
);

--create table vertices, notice column attr has 
--type graphx_vertice_type1
--created earlier

CREATE TABLE public.vertices
(
     id bigint,
     attr graphx_vertice_type1
)
TABLESPACE spark_tbs;

-- show table vertices created

dv6=# \d public.vertices

          Table "public.vertices"
 Column |         Type         | Modifiers
--------+----------------------+-----------
 id     | bigint               |
 attr   | graphx_vertice_type1 |

Tablespace: "spark_tbs"

-- Now populate vertices tables with vertex 
-- information

dv6=# insert into vertices values 
(3, ROW('rxin','student')),
(7, ROW('jgonzal', 'postdoc')),
(5, ROW('franklin','prof')),
(2, ROW('istoica','prof'));

-- list vertices table rows

dv6=# select * from vertices;
 id |       attr
----+-------------------
  3 | (rxin,student)
  7 | (jgonzal,postdoc)
  5 | (franklin,prof)
  2 | (istoica,prof)
(4 rows)

-- create edges table

CREATE TABLE public.edges
(
     srcId bigint,
     dstId bigint,
     attr text
)
TABLESPACE spark_tbs;

-- insert edge information into table edges

insert into edges values 
(3,7,'collab'),
(5,3,'advisor'),
(2,5,'colleague'),
(5,7,'pi');

-- show edges table rows

dv6=# select * from edges;
 srcid | dstid |   attr
-------+-------+-----------
     3 |     7 | collab
     5 |     3 | advisor
     2 |     5 | colleague
     5 |     7 | pi
(4 rows)

-- Now construct SQL query for triplet

SELECT src.id src_id, src.attr src_attr, 
e.attr,dst.id dst_id , dst.attr dst_attr
FROM edges AS e LEFT JOIN vertices AS src
ON e.srcId = src.Id
LEFT JOIN vertices AS dst
ON e.dstId = dst.Id;

 src_id |    src_attr     |   attr    | dst_id |     dst_attr
--------+-----------------+-----------+--------+-------------------
      5 | (franklin,prof) | advisor   |      3 | (rxin,student)
      2 | (istoica,prof)  | colleague |      5 | (franklin,prof)
      3 | (rxin,student)  | collab    |      7 | (jgonzal,postdoc)
      5 | (franklin,prof) | pi        |      7 | (jgonzal,postdoc)
(4 rows)


Graphically,

The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.

graph.triplets.map(triplet =>
triplet.srcAttr._1 + 
" is the " + triplet.attr + " of " + 
triplet.dstAttr._1).foreach(println)
    
/*
Output:
rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal
*/

Triplet component of a Graph describes the relationships amongst vertices and edges.

The greatness of triplet is one of the most attractive features coming from Spark's Graphx. It is automatically created for you anytime a new vertices and/or new edges in the Graph is created. You notice I used word create not modify, because vertices, edges, triplets even Graph is immutable. "Modifying" any one of them means creating new ones all together.

Use SQL to infer from the triplet query shown earlier:

Note: vertices and edges in database tables are mutable by insert, update and delete. RDDs such as vertices and edges are immutable, "modifying" a RDD means creating a new RDD, original RDD is not changed.

with x as (SELECT src.id src_id, src.attr src_attr, 
e.attr,dst.id dst_id , dst.attr dst_attr
FROM edges AS e LEFT JOIN vertices AS src
ON e.srcId = src.Id
LEFT JOIN vertices AS dst
ON e.dstId = dst.Id) 
select (src_attr).name || ' is ' || attr || ' of ' || (dst_attr).name inference_from_triplet
from x;

      inference_from_triplet
----------------------------------
 franklin is advisor of rxin
 istoica is colleague of franklin
 rxin is collab of jgonzal
 franklin is pi of jgonzal
(4 rows)

graph.triplets returns RDD EdgeTriplet[(String, String),String]

graph.triplets
/*
Output:
res8: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[26] at mapPartitions at GraphImpl.scala:48
*/

Graph distributions and storage

Graph storage in general

If the graph is too large to be computed on a single computer, it needs to be separated or cut into pieces and store each piece on each computer

For example, a social media company can have millions of people, hundreds millions of "friends" amongst people, then you have a graph with millions of vertices and hundreds of millions of edges. This means, the graph has to be split into lots of partitions runs on lots of worker nodes (computers).

Graph edge cut vs vertex cut

Two way to divide a graph into multiple portions of it.

As name suggests, Edge Cut means cutting the graph by edge.

Each vertex is stored once, but some edges will be interrupted and divided into two machines. The advantage of this is to save storage space; the disadvantage is that when performing edge-based calculations on the graph, for an edge where two vertices are divided into different machines, data needs to be transmitted across machines, and large network traffic would be expected.

Also as name suggests, Vertex Cut means cutting the graph on the vertex.

Each edge is stored only once and will only appear on one machine. Points with many neighbors will be replicated to multiple machines, increasing storage overhead and causing data synchronization issues. The advantage is that it can significantly reduce network traffic when comparing Edge Cut.

Vertex Cut only approach

Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, users can easily switch to 2D-partitioning or other heuristics included in GraphX.

Graphx physical storage design

Graphx uses three RDDs to store graph data information across machines over the network:

VertexTable (id, data): id is the vertex id and data is the vertex attribute

EdgeTable (pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, and data is the edge attribute

RoutingTable (id, pid): id is the vertex id and pid is the partition id

Partition Strategy

Graph's distributed storage uses the partitionBy method, and the user specifies different partition strategies (PartitionStrategy) and number of partitions. The partitioning strategy assigns edges to each EdgePartition, vertices to each VertexPartition. There are currently 4 strategies

Based upon open source code:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala

EdgePartition1d

Assigns edges to partitions using only the source vertex ID, colocating edges with the same source.


  case object EdgePartition1D extends 
     PartitionStrategy {
    override def getPartition(src: VertexId, 
        dst: VertexId, numParts: PartitionID): 
          PartitionID = {
      val mixingPrime: VertexId = 1125899906842597L
  (math.abs(src * mixingPrime) % numParts).toInt
    }
  }

Demonstration:

Create a random Graph that has 12 vertices and stored in 4 partitions, each partition is to be run on a computer in the Spark cluster, partition strategy is EdgePartition1D

import org.apache.spark.graphx 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =
       GraphGenerators.logNormalGraph(sc, 
         numVertices = 12)
       .mapVertices( (id, _) => id.toDouble )
       .partitionBy(PartitionStrategy
          .EdgePartition1D,4)
       
graph.edges.count
/*
res282: Long = 74
*/

//each line is a partiton, you can see Edge() inside
graph.edges.mapPartitions{case (rows) => 
    {Iterator((rows.mkString))}}
        .collect.foreach(println)
        
 /*
Edge(0,0,1)Edge(0,1,1)Edge(0,3,1)Edge(0,7,1)Edge(0,10,1)Edge(4,3,1)Edge(4,7,1)Edge(4,7,1)Edge(8,1,1)Edge(8,5,1)Edge(8,7,1)Edge(8,10,1)Edge(8,11,1)Edge(8,11,1)
Edge(1,1,1)Edge(1,1,1)Edge(1,3,1)Edge(1,5,1)Edge(1,7,1)Edge(1,10,1)Edge(5,0,1)Edge(5,1,1)Edge(5,3,1)Edge(5,4,1)Edge(5,5,1)Edge(5,5,1)Edge(9,2,1)Edge(9,3,1)Edge(9,3,1)Edge(9,3,1)Edge(9,8,1)Edge(9,9,1)Edge(9,10,1)Edge(9,11,1)
Edge(2,0,1)Edge(2,2,1)Edge(2,3,1)Edge(2,4,1)Edge(2,6,1)Edge(2,7,1)Edge(2,8,1)Edge(2,8,1)Edge(2,11,1)Edge(2,11,1)Edge(2,11,1)Edge(6,6,1)Edge(6,8,1)Edge(6,11,1)Edge(10,0,1)Edge(10,2,1)Edge(10,2,1)Edge(10,8,1)Edge(10,9,1)Edge(10,9,1)Edge(10,10,1)Edge(10,11,1)
Edge(3,2,1)Edge(3,2,1)Edge(3,3,1)Edge(3,6,1)Edge(3,7,1)Edge(3,9,1)Edge(3,9,1)Edge(3,11,1)Edge(7,0,1)Edge(7,0,1)Edge(7,5,1)Edge(7,7,1)Edge(11,0,1)Edge(11,0,1)Edge(11,7,1)Edge(11,7,1)Edge(11,7,1)Edge(11,8,1)
 
 It appears not really balanced Edges in partition
 
 */
 

EdgePartition2d

This cut method uses both source vertex id and destination vertex id. It uses 2-dimensional sparse edge matrices to assign edges to different partitions, ensuring that the number of vertices backed up is limited at square root of number of partitions

case object EdgePartition2D extends 
   PartitionStrategy {
    override def getPartition(src: VertexId, 
       dst: VertexId, numParts: PartitionID): 
         PartitionID = {
      val ceilSqrtNumParts: PartitionID = 
        math.ceil(math.sqrt(numParts)).toInt
      val mixingPrime: VertexId = 1125899906842597L
if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
// Use old method for perfect squared to ensure 
// we get same results
val col: PartitionID = 
(math.abs(src * mixingPrime) % ceilSqrtNumParts)
  .toInt
val row: PartitionID = 
(math.abs(dst * mixingPrime) % ceilSqrtNumParts)
 .toInt
(col * ceilSqrtNumParts + row) % numParts
} else 
{
// Otherwise use new method
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
val lastColRows = numParts - rows * (cols - 1)
val col = 
(math.abs(src * mixingPrime) % numParts / rows)
.toInt
val row = 
(math.abs(dst * mixingPrime) % 
 (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row

}
}
}

Here numParts represents the number of partitions. The implementation of this method is divided into two cases, that is, the number of partitions can be fully squared and not fully squared. When the number of partitions can be fully squared, use the following method:

if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) 
{
// Use old method for perfect squared to ensure we get same results
val col: PartitionID = 
(math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = 
(math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
 (col * ceilSqrtNumParts + row) % numParts

When the number of partitions cannot be fully square rooted:

val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
val lastColRows = numParts - rows * (cols - 1)
val col = 
(math.abs(src * mixingPrime) % numParts / rows)
.toInt
val row = 
(math.abs(dst * mixingPrime) 
 % (if (col < cols - 1) rows else lastColRows))
 .toInt
col * rows + row

For illustration:

Suppose a graph with 12 vertices to partition over 9 machines. Use the following sparse matrix representation:

       __________________________________
  v0   | P0 *     | P1       | P2       |
  v1   |  ****    |  *       |          |
  v2   |  ******* |   *      |  ****    |
  v3   |  *****   |  *  *    |          |
       ----------------------------------
  v4   | P3 *     | P4 ***   | P5 **  * |
  v5   |  *  *    |  *       |          |
  v6   |      	  |   *      |  ****    |
  v7   |  * * *   |  *  *    |          |
       ----------------------------------
  v8   | P6       | P7       | P8  *    |
  v9   |          |  *       |          |
  v10  |          |   *      |  *  *    |
  v11  | * <-E    |  ***     |    *     |
       ----------------------------------

The edge denoted by 'E' connects 'v11' with 'v1' and is assigned to processor (partition) P6. To get the processor number we divide the matrix into 'sqrt(numParts)' by 'sqrt(numParts)' blocks. Notice that edges adjacent to 'v11' can only be in the first column of blocks '(P0, P3, P6)' or the last row of blocks '(P6, P7, P8)'. As a consequence we can guarantee that 'v11' will need to be replicated to at most '2 * sqrt(numParts)' machines.

Notice that 'P0' has many edges and as a consequence this partitioning would lead to poor work balance. To improve balance we first multiply each vertex id by a large prime to shuffle the vertex locations.

When the number of partitions requested is not a perfect square we use a slightly different method where the last column can have a different number of rows than the others while still maintaining the same size per block.

Demonstration:

Create a random Graph that has 12 vertices and stored in 4 partitions, each partition is to be run on a computer in the Spark cluster, partition strategy is EdgePartition2

import org.apache.spark.graphx 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._


val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .mapVertices( (id, _) => id.toDouble )
  .partitionBy(PartitionStrategy.EdgePartition2D,4)
  
graph.edges.count
/*
res284: Long = 68
*/

graph.edges.mapPartitions{case (rows) => 
{Iterator((rows.mkString))}}.collect
.foreach(println)

/*
Edge(0,0,1)Edge(0,2,1)Edge(0,2,1)Edge(0,6,1)Edge(0,10,1)Edge(2,4,1)Edge(2,8,1)Edge(4,0,1)Edge(4,4,1)Edge(4,6,1)Edge(4,10,1)Edge(6,8,1)Edge(8,0,1)Edge(8,6,1)Edge(8,10,1)Edge(10,2,1)Edge(10,6,1)Edge(10,8,1)Edge(10,10,1)
Edge(0,1,1)Edge(0,1,1)Edge(0,3,1)Edge(0,5,1)Edge(0,7,1)Edge(0,9,1)Edge(2,1,1)Edge(2,3,1)Edge(2,9,1)Edge(4,1,1)Edge(4,3,1)Edge(4,5,1)Edge(4,5,1)Edge(4,5,1)Edge(4,9,1)Edge(4,11,1)Edge(6,11,1)Edge(8,1,1)Edge(8,7,1)Edge(10,1,1)Edge(10,3,1)Edge(10,5,1)Edge(10,7,1)Edge(10,9,1)Edge(10,11,1)
Edge(1,4,1)Edge(1,8,1)Edge(1,8,1)Edge(3,0,1)Edge(3,0,1)Edge(5,0,1)Edge(5,2,1)Edge(5,4,1)Edge(7,0,1)Edge(7,2,1)Edge(7,4,1)Edge(9,0,1)
Edge(1,5,1)Edge(3,9,1)Edge(5,3,1)Edge(5,7,1)Edge(5,11,1)Edge(5,11,1)Edge(7,3,1)Edge(9,5,1)Edge(9,9,1)Edge(9,11,1)Edge(11,7,1)Edge(11,9,1)
*/

RandomVertexCut:

This partitioning strategy is simple: Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a random vertex cut that colocates all same-direction edges between two vertices

case object RandomVertexCut extends 
PartitionStrategy {
override def getPartition(src: VertexId, 
dst: VertexId, 
numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
    }
  }

Demonstration:

Create a random Graph that has 12 vertices and stored in 4 partitions, each partition is to be run on a computer in the Spark cluster, partition strategy is RandomVertexCut

import org.apache.spark.graphx 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._


val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .mapVertices( (id, _) => id.toDouble )
  .partitionBy(PartitionStrategy.RandomVertexCut,4)
  
  graph.edges.count
  /*
  res286: Long = 80
  */
  
  graph.edges.mapPartitions{case (rows) => {Iterator((rows.mkString))}}.collect.foreach(println)
  
  /*
Edge(0,10,1)Edge(1,10,1)Edge(1,10,1)Edge(2,1,1)Edge(2,6,1)Edge(4,10,1)Edge(4,10,1)Edge(5,7,1)Edge(6,3,1)Edge(6,6,1)Edge(6,6,1)Edge(6,6,1)Edge(6,10,1)Edge(6,11,1)Edge(7,7,1)Edge(9,9,1)Edge(10,2,1)Edge(10,10,1)Edge(10,10,1)Edge(10,10,1)Edge(11,2,1)Edge(11,2,1)Edge(11,7,1)Edge(11,11,1)
Edge(1,1,1)Edge(4,5,1)Edge(4,7,1)Edge(4,9,1)Edge(5,3,1)Edge(6,4,1)Edge(6,9,1)Edge(7,1,1)Edge(7,2,1)Edge(7,2,1)Edge(7,5,1)Edge(7,5,1)Edge(7,5,1)Edge(8,4,1)Edge(8,8,1)Edge(8,10,1)
Edge(1,4,1)Edge(2,7,1)Edge(3,0,1)Edge(4,11,1)Edge(5,1,1)Edge(6,1,1)Edge(6,8,1)Edge(6,8,1)Edge(7,4,1)Edge(7,4,1)Edge(7,4,1)Edge(8,1,1)Edge(8,2,1)Edge(9,7,1)Edge(9,10,1)Edge(10,9,1)Edge(10,9,1)Edge(11,1,1)
Edge(0,7,1)Edge(2,2,1)Edge(2,5,1)Edge(2,5,1)Edge(2,5,1)Edge(4,1,1)Edge(4,2,1)Edge(4,4,1)Edge(4,4,1)Edge(4,6,1)Edge(5,0,1)Edge(5,10,1)Edge(7,6,1)Edge(8,11,1)Edge(10,6,1)Edge(10,6,1)Edge(10,8,1)Edge(10,8,1)Edge(11,4,1)Edge(11,9,1)Edge(11,10,1)Edge(11,10,1)
  */

CanonicalRandomVertexCut.

Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.

 case object CanonicalRandomVertexCut 
   extends PartitionStrategy {
    override def getPartition(src: VertexId, 
      dst: VertexId, numParts: PartitionID): PartitionID = {
      if (src < dst) {
        math.abs((src, dst).hashCode()) % numParts
      } else {
        math.abs((dst, src).hashCode()) % numParts
      }
    }
  }

Demonstration:

Create a random Graph that has 12 vertices and stored in 4 partitions, each partition is to be run on a computer in the Spark cluster, partition strategy is CononicalRandomVertexCut

import org.apache.spark.graphx 
import org.apache.spark.graphx.impl._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.graphx.util._


val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .mapVertices( (id, _) => id.toDouble )
  .partitionBy(PartitionStrategy.CanonicalRandomVertexCut,4)
  
  graph.edges.count
  /*
  res289: Long = 89
  */
  
  graph.edges.mapPartitions{case (rows) => {Iterator((rows.mkString))}}.collect.foreach(println)
  
  /*
Edge(0,4,1)Edge(0,9,1)Edge(0,9,1)Edge(1,10,1)Edge(1,10,1)Edge(2,11,1)Edge(4,0,1)Edge(4,10,1)Edge(5,0,1)Edge(6,2,1)Edge(6,5,1)Edge(6,11,1)Edge(7,7,1)Edge(7,7,1)Edge(7,7,1)Edge(9,0,1)Edge(9,8,1)Edge(9,8,1)Edge(9,9,1)Edge(10,1,1)Edge(10,4,1)Edge(10,4,1)Edge(10,4,1)Edge(11,11,1)
Edge(0,2,1)Edge(0,8,1)Edge(0,8,1)Edge(1,6,1)Edge(1,6,1)Edge(3,1,1)Edge(3,1,1)Edge(3,1,1)Edge(3,5,1)Edge(3,5,1)Edge(4,2,1)Edge(4,5,1)Edge(4,7,1)Edge(4,8,1)Edge(5,3,1)Edge(6,1,1)Edge(6,9,1)Edge(8,0,1)Edge(8,0,1)Edge(9,4,1)Edge(9,6,1)Edge(9,7,1)Edge(10,8,1)Edge(11,9,1)Edge(11,9,1)
Edge(0,6,1)Edge(0,6,1)Edge(1,4,1)Edge(1,8,1)Edge(2,3,1)Edge(2,7,1)Edge(2,9,1)Edge(3,7,1)Edge(3,7,1)Edge(4,1,1)Edge(8,6,1)Edge(9,2,1)Edge(10,7,1)Edge(10,11,1)Edge(11,4,1)Edge(11,10,1)Edge(11,10,1)
Edge(0,11,1)Edge(1,9,1)Edge(1,11,1)Edge(1,11,1)Edge(2,2,1)Edge(2,5,1)Edge(3,8,1)Edge(3,9,1)Edge(3,10,1)Edge(5,2,1)Edge(5,9,1)Edge(5,11,1)Edge(6,4,1)Edge(6,4,1)Edge(6,7,1)Edge(7,1,1)Edge(7,1,1)Edge(8,3,1)Edge(8,5,1)Edge(9,3,1)Edge(11,0,1)Edge(11,1,1)Edge(11,5,1)
  */

Graph Creation

Graph creation methods

fromEdges

Build graph from edges

def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
edgeStorageLevel: StorageLevel = 
    StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = 
  StorageLevel.MEMORY_ONLY): 
    Graph[VD, ED] = {
     GraphImpl(edges, defaultValue, 
  edgeStorageLevel, vertexStorageLevel)
  }

fromEdgeTuples

Build graph from connected vertices

def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = 
  StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = 
  StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
  {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue, 
   edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
  case Some(p) => graph.partitionBy(p)
    .groupEdges((a, b) => a + b)
case None => graph
    }
  }

You notice both Graph creation methods are through apply method of class GraphImpl.

Example

Assume the data file looks like below:

cat followers.txt
3 6
3 5
3 4
3 2
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

Build a graph using edgeListFile method of GraphLoader Class

import org.apache.spark.graphx.GraphLoader
val followerGraph = GraphLoader
  .edgeListFile(sc, "followers.txt")
followerGraph.triplets.foreach(println)
/*
((3,1),(7,1),1)
((6,1),(3,1),1)
((6,1),(7,1),1)
((7,1),(3,1),1)
((7,1),(6,1),1)
((1,1),(2,1),1)
((2,1),(1,1),1)
((3,1),(2,1),1)
((3,1),(4,1),1)
((3,1),(5,1),1)
((3,1),(6,1),1)
((4,1),(1,1),1)

*/

followerGraph.vertices.foreach(println)
/*
(1,1)
(3,1)
(4,1)
(6,1)
(2,1)
(7,1)
(5,1)
*/

followerGraph.edges.foreach(println)
/*
Edge(1,2,1)
Edge(2,1,1)
Edge(3,2,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(3,6,1)
Edge(4,1,1)
Edge(3,7,1)
Edge(6,3,1)
Edge(6,7,1)
Edge(7,3,1)
Edge(7,6,1)
*/

//Notice both vertex attribute and 
//edge attribute are set to 1
//by edgeListFile method

If want to know why vertex attribute and edge attribute are set to 1, here is the open source of method edgeListFile

  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = 
        StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = 
        StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] =
  {
    val startTimeNs = System.nanoTime()

    // Parse the edge data table directly into 
    // edge partitions
    val lines =
      if (numEdgePartitions > 0) {
        sc.textFile(path, numEdgePartitions)
          .coalesce(numEdgePartitions)
      } else {
        sc.textFile(path)
      }
    val edges = lines.mapPartitionsWithIndex { 
      (pid, iter) =>
  val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
throw new IllegalArgumentException("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
      if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel)
     .setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count()

logInfo(s"It took ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" +
      " to load the edges")

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile

Graph computing is about converting one graph by manipulating attributes of veritices and edges to make a new graph

Use method fromEdgeTuples of Graph class to create a new with different vertex attributes, say 2 from 1 across board

val followerGraph2=Graph.fromEdgeTuples(followerGraph.triplets.map(x=>(x.srcId,x.dstId)),2)
followerGraph2.vertices.foreach(println)

/*
(1,2)
(3,2)
(7,2)
(5,2)
(4,2)
(6,2)
(2,2)
*/

We can also change the edges attribute from 1 to 3, across the board for example, using fromEdges method of EdgeRDD class:

val edges = EdgeRDD.
  fromEdges(followerGraph2.edges.toDF.rdd
.map(row=> 
Edge(row.getAs[Long]("srcId"), row.getAs[Long]
("dstId"), 3)))
edges.foreach(println)

/*
Edge(1,2,3)
Edge(2,1,3)
Edge(3,2,3)
Edge(3,4,3)
Edge(3,5,3)
Edge(3,6,3)
Edge(4,1,3)
Edge(3,7,3)
Edge(6,3,3)
Edge(6,7,3)
Edge(7,3,3)
Edge(7,6,3)
*/

Last updated