Spark Graph Computing

Introduction:

Spark GraphX is a distributed graph processing framework. It is built on the Spark platform and provides a simple, easy-to-use, and rich interface for graph computing and graph mining, which greatly simplifies distributed graph processing.

It is well known that there are many relationship chains between people and entities in social networks, most notably at well-known social network companies that I will not name. The network-structured relationships in such data call for graph computation.

GraphX is a new Spark API for graph computation and distributed graph computation (graph-parallelism). GraphX extends the Spark RDD by introducing the Resilient Distributed Property Graph, a directed multigraph whose vertices and edges carry attributes.

To support graph computation, GraphX provides a basic set of functional operators and an optimized Pregel API. In addition, GraphX includes a fast-growing collection of graph algorithms and graph builders to simplify graph analysis tasks.

High-Level Abstraction Diagram of Apache Spark

The abstraction of GraphX is the resilient distributed property graph, a directed multigraph with user-defined objects attached to each vertex and edge. Multiple parallel edges in a directed multigraph share the same source and destination vertices. Supporting parallel edges simplifies modeling scenarios where the same pair of vertices can have multiple relationships (such as co-worker and friend). Each vertex is keyed by a unique 64-bit identifier (VertexId). Similarly, each edge carries the identifiers of its source and destination vertices.

Data-Parallel vs Graph-Parallel computing

Data-parallel computation focuses on records, while graph-parallel computation focuses on vertices. Data-parallel computation processes independent records simultaneously, while graph-parallel computation achieves parallelism by partitioning (i.e., slicing) the graph data. More precisely, graph-parallel computation recursively defines transformation functions over the features of each vertex's neighbors, then executes these transformation functions concurrently in parallel.

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

Abstraction of Graphx:

Vertex RDD[(VertexId, VD)]

Edge RDD[Edge[ED]]

Triplet RDD[EdgeTriplet[VD, ED]]

Graph Graph[VD, ED]

Table View is in the form of VertexRDD and EdgeRDD

Graph View is in the form of Graph

In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:
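A sketch of this approach, adapted from the GraphX programming guide; the property class names are illustrative:

```scala
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty

// Users and products can now coexist as vertices of one graph:
var graph: Graph[VertexProperty, String] = null
```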

Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.

Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:
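Schematically, as in the GraphX programming guide:

```scala
class Graph[VD, ED] {
  // The vertex collection: (VertexId, vertex attribute) pairs
  val vertices: VertexRDD[VD]
  // The edge collection: Edge objects carrying the edge attribute
  val edges: EdgeRDD[ED]
}
```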

The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations.

Key Abstraction of Graphx:


Source code of VertexRDD and EdgeRDD:
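Simplified declarations, paraphrased from the Spark source; see the linked repository for the full classes:

```scala
// VertexRDD is an RDD of (VertexId, VD) pairs with extra indexing
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[(VertexId, VD)](sc, deps)

// EdgeRDD is an RDD of Edge[ED] objects stored in columnar format
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[Edge[ED]](sc, deps)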

Additionally:

All operations on a Graph are carried out as RDD operations on its associated table view. You can say a graph computation is really a series of RDD transformations. Therefore, Graph has what comes with the RDD: it is immutable, distributed, and fault-tolerant. Converting one graph into a new graph really means creating a new graph, because, like the RDD, the Graph is immutable.

Graph Computing:

Example Property Graph

Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:

Resulting Graph should have following signature:

userGraph: Graph[(String, String), String]

VD = (String, String)

ED = String

Step 1: Import necessary packages
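A typical set of imports for the examples that follow:

```scala
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
```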

Step 2: Unless you use spark-shell or the Jupyter notebook Spylon kernel, you will likely need to create a Spark context yourself.
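One way to do this, assuming a local cluster (the application name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Build a local SparkSession and obtain its SparkContext
val spark = SparkSession.builder
  .appName("GraphXDemo")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext
```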

There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:
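The canonical construction, from the GraphX programming guide:

```scala
// Create an RDD for the vertices: (VertexId, (username, occupation))
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                     (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for the edges, annotated with a relationship string
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                     Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case an edge references a missing vertex
val defaultUser = ("John Doe", "Missing")

// Build the initial graph
val graph: Graph[(String, String), String] = Graph(users, relationships, defaultUser)
```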

Step 3: Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.

We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.
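For example, using the collaborator graph built above (adapted from the GraphX programming guide):

```scala
// Count all users who are postdocs, via the vertex view
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count

// Count all edges where the source id is greater than the destination id
graph.edges.filter(e => e.srcId > e.dstId).count
```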


Please note:

graph.vertices returns a VertexRDD[(String,String)]

graph.edges returns an EdgeRDD[String]

Step 4: In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:
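A hedged rendering of that join, with `vertices` and `edges` as hypothetical table names:

```sql
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e
LEFT JOIN vertices AS src ON e.srcId = src.id
LEFT JOIN vertices AS dst ON e.dstId = dst.id
```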

Graphically,

The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.
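For example, with the collaborator graph (adapted from the GraphX programming guide):

```scala
// Describe each relationship as a sentence using the triplet view
val facts: RDD[String] = graph.triplets.map(t =>
  t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1)
facts.collect.foreach(println)
```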

The triplet component of a Graph describes the relationships amongst vertices and edges.

The triplet is one of the most attractive features of Spark GraphX. It is automatically created for you whenever new vertices and/or new edges are created in the Graph. Notice I used the word create, not modify, because vertices, edges, triplets, and even the Graph itself are immutable. "Modifying" any one of them means creating new ones altogether.

Use SQL to infer from the triplet query shown earlier:
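A hedged sketch of such a query; the table and column names are hypothetical:

```sql
SELECT src.name, e.attr AS relationship, dst.name
FROM edges AS e
JOIN vertices AS src ON e.srcId = src.id
JOIN vertices AS dst ON e.dstId = dst.id
```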

Note: vertices and edges in database tables are mutable via insert, update, and delete. RDDs such as vertices and edges are immutable; "modifying" an RDD means creating a new RDD, and the original RDD is not changed.

graph.triplets returns an RDD[EdgeTriplet[(String, String), String]]

Graph distributions and storage

Graph storage in general

If a graph is too large to be computed on a single computer, it needs to be cut into pieces, with each piece stored on a different computer.

For example, a social media company can have millions of people and hundreds of millions of "friend" relationships among them, giving a graph with millions of vertices and hundreds of millions of edges. This means the graph has to be split into many partitions running on many worker nodes (computers).

Graph edge cut vs vertex cut

There are two ways to divide a graph into multiple portions.

As the name suggests, Edge Cut means cutting the graph along its edges.

Each vertex is stored once, but some edges will be cut and divided across two machines. The advantage is saving storage space; the disadvantage is that for edge-based computations on the graph, an edge whose two vertices sit on different machines requires data to be transmitted across machines, so heavy network traffic is to be expected.

Also as the name suggests, Vertex Cut means cutting the graph at its vertices.

Each edge is stored only once and appears on only one machine. Vertices with many neighbors are replicated to multiple machines, increasing storage overhead and causing data synchronization issues. The advantage is that it significantly reduces network traffic compared with Edge Cut.

Vertex Cut only approach

Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, users can easily switch to 2D-partitioning or other heuristics included in GraphX.

Graphx physical storage design

GraphX uses three RDDs to store graph data across machines over the network:

VertexTable (id, data): id is the vertex id and data is the vertex attribute

EdgeTable (pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, and data is the edge attribute

RoutingTable (id, pid): id is the vertex id and pid is the partition id

Partition Strategy

Graph's distributed storage uses the partitionBy method, with the user specifying a partition strategy (PartitionStrategy) and the number of partitions. The partition strategy assigns edges to EdgePartitions and vertices to VertexPartitions. There are currently four strategies.

Based upon open source code:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala

EdgePartition1D

Assigns edges to partitions using only the source vertex ID, colocating edges with the same source.

Demonstration:

Create a random graph with 12 vertices stored in 4 partitions, each partition to be run on a computer in the Spark cluster; the partition strategy is EdgePartition1D.
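A sketch of one way to do this, using GraphX's GraphGenerators utility to make the random graph:

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.graphx.util.GraphGenerators

// Generate a small random graph (12 vertices) and repartition its
// edges into 4 partitions with the EdgePartition1D strategy
val g = GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .partitionBy(PartitionStrategy.EdgePartition1D, numPartitions = 4)

// Show how many edges landed in each partition
g.edges.mapPartitionsWithIndex((pid, it) => Iterator((pid, it.size)))
  .collect.foreach { case (pid, n) => println(s"partition $pid: $n edges") }
```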

EdgePartition2D

This cut method uses both the source vertex id and the destination vertex id. It uses a 2-dimensional sparse edge matrix to assign edges to partitions, guaranteeing that each vertex is replicated to at most 2 * sqrt(numParts) partitions.

Here numParts represents the number of partitions. The implementation handles two cases: the number of partitions is a perfect square, or it is not. When the number of partitions is a perfect square, the following method is used:

When the number of partitions is not a perfect square:
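Both cases can be seen together in a close paraphrase of EdgePartition2D's getPartition from the linked source file; consult the repository for the authoritative version:

```scala
// Paraphrase of PartitionStrategy.EdgePartition2D.getPartition
def getPartition(src: VertexId, dst: VertexId, numParts: Int): Int = {
  val ceilSqrt = math.ceil(math.sqrt(numParts)).toInt
  val mixingPrime = 1125899906842597L // large prime to shuffle vertex locations
  if (numParts == ceilSqrt * ceilSqrt) {
    // Perfect square: pick the block by (column from src, row from dst)
    val col = (math.abs(src * mixingPrime) % ceilSqrt).toInt
    val row = (math.abs(dst * mixingPrime) % ceilSqrt).toInt
    (col * ceilSqrt + row) % numParts
  } else {
    // Not a perfect square: the last column may have a different number of rows
    val cols = ceilSqrt
    val rows = (numParts + cols - 1) / cols
    val lastColRows = numParts - rows * (cols - 1)
    val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
    val row = (math.abs(dst * mixingPrime) %
      (if (col < cols - 1) rows else lastColRows)).toInt
    col * rows + row
  }
}
```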

For illustration:

Suppose a graph with 12 vertices to partition over 9 machines. Use the following sparse matrix representation:

The edge denoted by 'E' connects 'v11' with 'v1' and is assigned to processor (partition) P6. To get the processor number we divide the matrix into 'sqrt(numParts)' by 'sqrt(numParts)' blocks. Notice that edges adjacent to 'v11' can only be in the first column of blocks '(P0, P3, P6)' or the last row of blocks '(P6, P7, P8)'. As a consequence we can guarantee that 'v11' will need to be replicated to at most '2 * sqrt(numParts)' machines.

Notice that 'P0' has many edges and as a consequence this partitioning would lead to poor work balance. To improve balance we first multiply each vertex id by a large prime to shuffle the vertex locations.

When the number of partitions requested is not a perfect square we use a slightly different method where the last column can have a different number of rows than the others while still maintaining the same size per block.

Demonstration:

Create a random graph with 12 vertices stored in 4 partitions, each partition to be run on a computer in the Spark cluster; the partition strategy is EdgePartition2D.
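A sketch, again using GraphGenerators for the random graph:

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.graphx.util.GraphGenerators

// 12-vertex random graph, 4 edge partitions, EdgePartition2D strategy
val g = GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)
```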

RandomVertexCut:

This partitioning strategy is simple: assign edges to partitions by hashing the source and destination vertex IDs together, resulting in a random vertex cut that colocates all same-direction edges between two vertices.

Demonstration:

Create a random graph with 12 vertices stored in 4 partitions, each partition to be run on a computer in the Spark cluster; the partition strategy is RandomVertexCut.
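The same sketch with the RandomVertexCut strategy:

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.graphx.util.GraphGenerators

// 12-vertex random graph, 4 edge partitions, RandomVertexCut strategy
val g = GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .partitionBy(PartitionStrategy.RandomVertexCut, numPartitions = 4)
```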

CanonicalRandomVertexCut:

Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.

Demonstration:

Create a random graph with 12 vertices stored in 4 partitions, each partition to be run on a computer in the Spark cluster; the partition strategy is CanonicalRandomVertexCut.
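And with the CanonicalRandomVertexCut strategy:

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.graphx.util.GraphGenerators

// 12-vertex random graph, 4 edge partitions, CanonicalRandomVertexCut strategy
val g = GraphGenerators.logNormalGraph(sc, numVertices = 12)
  .partitionBy(PartitionStrategy.CanonicalRandomVertexCut, numPartitions = 4)
```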

Graph Creation

Graph creation methods

fromEdges

Build a graph from an RDD of edges

fromEdgeTuples

Build a graph from tuples of connected vertex ids
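A brief sketch of both methods; the edge data and default attributes are illustrative:

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// fromEdges: supply Edge objects; every vertex gets the default attribute
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val g1 = Graph.fromEdges(edges, defaultValue = "user")

// fromEdgeTuples: supply only (srcId, dstId) pairs; every edge gets attribute 1
val tuples = sc.parallelize(Seq((1L, 2L), (2L, 3L)))
val g2 = Graph.fromEdgeTuples(tuples, defaultValue = "user")
```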

Notice that both Graph creation methods go through the apply method of class GraphImpl.

Example

Assume the data file looks like below:

Build a graph using edgeListFile method of GraphLoader Class
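For example, with a hypothetical path; edgeListFile expects one "srcId dstId" pair per line:

```scala
import org.apache.spark.graphx.GraphLoader

// Load an edge list file; every vertex and edge attribute is set to 1
val graph = GraphLoader.edgeListFile(sc, "data/edge_list.txt")
```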

If you want to know why the vertex and edge attributes are set to 1, here is the open source of the edgeListFile method:

Graph computing is about converting one graph into a new graph by manipulating the attributes of its vertices and edges.

Use the fromEdgeTuples method of the Graph object to create a new graph with different vertex attributes, say changing them from 1 to 2 across the board.
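A sketch, assuming `graph` is the graph loaded above:

```scala
// Rebuild the graph from its edge tuples, setting every vertex attribute to 2
val graph2 = Graph.fromEdgeTuples(
  graph.edges.map(e => (e.srcId, e.dstId)), defaultValue = 2)
```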

We can also change the edge attributes from 1 to 3 across the board, for example using the fromEdges method of the Graph object:
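Again assuming `graph` is the graph loaded above:

```scala
// Rebuild the graph with every edge attribute set to 3
val graph3 = Graph.fromEdges(
  graph.edges.map(e => Edge(e.srcId, e.dstId, 3)), defaultValue = 1)
```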
