Spark Graph Computing

Introduction:

Spark GraphX is a distributed graph processing framework. It is built on the Spark platform and provides a simple, easy-to-use, and rich interface for graph computing and graph mining, which greatly eases distributed graph processing.
It is well known that there are many relationship chains between people and entities in social networks, most notably at large social network companies. The network structure in such data calls for graph computation.
GraphX is a new Spark API for the computation of graphs and for distributed graph-parallelism. GraphX extends the Spark RDD by introducing a Resilient Distributed Property Graph: a directed multigraph whose vertices and edges carry attributes.
To support graph computation, GraphX exposes a set of fundamental operators and an optimized Pregel API. In addition, GraphX includes a fast-growing collection of graph algorithms and graph builders to simplify graph analysis tasks.

High-Level Abstract Diagram of Apache Spark

The abstraction of GraphX is a resilient distributed property graph: a directed multigraph with user-defined objects attached to each vertex and edge. Multiple parallel edges in a directed multigraph can share the same source and destination vertices. Support for parallel edges simplifies modeling scenarios where there may be multiple relationships (such as co-worker and friend) between the same pair of vertices. Each vertex is keyed by a unique 64-bit identifier (VertexId). Similarly, edges have corresponding source and destination vertex identifiers.

Data-Parallel vs Graph-Parallel computing

Data-parallel computation focuses on records, while graph-parallel computation focuses on vertices. Data-parallel systems process independent records simultaneously, while graph-parallel systems achieve parallelism by partitioning (i.e., slicing) the graph data. More precisely, graph-parallel computation recursively defines transformation functions over the features of neighboring vertices and edges, and executes these transformation functions concurrently in parallel.
GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

Abstraction of GraphX:

Vertex: RDD[(VertexId, VD)]
Edge: RDD[Edge[ED]]
Triplet: RDD[EdgeTriplet[VD, ED]]
Graph: Graph[VD, ED]
The table view is in the form of VertexRDD and EdgeRDD
The graph view is in the form of Graph
In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:
import org.apache.spark.graphx.Graph

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty

// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.graphx.impl._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.{VertexRDD, EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations.
Key Abstractions of GraphX:
Vertex: RDD[(VertexId, VD)], optimized as VertexRDD[VD]
Edge: RDD[Edge[ED]], optimized as EdgeRDD[ED]
Triplet: RDD[EdgeTriplet[VD, ED]]
Graph: Graph[VD, ED]
Table View is in the form of VertexRDD and EdgeRDD
Graph View is in the form of Graph
Source code of VertexRDD and EdgeRDD:
import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext, SparkException}
import org.apache.spark.rdd.RDD

abstract class VertexRDD[VD]
  (sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[(org.apache.spark.graphx.VertexId, VD)](sc, deps)

abstract class EdgeRDD[ED]
  (sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[org.apache.spark.graphx.Edge[ED]](sc, deps)
Additionally:
All operations on a Graph are converted into RDD operations on its associated table view. In other words, a graph computation is really a series of RDD transformations. Therefore, a Graph inherits what comes with RDDs: it is immutable, distributed, and fault-tolerant. Converting one graph into another really means creating a new graph; like RDDs, Graphs are immutable.

Graph Computing:

Example Property Graph
Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:
The resulting graph should have the following type signature:
Graph[(String, String), String]
where VD = (String, String)
and ED = String
Step 1: Import necessary packages
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
Step 2: Unless you use spark-shell or a Jupyter notebook with the Spylon kernel, you will likely need to create the Spark context yourself.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("graphx app")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()

import spark.implicits._

// and SparkContext sc
val sc = spark.sparkContext
There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:
// Assume the SparkContext has already been constructed
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
Step 3: Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.
We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.
// Count all users which are postdocs
val verticeCount = graph.vertices
  .filter { case (id, (name, pos)) => pos == "postdoc" }
  .count
println(verticeCount)
and
// Count all the edges where src > dst
val edgeCount = graph.edges
  .filter(e => e.srcId > e.dstId).count
println(edgeCount)
Please note:
graph.vertices returns a VertexRDD[(String,String)]
graph.vertices
/*
res3: org.apache.spark.graphx.VertexRDD[(String, String)] = VertexRDDImpl[11] at RDD at VertexRDD.scala:57
*/
graph.edges returns an EdgeRDD[String]
graph.edges
/*
res4: org.apache.spark.graphx.EdgeRDD[String] = EdgeRDDImpl[13] at RDD at EdgeRDD.scala:41
*/
Step 4: In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:
-- do it by SQL on PostgreSQL

-- Since the attr field of vertices in this example
-- is (String, String),
-- define composite type graphx_vertice_type1

CREATE TYPE graphx_vertice_type1 AS (
  name text,
  title text
);

-- create table vertices; notice column attr has
-- type graphx_vertice_type1
-- created earlier

CREATE TABLE public.vertices
(
  id bigint,
  attr graphx_vertice_type1
)
TABLESPACE spark_tbs;

-- show table vertices created

dv6=# \d public.vertices

     Table "public.vertices"
 Column |         Type         | Modifiers
--------+----------------------+-----------
 id     | bigint               |
 attr   | graphx_vertice_type1 |

Tablespace: "spark_tbs"

-- Now populate the vertices table with vertex
-- information

dv6=# insert into vertices values
(3, ROW('rxin','student')),
(7, ROW('jgonzal', 'postdoc')),
(5, ROW('franklin','prof')),
(2, ROW('istoica','prof'));

-- list vertices table rows

dv6=# select * from vertices;
 id |       attr
----+-------------------
  3 | (rxin,student)
  7 | (jgonzal,postdoc)
  5 | (franklin,prof)
  2 | (istoica,prof)
(4 rows)

-- create edges table

CREATE TABLE public.edges
(
  srcId bigint,
  dstId bigint,
  attr text
)
TABLESPACE spark_tbs;

-- insert edge information into table edges

insert into edges values
(3,7,'collab'),
(5,3,'advisor'),
(2,5,'colleague'),
(5,7,'pi');

-- show edges table rows

dv6=# select * from edges;
 srcid | dstid |   attr
-------+-------+-----------
     3 |     7 | collab
     5 |     3 | advisor
     2 |     5 | colleague
     5 |     7 | pi
(4 rows)

-- Now construct the SQL query for the triplet view

SELECT src.id src_id, src.attr src_attr,
       e.attr, dst.id dst_id, dst.attr dst_attr
FROM edges AS e LEFT JOIN vertices AS src
  ON e.srcId = src.Id
LEFT JOIN vertices AS dst
  ON e.dstId = dst.Id;

 src_id |    src_attr     |   attr    | dst_id |     dst_attr
--------+-----------------+-----------+--------+-------------------
      5 | (franklin,prof) | advisor   |      3 | (rxin,student)
      2 | (istoica,prof)  | colleague |      5 | (franklin,prof)
      3 | (rxin,student)  | collab    |      7 | (jgonzal,postdoc)
      5 | (franklin,prof) | pi        |      7 | (jgonzal,postdoc)
(4 rows)
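To make the triplet join concrete outside both Spark and PostgreSQL, here is a small, hypothetical Python sketch of the same edge-to-vertex join over plain dicts and lists (it mirrors the logic, not the GraphX implementation):

```python
# In-memory sketch of the triplet view: join each edge with the
# attributes of its source and destination vertices.
vertices = {3: ("rxin", "student"), 7: ("jgonzal", "postdoc"),
            5: ("franklin", "prof"), 2: ("istoica", "prof")}
edges = [(3, 7, "collab"), (5, 3, "advisor"),
         (2, 5, "colleague"), (5, 7, "pi")]

def triplets(vertices, edges):
    # Each triplet carries (srcId, srcAttr, attr, dstId, dstAttr)
    return [(s, vertices.get(s), a, d, vertices.get(d))
            for (s, d, a) in edges]

for t in triplets(vertices, edges):
    print(t)
```

Just like the SQL above, each output row pairs an edge attribute with both endpoint attributes.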
Graphically,
The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.
graph.triplets.map(triplet =>
  triplet.srcAttr._1 +
    " is the " + triplet.attr + " of " +
    triplet.dstAttr._1).foreach(println)

/*
Output:
rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal
*/
The triplet component of a graph describes the relationships among vertices and edges.
The triplet view is one of the most attractive features of Spark GraphX. It is automatically created for you whenever new vertices and/or edges are created in a graph. Notice the word is create, not modify: vertices, edges, triplets, and even the Graph itself are immutable. "Modifying" any one of them means creating new ones altogether.
Use SQL to infer the same relationships from the triplet query shown earlier:
Note: vertices and edges in database tables are mutable via insert, update, and delete. RDDs such as vertices and edges are immutable; "modifying" an RDD means creating a new RDD, and the original RDD is not changed.
with x as (SELECT src.id src_id, src.attr src_attr,
                  e.attr, dst.id dst_id, dst.attr dst_attr
           FROM edges AS e LEFT JOIN vertices AS src
             ON e.srcId = src.Id
           LEFT JOIN vertices AS dst
             ON e.dstId = dst.Id)
select (src_attr).name || ' is ' || attr || ' of ' || (dst_attr).name inference_from_triplet
from x;

      inference_from_triplet
----------------------------------
 franklin is advisor of rxin
 istoica is colleague of franklin
 rxin is collab of jgonzal
 franklin is pi of jgonzal
(4 rows)
graph.triplets returns an RDD[EdgeTriplet[(String, String), String]]
graph.triplets
/*
Output:
res8: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[26] at mapPartitions at GraphImpl.scala:48
*/

Graph distributions and storage

Graph storage in general

If a graph is too large to be computed on a single computer, it needs to be separated or cut into pieces, with each piece stored on a different computer.
For example, a social media company can have millions of people and hundreds of millions of "friend" relationships among them, which yields a graph with millions of vertices and hundreds of millions of edges. This means the graph has to be split into many partitions running on many worker nodes (computers).

Graph edge cut vs vertex cut

There are two ways to divide a graph into multiple portions.

As the name suggests, Edge Cut means cutting the graph along edges.

Each vertex is stored once, but some edges are cut and divided between two machines. The advantage is saving storage space; the disadvantage is that when performing edge-based computations on the graph, for an edge whose two vertices live on different machines, data needs to be transmitted across machines, and heavy network traffic can be expected.

Also as the name suggests, Vertex Cut means cutting the graph at vertices.

Each edge is stored only once and appears on only one machine. Vertices with many neighbors are replicated to multiple machines, which increases storage overhead and creates data synchronization issues. The advantage is that it can significantly reduce network traffic compared to Edge Cut.
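The tradeoff can be made concrete with a tiny, made-up example. The following Python sketch (not GraphX code; the graph and 2-machine placement are invented for illustration) counts the cross-machine edges an edge cut pays for, and the extra vertex copies a vertex cut pays for:

```python
# Tiny illustrative graph: 4 vertices, 5 edges.
edges = [(0, 1), (0, 2), (1, 2), (2, 3), (3, 0)]

# Edge cut: vertices are assigned to machines; an edge whose endpoints
# land on different machines must communicate across the network.
vertex_machine = {0: 0, 1: 0, 2: 1, 3: 1}
cut_edges = [e for e in edges
             if vertex_machine[e[0]] != vertex_machine[e[1]]]

# Vertex cut: edges are assigned to machines; a vertex is replicated to
# every machine that holds one of its edges.
edge_machine = {(0, 1): 0, (0, 2): 0, (1, 2): 0, (2, 3): 1, (3, 0): 1}
replicas = {}
for (s, d), m in edge_machine.items():
    replicas.setdefault(s, set()).add(m)
    replicas.setdefault(d, set()).add(m)
extra_copies = sum(len(ms) - 1 for ms in replicas.values())

print(len(cut_edges))   # edges needing cross-machine traffic (edge cut)
print(extra_copies)     # extra vertex copies stored (vertex cut)
```

Here the edge cut pays network traffic on every cut edge during each computation, while the vertex cut pays a one-time storage cost for the replicated vertices.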

Vertex Cut only approach

Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, users can easily switch to 2D-partitioning or other heuristics included in GraphX.

Graphx physical storage design

GraphX uses three RDDs to store graph data across machines over the network:
VertexTable (id, data): id is the vertex id and data is the vertex attribute
EdgeTable (pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, and data is the edge attribute
RoutingTable (id, pid): id is the vertex id and pid is the partition id

Partition Strategy

A graph's distributed storage uses the partitionBy method, where the user specifies a partition strategy (PartitionStrategy) and the number of partitions. The partitioning strategy assigns edges to EdgePartitions and vertices to VertexPartitions. There are currently four strategies.
Based on the open source code:
EdgePartition1D
Assigns edges to partitions using only the source vertex ID, colocating edges with the same source.
case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexId,
      dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    (math.abs(src * mixingPrime) % numParts).toInt
  }
}
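The arithmetic is easy to check by hand. Here is a plain-Python sketch of the same formula (not the GraphX source); Scala Long arithmetic wraps on overflow, so we emulate 64-bit two's-complement to stay faithful even for very large vertex IDs:

```python
MIXING_PRIME = 1125899906842597

def to_int64(x):
    # Emulate Scala's 64-bit Long wrap-around semantics.
    x &= (1 << 64) - 1
    return x - (1 << 64) if x >= (1 << 63) else x

def edge_partition_1d(src, dst, num_parts):
    # The destination ID is ignored: the partition depends only on src,
    # so all out-edges of a vertex land in the same partition.
    return abs(to_int64(src * MIXING_PRIME)) % num_parts

print(edge_partition_1d(3, 7, 4))                               # 3
print(edge_partition_1d(3, 99, 4) == edge_partition_1d(3, 7, 4))  # True
```

Multiplying by the large mixing prime scatters consecutive vertex IDs across partitions instead of clumping them together.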

Demonstration:

Create a random graph that has 12 vertices, stored in 4 partitions; each partition can run on a different computer in the Spark cluster. The partition strategy is EdgePartition1D.
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc,
      numVertices = 12)
    .mapVertices( (id, _) => id.toDouble )
    .partitionBy(PartitionStrategy.EdgePartition1D, 4)

graph.edges.count
/*
res282: Long = 74
*/

// each line is a partition; you can see the Edge() entries inside
graph.edges.mapPartitions{case (rows) =>
    {Iterator((rows.mkString))}}
  .collect.foreach(println)

/*
Edge(0,0,1)Edge(0,1,1)Edge(0,3,1)Edge(0,7,1)Edge(0,10,1)Edge(4,3,1)Edge(4,7,1)Edge(4,7,1)Edge(8,1,1)Edge(8,5,1)Edge(8,7,1)Edge(8,10,1)Edge(8,11,1)Edge(8,11,1)
Edge(1,1,1)Edge(1,1,1)Edge(1,3,1)Edge(1,5,1)Edge(1,7,1)Edge(1,10,1)Edge(5,0,1)Edge(5,1,1)Edge(5,3,1)Edge(5,4,1)Edge(5,5,1)Edge(5,5,1)Edge(9,2,1)Edge(9,3,1)Edge(9,3,1)Edge(9,3,1)Edge(9,8,1)Edge(9,9,1)Edge(9,10,1)Edge(9,11,1)
Edge(2,0,1)Edge(2,2,1)Edge(2,3,1)Edge(2,4,1)Edge(2,6,1)Edge(2,7,1)Edge(2,8,1)Edge(2,8,1)Edge(2,11,1)Edge(2,11,1)Edge(2,11,1)Edge(6,6,1)Edge(6,8,1)Edge(6,11,1)Edge(10,0,1)Edge(10,2,1)Edge(10,2,1)Edge(10,8,1)Edge(10,9,1)Edge(10,9,1)Edge(10,10,1)Edge(10,11,1)
Edge(3,2,1)Edge(3,2,1)Edge(3,3,1)Edge(3,6,1)Edge(3,7,1)Edge(3,9,1)Edge(3,9,1)Edge(3,11,1)Edge(7,0,1)Edge(7,0,1)Edge(7,5,1)Edge(7,7,1)Edge(11,0,1)Edge(11,0,1)Edge(11,7,1)Edge(11,7,1)Edge(11,7,1)Edge(11,8,1)

Note: the edges do not appear well balanced across partitions.
*/
EdgePartition2D
This strategy uses both the source and destination vertex IDs. It maps edges into a 2-dimensional sparse edge matrix to assign them to partitions, guaranteeing that each vertex is replicated to at most 2 * sqrt(numParts) partitions.
case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId,
      dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID =
      math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
      // Use old method for perfect squares to ensure
      // we get the same results
      val col: PartitionID =
        (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
      val row: PartitionID =
        (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
      (col * ceilSqrtNumParts + row) % numParts
    } else {
      // Otherwise use new method
      val cols = ceilSqrtNumParts
      val rows = (numParts + cols - 1) / cols
      val lastColRows = numParts - rows * (cols - 1)
      val col =
        (math.abs(src * mixingPrime) % numParts / rows).toInt
      val row =
        (math.abs(dst * mixingPrime) %
          (if (col < cols - 1) rows else lastColRows)).toInt
      col * rows + row
    }
  }
}
Here numParts is the number of partitions. The implementation handles two cases: the number of partitions is a perfect square, or it is not. When the number of partitions is a perfect square, the following method is used:
if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
  // Use old method for perfect squares to ensure we get the same results
  val col: PartitionID =
    (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  val row: PartitionID =
    (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  (col * ceilSqrtNumParts + row) % numParts
}
When the number of partitions is not a perfect square:
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
val lastColRows = numParts - rows * (cols - 1)
val col =
  (math.abs(src * mixingPrime) % numParts / rows).toInt
val row =
  (math.abs(dst * mixingPrime)
    % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row
For illustration:
Suppose we partition a graph with 12 vertices over 9 machines, using the following sparse matrix representation:
__________________________________
v0  | P0 *     | P1      | P2     |
v1  | ****     | *       |        |
v2  | *******  | *       | ****   |
v3  | *****    | *  *    |        |
----------------------------------
v4  | P3 *     | P4 ***  | P5 ** *|
v5  | *  *     | *       |        |
v6  |          | *       | ****   |
v7  | * * *    | * *     |        |
----------------------------------
v8  | P6       | P7      | P8 *   |
v9  |          | *       |        |
v10 |          | *       | * *    |
v11 | * <-E    | ***     | *      |
----------------------------------
The edge denoted by 'E' connects 'v11' with 'v1' and is assigned to processor (partition) P6. To get the processor number we divide the matrix into 'sqrt(numParts)' by 'sqrt(numParts)' blocks. Notice that edges adjacent to 'v11' can only be in the first column of blocks '(P0, P3, P6)' or the last row of blocks '(P6, P7, P8)'. As a consequence we can guarantee that 'v11' will need to be replicated to at most '2 * sqrt(numParts)' machines.
Notice that 'P0' has many edges and as a consequence this partitioning would lead to poor work balance. To improve balance we first multiply each vertex id by a large prime to shuffle the vertex locations.
When the number of partitions requested is not a perfect square we use a slightly different method where the last column can have a different number of rows than the others while still maintaining the same size per block.
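The replication bound can be checked numerically. Here is a plain-Python sketch (not the GraphX source) of the perfect-square case of the EdgePartition2D arithmetic, which then verifies that on a complete 12-vertex graph with 9 partitions, no vertex touches more than 2 * sqrt(numParts) partitions:

```python
import math

MIXING_PRIME = 1125899906842597

def edge_partition_2d(src, dst, num_parts):
    # Perfect-square case only, mirroring the Scala code above.
    n = int(math.ceil(math.sqrt(num_parts)))
    assert n * n == num_parts, "sketch covers the perfect-square case only"
    col = abs(src * MIXING_PRIME) % n
    row = abs(dst * MIXING_PRIME) % n
    return (col * n + row) % num_parts

# Every edge touching vertex v lands in v's column of blocks (as a source)
# or v's row of blocks (as a destination): at most 2*sqrt(numParts)
# partitions, so v is replicated to at most that many machines.
num_parts = 9
edges = [(s, d) for s in range(12) for d in range(12) if s != d]
parts_per_vertex = {}
for s, d in edges:
    p = edge_partition_2d(s, d, num_parts)
    parts_per_vertex.setdefault(s, set()).add(p)
    parts_per_vertex.setdefault(d, set()).add(p)
bound = 2 * int(math.sqrt(num_parts))
print(all(len(ps) <= bound for ps in parts_per_vertex.values()))  # True
```

For small vertex IDs the multiplication does not overflow 64 bits, so Python's unbounded integers give the same result as Scala's Long here.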

Demonstration:

Create a random graph that has 12 vertices, stored in 4 partitions; each partition can run on a different computer in the Spark cluster. The partition strategy is EdgePartition2D.
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices( (id, _) => id.toDouble )
    .partitionBy(PartitionStrategy.EdgePartition2D, 4)

graph.edges.count
/*
res284: Long = 68
*/

graph.edges.mapPartitions{case (rows) =>
    {Iterator((rows.mkString))}}.collect
  .foreach(println)

/*
Edge(0,0,1)Edge(0,2,1)Edge(0,2,1)Edge(0,6,1)Edge(0,10,1)Edge(2,4,1)Edge(2,8,1)Edge(4,0,1)Edge(4,4,1)Edge(4,6,1)Edge(4,10,1)Edge(6,8,1)Edge(8,0,1)Edge(8,6,1)Edge(8,10,1)Edge(10,2,1)Edge(10,6,1)Edge(10,8,1)Edge(10,10,1)
Edge(0,1,1)Edge(0,1,1)Edge(0,3,1)Edge(0,5,1)Edge(0,7,1)Edge(0,9,1)Edge(2,1,1)Edge(2,3,1)Edge(2,9,1)Edge(4,1,1)Edge(4,3,1)Edge(4,5,1)Edge(4,5,1)Edge(4,5,1)Edge(4,9,1)Edge(4,11,1)Edge(6,11,1)Edge(8,1,1)Edge(8,7,1)Edge(10,1,1)Edge(10,3,1)Edge(10,5,1)Edge(10,7,1)Edge(10,9,1)Edge(10,11,1)
Edge(1,4,1)Edge(1,8,1)Edge(1,8,1)Edge(3,0,1)Edge(3,0,1)Edge(5,0,1)Edge(5,2,1)Edge(5,4,1)Edge(7,0,1)Edge(7,2,1)Edge(7,4,1)Edge(9,0,1)
Edge(1,5,1)Edge(3,9,1)Edge(5,3,1)Edge(5,7,1)Edge(5,11,1)Edge(5,11,1)Edge(7,3,1)Edge(9,5,1)Edge(9,9,1)Edge(9,11,1)Edge(11,7,1)Edge(11,9,1)
*/
RandomVertexCut:
This partitioning strategy is simple: it assigns edges to partitions by hashing the (source, destination) vertex ID pair, resulting in a random vertex cut that colocates all same-direction edges between two vertices.
case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId,
      dst: VertexId,
      numParts: PartitionID): PartitionID = {
    math.abs((src, dst).hashCode()) % numParts
  }
}

Demonstration:

Create a random graph that has 12 vertices, stored in 4 partitions; each partition can run on a different computer in the Spark cluster. The partition strategy is RandomVertexCut.
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices( (id, _) => id.toDouble )
    .partitionBy(PartitionStrategy.RandomVertexCut, 4)

graph.edges.count
/*
res286: Long = 80
*/

graph.edges.mapPartitions{case (rows) =>
    {Iterator((rows.mkString))}}.collect.foreach(println)

/*
Edge(0,10,1)Edge(1,10,1)Edge(1,10,1)Edge(2,1,1)Edge(2,6,1)Edge(4,10,1)Edge(4,10,1)Edge(5,7,1)Edge(6,3,1)Edge(6,6,1)Edge(6,6,1)Edge(6,6,1)Edge(6,10,1)Edge(6,11,1)Edge(7,7,1)Edge(9,9,1)Edge(10,2,1)Edge(10,10,1)Edge(10,10,1)Edge(10,10,1)Edge(11,2,1)Edge(11,2,1)Edge(11,7,1)Edge(11,11,1)
Edge(1,1,1)Edge(4,5,1)Edge(4,7,1)Edge(4,9,1)Edge(5,3,1)Edge(6,4,1)Edge(6,9,1)Edge(7,1,1)Edge(7,2,1)Edge(7,2,1)Edge(7,5,1)Edge(7,5,1)Edge(7,5,1)Edge(8,4,1)Edge(8,8,1)Edge(8,10,1)
Edge(1,4,1)Edge(2,7,1)Edge(3,0,1)Edge(4,11,1)Edge(5,1,1)Edge(6,1,1)Edge(6,8,1)Edge(6,8,1)Edge(7,4,1)Edge(7,4,1)Edge(7,4,1)Edge(8,1,1)Edge(8,2,1)Edge(9,7,1)Edge(9,10,1)Edge(10,9,1)Edge(10,9,1)Edge(11,1,1)
Edge(0,7,1)Edge(2,2,1)Edge(2,5,1)Edge(2,5,1)Edge(2,5,1)Edge(4,1,1)Edge(4,2,1)Edge(4,4,1)Edge(4,4,1)Edge(4,6,1)Edge(5,0,1)Edge(5,10,1)Edge(7,6,1)Edge(8,11,1)Edge(10,6,1)Edge(10,6,1)Edge(10,8,1)Edge(10,8,1)Edge(11,4,1)Edge(11,9,1)Edge(11,10,1)Edge(11,10,1)
*/
CanonicalRandomVertexCut:
Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.
case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId,
      dst: VertexId, numParts: PartitionID): PartitionID = {
    if (src < dst) {
      math.abs((src, dst).hashCode()) % numParts
    } else {
      math.abs((dst, src).hashCode()) % numParts
    }
  }
}
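The difference between the two random strategies is easiest to see side by side. In this hypothetical Python sketch, Python's built-in hash stands in for Scala's Tuple2.hashCode, so the exact partition numbers differ from GraphX; the property that matters is that the canonical variant sends both directions of an edge to the same partition:

```python
def random_vertex_cut(src, dst, num_parts):
    # Hash the ordered (src, dst) pair: colocates same-direction
    # parallel edges only.
    return abs(hash((src, dst))) % num_parts

def canonical_random_vertex_cut(src, dst, num_parts):
    # Order the endpoints first, so (a, b) and (b, a) hash identically
    # and both directions of an edge land in the same partition.
    return random_vertex_cut(min(src, dst), max(src, dst), num_parts)

print(canonical_random_vertex_cut(2, 5, 4)
      == canonical_random_vertex_cut(5, 2, 4))  # True
```

Under plain RandomVertexCut, edges (2, 5) and (5, 2) may land in different partitions; under the canonical variant they never do.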

Demonstration:

Create a random graph that has 12 vertices, stored in 4 partitions; each partition can run on a different computer in the Spark cluster. The partition strategy is CanonicalRandomVertexCut.
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices( (id, _) => id.toDouble )
    .partitionBy(PartitionStrategy.CanonicalRandomVertexCut, 4)

graph.edges.count
/*
res289: Long = 89
*/

graph.edges.mapPartitions{case (rows) =>
    {Iterator((rows.mkString))}}.collect.foreach(println)

/*
Edge(0,4,1)Edge(0,9,1)Edge(0,9,1)Edge(1,10,1)Edge(1,10,1)Edge(2,11,1)Edge(4,0,1)Edge(4,10,1)Edge(5,0,1)Edge(6,2,1)Edge(6,5,1)Edge(6,11,1)Edge(7,7,1)Edge(7,7,1)Edge(7,7,1)Edge(9,0,1)Edge(9,8,1)Edge(9,8,1)Edge(9,9,1)Edge(10,1,1)Edge(10,4,1)Edge(10,4,1)Edge(10,4,1)Edge(11,11,1)
Edge(0,2,1)Edge(0,8,1)Edge(0,8,1)Edge(1,6,1)Edge(1,6,1)Edge(3,1,1)Edge(3,1,1)Edge(3,1,1)Edge(3,5,1)Edge(3,5,1)Edge(4,2,1)Edge(4,5,1)Edge(4,7,1)Edge(4,8,1)Edge(5,3,1)Edge(6,1,1)Edge(6,9,1)Edge(8,0,1)Edge(8,0,1)Edge(9,4,1)Edge(9,6,1)Edge(9,7,1)Edge(10,8,1)Edge(11,9,1)Edge(11,9,1)
Edge(0,6,1)Edge(0,6,1)Edge(1,4,1)Edge(1,8,1)Edge(2,3,1)Edge(2,7,1)Edge(2,9,1)Edge(3,7,1)Edge(3,7,1)Edge(4,1,1)Edge(8,6,1)Edge(9,2,1)Edge(10,7,1)Edge(10,11,1)Edge(11,4,1)Edge(11,10,1)Edge(11,10,1)
Edge(0,11,1)Edge(1,9,1)Edge(1,11,1)Edge(1,11,1)Edge(2,2,1)Edge(2,5,1)Edge(3,8,1)Edge(3,9,1)Edge(3,10,1)Edge(5,2,1)Edge(5,9,1)Edge(5,11,1)Edge(6,4,1)Edge(6,4,1)Edge(6,7,1)Edge(7,1,1)Edge(7,1,1)Edge(8,3,1)Edge(8,5,1)Edge(9,3,1)Edge(11,0,1)Edge(11,1,1)Edge(11,5,1)
*/

Graph Creation

Graph creation methods

fromEdges
Build a graph from an RDD of edges:
def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}
fromEdgeTuples
Build a graph from an RDD of connected vertex-ID pairs:
def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = {
  val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  uniqueEdges match {
    case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
    case None => graph
  }
}
Notice that both graph creation methods go through the apply method of the GraphImpl class.
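The effect of the uniqueEdges branch in fromEdgeTuples can be illustrated with a small dict-based Python sketch (not Spark): every raw tuple becomes an edge with attribute 1, and groupEdges((a, b) => a + b) merges parallel edges by summing, so the final attribute is the edge's multiplicity.

```python
def from_edge_tuples(raw_edges, unique_edges=False):
    # Every raw (src, dst) pair becomes an edge with attribute 1.
    edges = [(s, d, 1) for (s, d) in raw_edges]
    if not unique_edges:
        return edges
    # Merge parallel edges by summing their attributes,
    # mimicking groupEdges((a, b) => a + b).
    merged = {}
    for s, d, attr in edges:
        merged[(s, d)] = merged.get((s, d), 0) + attr
    return [(s, d, a) for (s, d), a in merged.items()]

print(from_edge_tuples([(1, 2), (1, 2), (2, 3)], unique_edges=True))
# [(1, 2, 2), (2, 3, 1)]
```

In GraphX the merge only works correctly if the edges have first been colocated by partitionBy, which is why uniqueEdges takes a PartitionStrategy rather than a boolean.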

Example

Assume the data file looks like this:
cat followers.txt
3 6
3 5
3 4
3 2
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
Build a graph using the edgeListFile method of the GraphLoader class:
import org.apache.spark.graphx.GraphLoader
val followerGraph = GraphLoader
  .edgeListFile(sc, "followers.txt")
followerGraph.triplets.foreach(println)
/*
((3,1),(7,1),1)
((6,1),(3,1),1)
((6,1),(7,1),1)
((7,1),(3,1),1)
((7,1),(6,1),1)
((1,1),(2,1),1)
((2,1),(1,1),1)
((3,1),(2,1),1)
((3,1),(4,1),1)
((3,1),(5,1),1)
((3,1),(6,1),1)
((4,1),(1,1),1)
*/

followerGraph.vertices.foreach(println)
/*
(1,1)
(3,1)
(4,1)
(6,1)
(2,1)
(7,1)
(5,1)
*/

followerGraph.edges.foreach(println)
/*
Edge(1,2,1)
Edge(2,1,1)
Edge(3,2,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(3,6,1)
Edge(4,1,1)
Edge(3,7,1)
Edge(6,3,1)
Edge(6,7,1)
Edge(7,3,1)
Edge(7,6,1)
*/

// Notice both the vertex attribute and
// the edge attribute are set to 1
// by the edgeListFile method
If you want to know why the vertex and edge attributes are set to 1, here is the open source of the edgeListFile method:
def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  : Graph[Int, Int] =
{
  val startTimeNs = System.nanoTime()

  // Parse the edge data table directly into edge partitions
  val lines =
    if (numEdgePartitions > 0) {
      sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
    } else {
      sc.textFile(path)
    }
  val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[Int, Int]
    iter.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\s+")
        if (lineArray.length < 2) {
          throw new IllegalArgumentException("Invalid line: " + line)
        }
        val srcId = lineArray(0).toLong
        val dstId = lineArray(1).toLong
        if (canonicalOrientation && srcId > dstId) {
          builder.add(dstId, srcId, 1)
        } else {
          builder.add(srcId, dstId, 1)
        }
      }
    }
    Iterator((pid, builder.toEdgePartition))
  }.persist(edgeStorageLevel)
    .setName("GraphLoader.edgeListFile - edges (%s)".format(path))
  edges.count()

  logInfo(s"It took ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" +
    " to load the edges")

  GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
    vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile
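The line-parsing rules in that method (skip blank lines and '#' comments, split on whitespace, default the edge attribute to 1, and optionally flip edges into canonical orientation) can be sketched in a few lines of plain Python, outside Spark:

```python
def parse_edge_list(lines, canonical_orientation=False):
    # Minimal sketch of edgeListFile's per-line parsing rules.
    edges = []
    for line in lines:
        if not line or line[0] == '#':
            continue                      # skip blanks and comments
        parts = line.split()              # split on whitespace
        if len(parts) < 2:
            raise ValueError("Invalid line: " + line)
        src, dst = int(parts[0]), int(parts[1])
        if canonical_orientation and src > dst:
            src, dst = dst, src           # flip so src <= dst
        edges.append((src, dst, 1))       # edge attribute defaults to 1
    return edges

print(parse_edge_list(["# comment", "3 6", "6 3"],
                      canonical_orientation=True))
# [(3, 6, 1), (3, 6, 1)]
```

With canonicalOrientation enabled, "3 6" and "6 3" collapse to the same orientation, which is what makes undirected edge lists loadable.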
Graph computing is about converting one graph into a new graph by manipulating the attributes of its vertices and edges.
Use the fromEdgeTuples method of the Graph class to create a new graph with different vertex attributes, say 2 instead of 1 across the board:
val followerGraph2 = Graph.fromEdgeTuples(followerGraph.triplets.map(x => (x.srcId, x.dstId)), 2)
followerGraph2.vertices.foreach(println)

/*
(1,2)
(3,2)
(7,2)
(5,2)
(4,2)
(6,2)
(2,2)
*/
We can also change the edge attribute from 1 to 3 across the board, for example, using the fromEdges method of the EdgeRDD class:
val edges = EdgeRDD
  .fromEdges(followerGraph2.edges.toDF.rdd
    .map(row =>
      Edge(row.getAs[Long]("srcId"), row.getAs[Long]("dstId"), 3)))
edges.foreach(println)

/*
Edge(1,2,3)
Edge(2,1,3)
Edge(3,2,3)
Edge(3,4,3)
Edge(3,5,3)
Edge(3,6,3)
Edge(4,1,3)
Edge(3,7,3)
Edge(6,3,3)
Edge(6,7,3)
Edge(7,3,3)
Edge(7,6,3)
*/