Spark GraphX is a distributed graph processing framework. It is built on the Spark platform and provides a simple, easy-to-use, and rich interface for graph computing and graph mining, which makes distributed graph processing much easier.
It is well known that social networks contain many relationship chains between people or entities, most notably at well-known social network companies that I will not name. Analyzing the network structure in such data requires graph computation.
GraphX is a new Spark API for graphs and graph-parallel computation. GraphX extends the Spark RDD by introducing the Resilient Distributed Property Graph, a directed multigraph whose vertices and edges have attributes.
To support graph calculations, GraphX provides a basic set of functional operations and an optimized Pregel API. In addition, GraphX includes a fast-growing collection of graph algorithms and graph builders to simplify graph analysis tasks.
High-level abstract diagram of Apache Spark
The core abstraction of GraphX is the resilient distributed property graph: a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph can contain multiple parallel edges that share the same source and destination vertices. Supporting parallel edges simplifies modeling scenarios in which the same pair of vertices has multiple relationships (such as co-worker and friend). Each vertex is keyed by a unique 64-bit identifier (VertexId). Similarly, each edge has corresponding source and destination vertex identifiers.
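To make parallel edges concrete, here is a minimal plain-Scala sketch (no Spark needed). The Edge case class below is a hypothetical stand-in that mimics the shape of GraphX's Edge(srcId, dstId, attr); it is not the library class. Grouping by the (source, destination) pair shows two different relationships between the same two vertices.

```scala
// Hypothetical stand-in for GraphX's Edge(srcId, dstId, attr); plain Scala only.
final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

object Multigraph {
  // Collect the attributes of all parallel edges that share a (source, destination) pair.
  def relationships[ED](edges: Seq[Edge[ED]]): Map[(Long, Long), Seq[ED]] =
    edges.groupBy(e => (e.srcId, e.dstId)).map { case (pair, es) => pair -> es.map(_.attr) }
}

// Vertex 1 -> vertex 2 is connected by two parallel edges with different meanings.
val edges = Seq(Edge(1L, 2L, "co-worker"), Edge(1L, 2L, "friend"), Edge(2L, 3L, "friend"))
println(Multigraph.relationships(edges))
```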
Data-Parallel vs Graph-Parallel computing
Data-parallel computation focuses on records, while graph-parallel computation focuses on vertices. Data-parallel processing works on independent records simultaneously, while graph-parallel processing achieves parallelism by partitioning (i.e., slicing) the graph data. More precisely, graph-parallel computation recursively defines transformation functions on the features of each vertex's neighbors and executes these transformation functions concurrently.
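The difference can be sketched with ordinary Scala collections, assuming a toy setup rather than Spark: the data-parallel function transforms every record on its own, while the graph-parallel step recomputes each vertex's value from its in-neighbors' current values (here, taking the maximum). Repeating such a step until nothing changes is roughly the idea behind Pregel-style computation; the object and function names are illustrative only.

```scala
// Toy contrast of the two models using plain Scala collections (no Spark).
object ParallelModels {
  // Data-parallel: each record is transformed independently of all others.
  def dataParallel(records: Vector[Int]): Vector[Int] =
    records.map(r => r * 2)

  // Graph-parallel: one "superstep" in which every vertex computes its new value
  // from the current values of its in-neighbors (here: max of itself and them).
  def graphParallelStep(values: Map[Long, Int], edges: Seq[(Long, Long)]): Map[Long, Int] = {
    // Incoming neighbor ids, grouped by destination vertex.
    val inNeighbors: Map[Long, Seq[Long]] =
      edges.groupBy(_._2).map { case (dst, es) => dst -> es.map(_._1) }
    values.map { case (v, cur) =>
      val fromNeighbors = inNeighbors.getOrElse(v, Seq.empty).map(values)
      v -> (cur +: fromNeighbors).max
    }
  }
}

println(ParallelModels.dataParallel(Vector(1, 2, 3)))
println(ParallelModels.graphParallelStep(Map(1L -> 5, 2L -> 1, 3L -> 3), Seq((1L, 2L), (2L, 3L))))
```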
GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
Abstractions of GraphX:
Vertex: RDD[(VertexId, VD)]
Edge: RDD[Edge[ED]]
Triplet: RDD[EdgeTriplet[VD, ED]]
Graph: Graph[VD, ED]
The table view takes the form of VertexRDD and EdgeRDD.
The graph view takes the form of Graph.
In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example, to model users and products as a bipartite graph we might do the following:
import org.apache.spark.graphx.Graph

// Base type shared by every kind of vertex in the graph
class VertexProperty()
// A user vertex carries only a name
case class UserProperty(name: String) extends VertexProperty
// A product vertex carries a name and a price
case class ProductProperty(name: String, price: Double) extends VertexProperty

// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
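A small plain-Scala sketch of how such heterogeneous properties are typically consumed: pattern matching on the concrete subclass. This version swaps in a sealed trait (an assumption, not what the snippet above uses) so the compiler can warn about unhandled vertex kinds; VertexKinds.describe is a hypothetical helper, and no Spark is needed to run it.

```scala
// Same idea as above, but with a sealed trait so matches can be checked for exhaustiveness.
sealed trait VertexProperty
final case class UserProperty(name: String) extends VertexProperty
final case class ProductProperty(name: String, price: Double) extends VertexProperty

object VertexKinds {
  // Pattern matching recovers the concrete property type of a vertex.
  def describe(p: VertexProperty): String = p match {
    case UserProperty(name)           => s"user $name"
    case ProductProperty(name, price) => s"product $name at $price"
  }
}

// Hypothetical vertices mixing users and products, keyed by Long ids (like VertexId).
val vertices: Seq[(Long, VertexProperty)] =
  Seq(1L -> UserProperty("alice"), 2L -> ProductProperty("book", 9.99))
vertices.foreach { case (id, p) => println(s"$id: ${VertexKinds.describe(p)}") }
```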
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:
// Simplified view of GraphX's Graph class, showing only the two members discussed here
import scala.reflect.ClassTag
import org.apache.spark.graphx.{VertexRDD, EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  val vertices: VertexRDD[VD]   // vertex view: (VertexId, VD) pairs
  val edges: EdgeRDD[ED]        // edge view: Edge[ED] records
}
The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations.
Every operation on a Graph is ultimately carried out as RDD operations on its associated table view, so a graph computation is really a series of RDD transformations. As a result, a Graph inherits the properties of RDDs: it is immutable, distributed, and fault-tolerant. Converting one graph into another therefore always creates a new graph: because RDDs are immutable, so is a Graph.
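The "transform means build a new graph" idea can be illustrated without Spark. PropertyGraph below is a hypothetical toy class, not GraphX's Graph; its mapVertices loosely mimics the GraphX operator of the same name by returning a new graph, leaving the original untouched and reusing the unchanged edge collection, echoing the structure reuse mentioned earlier.

```scala
// Hypothetical toy graph (plain Scala), only mimicking GraphX's immutable style.
final case class PropertyGraph[VD, ED](vertices: Map[Long, VD], edges: Vector[(Long, Long, ED)]) {
  // Like GraphX's mapVertices: returns a new graph with new vertex attributes.
  // `this` is not modified, and the edge collection is reused rather than copied.
  def mapVertices[VD2](f: (Long, VD) => VD2): PropertyGraph[VD2, ED] =
    PropertyGraph(vertices.map { case (id, attr) => id -> f(id, attr) }, edges)
}

val g1 = PropertyGraph(Map(3L -> "rxin", 7L -> "jgonzal"), Vector((3L, 7L, "collab")))
val g2 = g1.mapVertices((id, name) => s"$name ($id)")
println(g1.vertices) // the original graph still holds the plain names
println(g2.vertices)
```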
Graph Computing:
Example Property Graph
Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:
Step 2: Unless you use spark-shell or a Jupyter notebook with the Spylon kernel, you will likely need to create a SparkSession (and its SparkContext) yourself.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("graphx app")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
import spark.implicits._
// and the underlying SparkContext, sc
val sc = spark.sparkContext
There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:
// Assume the SparkContext (sc) has already been constructed
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case some relationships reference a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
Step 3: Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.
We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.
// Count all users who are postdocs
val verticeCount = graph.vertices
  .filter { case (id, (name, pos)) => pos == "postdoc" }
  .count
println(verticeCount) // 1 (only jgonzal is a postdoc)
and
// Count all the edges where src > dst
val edgeCount = graph.edges
  .filter(e => e.srcId > e.dstId)
  .count
println(edgeCount) // 1 (only Edge(5, 3, "advisor"))
Please note:
graph.vertices returns a VertexRDD[(String,String)]
graph.vertices
/*
res3: org.apache.spark.graphx.VertexRDD[(String, String)] = VertexRDDImpl[11] at RDD at VertexRDD.scala:57
*/
graph.edges returns an EdgeRDD[String]
graph.edges
/*
res4: org.apache.spark.graphx.EdgeRDD[String] = EdgeRDDImpl[13] at RDD at EdgeRDD.scala:41
*/
Step 4: In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:
-- Do the same in SQL on PostgreSQL.
-- Since the attr field of the vertices in this example
-- is (String, String), define a composite type
-- graphx_vertice_type1
CREATE TYPE graphx_vertice_type1 AS (
name text,
title text
);
--create table vertices, notice column attr has
--type graphx_vertice_type1
--created earlier
CREATE TABLE public.vertices
(
id bigint,
attr graphx_vertice_type1
)
TABLESPACE spark_tbs;
-- show table vertices created
dv6=# \d public.vertices
Table "public.vertices"
Column | Type | Modifiers
--------+----------------------+-----------
id | bigint |
attr | graphx_vertice_type1 |
Tablespace: "spark_tbs"
-- Now populate the vertices table with vertex
-- information
dv6=# insert into vertices values
(3, ROW('rxin','student')),
(7, ROW('jgonzal', 'postdoc')),
(5, ROW('franklin','prof')),
(2, ROW('istoica','prof'));
-- list vertices table rows
dv6=# select * from vertices;
id | attr
----+-------------------
3 | (rxin,student)
7 | (jgonzal,postdoc)
5 | (franklin,prof)
2 | (istoica,prof)
(4 rows)
-- create edges table
CREATE TABLE public.edges
(
srcId bigint,
dstId bigint,
attr text
)
TABLESPACE spark_tbs;
-- insert edge information into table edges
insert into edges values
(3,7,'collab'),
(5,3,'advisor'),
(2,5,'colleague'),
(5,7,'pi');
-- show edges table rows
dv6=# select * from edges;
srcid | dstid | attr
-------+-------+-----------
3 | 7 | collab
5 | 3 | advisor
2 | 5 | colleague
5 | 7 | pi
(4 rows)
-- Now construct SQL query for triplet
SELECT src.id src_id, src.attr src_attr,
e.attr,dst.id dst_id , dst.attr dst_attr
FROM edges AS e LEFT JOIN vertices AS src
ON e.srcId = src.Id
LEFT JOIN vertices AS dst
ON e.dstId = dst.Id;
src_id | src_attr | attr | dst_id | dst_attr
--------+-----------------+-----------+--------+-------------------
5 | (franklin,prof) | advisor | 3 | (rxin,student)
2 | (istoica,prof) | colleague | 5 | (franklin,prof)
3 | (rxin,student) | collab | 7 | (jgonzal,postdoc)
5 | (franklin,prof) | pi | 7 | (jgonzal,postdoc)
(4 rows)
The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.
// collect brings the strings back to the driver so they print there, even on a cluster
graph.triplets.map(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println)
/*
Output:
rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal
*/
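To see what the triplet view computes, the join can be sketched with plain Scala collections (no Spark). Edge and Triplet are hypothetical stand-ins for GraphX's Edge and EdgeTriplet, and the default attribute plays the role of defaultUser for vertices that an edge references but the vertex collection lacks.

```scala
// Hypothetical stand-ins for GraphX's Edge and EdgeTriplet (plain Scala only).
final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)
final case class Triplet[VD, ED](srcId: Long, srcAttr: VD, attr: ED, dstId: Long, dstAttr: VD)

object TripletView {
  // Join every edge with the attributes of its source and destination vertices,
  // falling back to `default` when a vertex is missing.
  def triplets[VD, ED](vertices: Map[Long, VD], edges: Seq[Edge[ED]], default: VD): Seq[Triplet[VD, ED]] =
    edges.map { e =>
      Triplet(e.srcId, vertices.getOrElse(e.srcId, default), e.attr,
              e.dstId, vertices.getOrElse(e.dstId, default))
    }
}

val users = Map(3L -> ("rxin", "student"), 7L -> ("jgonzal", "postdoc"),
                5L -> ("franklin", "prof"), 2L -> ("istoica", "prof"))
val relationships = Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))
val sentences = TripletView.triplets(users, relationships, ("John Doe", "Missing"))
  .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
sentences.foreach(println)
```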
The triplet component of a Graph describes the relationships among vertices and edges.
The triplet view is one of the most attractive features of Spark's GraphX. It is available automatically for every Graph, so whenever new vertices and/or new edges produce a new Graph, its triplets come with it. Notice that I used the word "create", not "modify": vertices, edges, triplets, and even the Graph itself are immutable, so "modifying" any one of them means creating new ones altogether.
We can use SQL to draw the same inference from the triplet query shown earlier:
Note: vertices and edges stored in database tables are mutable through INSERT, UPDATE, and DELETE. RDDs such as vertices and edges are immutable: "modifying" an RDD means creating a new RDD, and the original RDD is not changed.
with x as (SELECT src.id src_id, src.attr src_attr,
e.attr,dst.id dst_id , dst.attr dst_attr
FROM edges AS e LEFT JOIN vertices AS src
ON e.srcId = src.Id
LEFT JOIN vertices AS dst
ON e.dstId = dst.Id)
select (src_attr).name || ' is ' || attr || ' of ' || (dst_attr).name inference_from_triplet
from x;
inference_from_triplet
----------------------------------
franklin is advisor of rxin
istoica is colleague of franklin
rxin is collab of jgonzal
franklin is pi of jgonzal
(4 rows)
graph.triplets
/*
Output:
res8: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[26] at mapPartitions at GraphImpl.scala:48
*/
Graph distributions and storage
Graph storage in general
If a graph is too large to be processed on a single computer, it needs to be cut into pieces, with each piece stored on a different computer.
For example, a social media company can have millions of people with hundreds of millions of "friend" relationships among them, which gives a graph with millions of vertices and hundreds of millions of edges. Such a graph has to be split into many partitions that run on many worker nodes (computers).
Graph edge cut vs vertex cut
There are two ways to divide a graph into multiple parts.
As the name suggests, Edge Cut means cutting the graph along its edges.
Each vertex is stored only once, but some edges are cut and split across two machines. The advantage is that this saves storage space; the disadvantage is that when an edge-based computation touches an edge whose two vertices live on different machines, data must be transmitted across machines, which can generate heavy network traffic.
Also as the name suggests, Vertex Cut means cutting the graph at its vertices.
Each edge is stored only once and appears on only one machine. Vertices with many neighbors are replicated to multiple machines, which increases storage overhead and requires keeping the replicas synchronized. The advantage is that it can significantly reduce network traffic compared with Edge Cut.
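The trade-off can be quantified on a toy example in plain Scala. The sketch assumes arbitrary modulo-based partition functions chosen for illustration (they are not GraphX strategies): for an edge cut it counts edges whose endpoints land in different partitions, and for a vertex cut it computes the replication factor, the average number of partitions each vertex is copied to.

```scala
// Toy cost comparison of edge cut vs vertex cut (plain Scala, no Spark).
object CutCost {
  type Edge = (Long, Long)

  // Edge cut: each vertex lives in exactly one partition; an edge whose endpoints
  // land in different partitions is "cut" and needs cross-machine traffic.
  def cutEdges(edges: Seq[Edge], vertexPart: Long => Int): Int =
    edges.count { case (src, dst) => vertexPart(src) != vertexPart(dst) }

  // Vertex cut: each edge lives in exactly one partition; a vertex is copied to every
  // partition holding one of its edges. Returns the average number of copies per vertex.
  def replicationFactor(edges: Seq[Edge], edgePart: Edge => Int): Double = {
    val placements = edges.flatMap(e => Seq(e._1 -> edgePart(e), e._2 -> edgePart(e))).distinct
    val copiesPerVertex = placements.groupBy(_._1).values.map(_.size)
    copiesPerVertex.sum.toDouble / copiesPerVertex.size
  }
}

// A hub vertex 0 connected to four neighbors, spread over 2 partitions.
val star: Seq[CutCost.Edge] = Seq((0L, 1L), (0L, 2L), (0L, 3L), (0L, 4L))
println(CutCost.cutEdges(star, v => (v % 2).toInt))             // 2: (0,1) and (0,3) are cut
println(CutCost.replicationFactor(star, e => (e._2 % 2).toInt)) // 1.2: hub has 2 copies, leaves 1
```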
Vertex Cut only approach
Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, users can easily switch to 2D-partitioning or other heuristics included in GraphX.
GraphX physical storage design
GraphX uses three RDDs to store graph data across the machines in the cluster:
VertexTable (id, data): id is the vertex id and data is the vertex attribute
EdgeTable (pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, and data is the edge attribute
RoutingTable (id, pid): id is the vertex id and pid is the partition id
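A minimal plain-Scala sketch of these three tables, assuming simplified case classes that are not GraphX's actual internal types. It derives the routing table from the edge table and then uses it to list which vertex attributes each edge partition would need; as I understand it, shipping vertex data to the edge partitions that reference it is what the routing table is for. The partition ids (0 and 1) assigned to the collaborator edges are hypothetical.

```scala
// Simplified, illustrative versions of the three tables (not GraphX internals).
object StorageTables {
  final case class VertexRow(id: Long, data: String)
  final case class EdgeRow(pid: Int, src: Long, dst: Long, data: String)
  final case class RoutingRow(id: Long, pid: Int)

  // Every (vertex, edge partition) pair in which the vertex appears as src or dst.
  def routingTable(edges: Seq[EdgeRow]): Seq[RoutingRow] =
    edges.flatMap(e => Seq(RoutingRow(e.src, e.pid), RoutingRow(e.dst, e.pid))).distinct
}

import StorageTables._

val vertexTable = Seq(VertexRow(3L, "rxin"), VertexRow(7L, "jgonzal"),
                      VertexRow(5L, "franklin"), VertexRow(2L, "istoica"))
// The collaborator edges from earlier, spread over 2 hypothetical partitions.
val edgeTable = Seq(EdgeRow(0, 3L, 7L, "collab"), EdgeRow(0, 5L, 3L, "advisor"),
                    EdgeRow(1, 2L, 5L, "colleague"), EdgeRow(1, 5L, 7L, "pi"))
val routing = routingTable(edgeTable)

// Vertex attributes each edge partition needs, found via the routing table.
val shipped: Map[Int, Seq[String]] = routing.groupBy(_.pid).map { case (pid, rs) =>
  pid -> rs.flatMap(r => vertexTable.find(_.id == r.id).map(_.data)).sorted
}
shipped.toSeq.sortBy(_._1).foreach(println)
```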
Partition Strategy
A graph's distributed storage is set up with the partitionBy method, where the user specifies a partition strategy (PartitionStrategy) and the number of partitions. The partition strategy assigns each edge to an EdgePartition, while vertices are held in VertexPartitions. There are currently four strategies: EdgePartition1D, EdgePartition2D, RandomVertexCut, and CanonicalRandomVertexCut.
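EdgePartition1D
This strategy assigns edges to partitions using only the source vertex ID, so edges with the same source are colocated. To see it in action, create a random graph that has 12 vertices stored in 4 partitions (each partition is processed by a machine in the Spark cluster), using the EdgePartition1D partition strategy: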
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Random log-normal graph with 12 vertices; each vertex attribute is its id as a Double.
// No seed is given, so the edge count will likely differ from run to run.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 12)
    .mapVertices((id, _) => id.toDouble)
    .partitionBy(PartitionStrategy.EdgePartition1D, 4)
graph.edges.count
/*
res282: Long = 74
*/
// Each printed line is one partition, showing the Edge(src, dst, attr) records it holds
graph.edges
  .mapPartitions(rows => Iterator(rows.mkString))
  .collect
  .foreach(println)
/*
Edge(0,0,1)Edge(0,1,1)Edge(0,3,1)Edge(0,7,1)Edge(0,10,1)Edge(4,3,1)Edge(4,7,1)Edge(4,7,1)Edge(8,1,1)Edge(8,5,1)Edge(8,7,1)Edge(8,10,1)Edge(8,11,1)Edge(8,11,1)
Edge(1,1,1)Edge(1,1,1)Edge(1,3,1)Edge(1,5,1)Edge(1,7,1)Edge(1,10,1)Edge(5,0,1)Edge(5,1,1)Edge(5,3,1)Edge(5,4,1)Edge(5,5,1)Edge(5,5,1)Edge(9,2,1)Edge(9,3,1)Edge(9,3,1)Edge(9,3,1)Edge(9,8,1)Edge(9,9,1)Edge(9,10,1)Edge(9,11,1)
Edge(2,0,1)Edge(2,2,1)Edge(2,3,1)Edge(2,4,1)Edge(2,6,1)Edge(2,7,1)Edge(2,8,1)Edge(2,8,1)Edge(2,11,1)Edge(2,11,1)Edge(2,11,1)Edge(6,6,1)Edge(6,8,1)Edge(6,11,1)Edge(10,0,1)Edge(10,2,1)Edge(10,2,1)Edge(10,8,1)Edge(10,9,1)Edge(10,9,1)Edge(10,10,1)Edge(10,11,1)
Edge(3,2,1)Edge(3,2,1)Edge(3,3,1)Edge(3,6,1)Edge(3,7,1)Edge(3,9,1)Edge(3,9,1)Edge(3,11,1)Edge(7,0,1)Edge(7,0,1)Edge(7,5,1)Edge(7,7,1)Edge(11,0,1)Edge(11,0,1)Edge(11,7,1)Edge(11,7,1)Edge(11,7,1)Edge(11,8,1)
The edges are not evenly balanced across partitions: 14, 20, 22, and 18 edges respectively.
*/
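Why do the lines look like that? A plain-Scala sketch of the EdgePartition1D logic, written to mirror my reading of Spark's source (the mixingPrime constant is the same one that appears in the EdgePartition2D code below), shows that only the source id matters. Because 1125899906842597 leaves remainder 1 when divided by 4, with 4 partitions the result is simply src % 4: sources 0, 4, 8 go to the first partition, 1, 5, 9 to the second, and so on, which matches the listing above. The partition sizes therefore just follow how many out-edges those sources happen to have in this random graph.

```scala
// Spark-free sketch mirroring EdgePartition1D as I understand it.
object EdgePartition1DSketch {
  val mixingPrime: Long = 1125899906842597L

  // Only the source id is used; dst is ignored, so edges sharing a source share a partition.
  def getPartition(src: Long, dst: Long, numParts: Int): Int =
    (math.abs(src * mixingPrime) % numParts).toInt
}

// Map the source vertices 0..11 onto 4 partitions.
val bySource = (0L to 11L).groupBy(src => EdgePartition1DSketch.getPartition(src, 0L, 4))
bySource.toSeq.sortBy(_._1).foreach { case (pid, srcs) =>
  println(s"partition $pid: sources ${srcs.mkString(", ")}")
}
```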
EdgePartition2D
This cut method uses both the source vertex ID and the destination vertex ID. It applies a 2-dimensional partitioning to the sparse edge adjacency matrix, guaranteeing that each vertex is replicated to at most 2 * sqrt(numParts) partitions.
// Spark's implementation (defined inside object PartitionStrategy in GraphX)
import org.apache.spark.graphx.{PartitionID, PartitionStrategy, VertexId}

case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
      // Use old method for perfect squared to ensure we get same results
      val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
      val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
      (col * ceilSqrtNumParts + row) % numParts
    } else {
      // Otherwise use new method
      val cols = ceilSqrtNumParts
      val rows = (numParts + cols - 1) / cols
      val lastColRows = numParts - rows * (cols - 1)
      val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
      val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
      col * rows + row
    }
  }
}
Here numParts is the number of partitions. The implementation distinguishes two cases: the number of partitions either is or is not a perfect square. When the number of partitions is a perfect square, it uses the following method:
if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
  // Use old method for perfect squared to ensure we get same results
  // col comes from the mixed source id, row from the mixed destination id
  val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  (col * ceilSqrtNumParts + row) % numParts
}
When the number of partitions is not a perfect square:
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols          // ceil(numParts / cols)
val lastColRows = numParts - rows * (cols - 1)   // the last column may have fewer rows
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row
For illustration:
Suppose we partition a graph with 12 vertices (v0 to v11) over 9 machines, and picture its edges as the entries of a sparse 12 x 12 adjacency matrix.
The edge denoted by 'E' connects 'v11' with 'v1' and is assigned to processor (partition) P6. To get the processor number we divide the matrix into 'sqrt(numParts)' by 'sqrt(numParts)' blocks. Notice that edges adjacent to 'v11' can only be in the first column of blocks '(P0, P3, P6)' or the last row of blocks '(P6, P7, P8)'. As a consequence we can guarantee that 'v11' will need to be replicated to at most '2 * sqrt(numParts)' machines.
Notice that 'P0' has many edges and as a consequence this partitioning would lead to poor work balance. To improve balance we first multiply each vertex id by a large prime to shuffle the vertex locations.
When the number of partitions requested is not a perfect square we use a slightly different method where the last column can have a different number of rows than the others while still maintaining the same size per block.
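The 2 * sqrt(numParts) bound can be checked empirically. The sketch below is a Spark-free copy of the EdgePartition2D logic shown above (with plain Long/Int in place of VertexId/PartitionID); partitionsOf is a hypothetical helper that treats every ordered pair among vertices 0 to 11 as an edge and collects the partitions a vertex appears in. With 9 partitions every vertex lands in 5 partitions (3 as a source plus 3 as a destination, sharing one), within the bound of 6.

```scala
// Spark-free copy of the EdgePartition2D logic above, plus a replication check.
object EdgePartition2DSketch {
  val mixingPrime: Long = 1125899906842597L

  def getPartition(src: Long, dst: Long, numParts: Int): Int = {
    val ceilSqrtNumParts = math.ceil(math.sqrt(numParts.toDouble)).toInt
    if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
      // Perfect square: block column from src, block row from dst
      val col = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
      val row = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
      (col * ceilSqrtNumParts + row) % numParts
    } else {
      // Not a perfect square: the last column may have fewer rows
      val cols = ceilSqrtNumParts
      val rows = (numParts + cols - 1) / cols
      val lastColRows = numParts - rows * (cols - 1)
      val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
      val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
      col * rows + row
    }
  }

  // Hypothetical helper: treat (v, u) and (u, v) for every u in [0, n) as edges
  // and return the distinct partitions that vertex v ends up in.
  def partitionsOf(v: Long, n: Long, numParts: Int): Set[Int] =
    (0L until n).flatMap(u => Seq(getPartition(v, u, numParts), getPartition(u, v, numParts))).toSet
}

val numParts = 9
val worst = (0L until 12L).map(v => EdgePartition2DSketch.partitionsOf(v, 12L, numParts).size).max
println(s"max partitions per vertex: $worst (bound: ${2 * math.sqrt(numParts.toDouble).toInt})")
```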
Demonstration:
Create a random graph that has 12 vertices stored in 4 partitions (each partition is processed by a machine in the Spark cluster), using the EdgePartition2D partition strategy.
RandomVertexCut
This partitioning strategy is simple: it assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a random vertex cut that colocates all same-direction edges between two vertices.
Create a random graph that has 12 vertices stored in 4 partitions (each partition is processed by a machine in the Spark cluster), using the RandomVertexCut partition strategy.
CanonicalRandomVertexCut
This strategy assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.
Create a random graph that has 12 vertices stored in 4 partitions (each partition is processed by a machine in the Spark cluster), using the CanonicalRandomVertexCut partition strategy.
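The difference between the last two strategies comes down to whether the (src, dst) pair is put in a canonical order before hashing. Below is a plain-Scala sketch mirroring my understanding of Spark's implementations (hash the tuple, take the absolute value modulo numParts). The exact partition numbers depend on Scala's tuple hashCode, so only the symmetry property of the canonical version is worth relying on.

```scala
// Spark-free sketch of the two hash-based vertex-cut strategies.
object HashCuts {
  // RandomVertexCut: hash the pair as given, so (a, b) and (b, a) may land apart.
  def randomVertexCut(src: Long, dst: Long, numParts: Int): Int =
    math.abs((src, dst).hashCode()) % numParts

  // CanonicalRandomVertexCut: put the smaller id first, so both directions collide.
  def canonicalRandomVertexCut(src: Long, dst: Long, numParts: Int): Int =
    if (src < dst) math.abs((src, dst).hashCode()) % numParts
    else math.abs((dst, src).hashCode()) % numParts
}

for ((a, b) <- Seq((1L, 2L), (2L, 1L))) {
  println(s"($a, $b): random = ${HashCuts.randomVertexCut(a, b, 4)}, " +
    s"canonical = ${HashCuts.canonicalRandomVertexCut(a, b, 4)}")
}
```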