# Spark Graph Computing

Spark GraphX is a distributed graph processing framework built on the Spark platform. It provides a simple, easy-to-use, and rich interface for graph computation and graph mining, which makes distributed graph processing much more convenient.

Social networks contain many relationship chains between people or entities, most notably at the large, well-known social network companies. The network structure in such data calls for graph computation.

GraphX is a Spark API for graphs and graph-parallel computation. It extends the Spark RDD by introducing the Resilient Distributed Property Graph, a directed multigraph whose vertices and edges carry attributes.

To support graph computation, GraphX provides a basic set of operators and an optimized variant of the Pregel API. It also includes a growing collection of graph algorithms and graph builders that simplify graph analysis tasks.

The core abstraction of GraphX is the resilient distributed property graph: a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph can contain multiple parallel edges that share the same source and destination vertex. Support for parallel edges simplifies modeling scenarios in which the same pair of vertices has multiple relationships (such as co-worker and friend). Each vertex is keyed by a unique 64-bit identifier (VertexId). Similarly, each edge carries the identifiers of its source and destination vertices.
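As a rough illustration of this definition, the sketch below models a tiny directed multigraph with plain Scala collections rather than with GraphX itself. The names `MultigraphSketch` and `SimpleEdge` are hypothetical; only the ideas (64-bit vertex keys, parallel edges with different attributes) come from the text.

```scala
object MultigraphSketch {
  // Hypothetical stand-in for an edge: source id, destination id, attribute.
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

  // Vertices are keyed by a unique 64-bit (Long) identifier.
  val vertices: Map[Long, String] = Map(1L -> "alice", 2L -> "bob")

  // Two parallel edges share the same source and destination
  // but describe different relationships.
  val edges: Seq[SimpleEdge] = Seq(
    SimpleEdge(1L, 2L, "co-worker"),
    SimpleEdge(1L, 2L, "friend")
  )

  // All relationships recorded from src to dst.
  def relationships(src: Long, dst: Long): Seq[String] =
    edges.filter(e => e.srcId == src && e.dstId == dst).map(_.attr)

  def main(args: Array[String]): Unit =
    println(relationships(1L, 2L).mkString(", "))
}
```

Because the graph is directed, asking for the relationships from vertex 2 to vertex 1 returns nothing in this sketch.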

Data-parallel computation focuses on records, while graph-parallel computation focuses on vertices. Data-parallel computation processes independent records simultaneously, while graph-parallel computation achieves parallelism by partitioning (i.e., slicing) the graph data. More precisely, graph-parallel computation recursively defines transformation functions over the features of neighboring vertices and executes these functions concurrently.
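The contrast can be sketched in plain Scala, without Spark. This is only an illustration under simplifying assumptions: `ParallelismSketch`, `dataParallel`, and `neighborSum` are hypothetical names, and a single local step stands in for what a cluster would run in parallel.

```scala
object ParallelismSketch {
  // Data-parallel: each record is transformed independently of the others.
  def dataParallel(records: Seq[Int]): Seq[Int] = records.map(_ * 2)

  // Graph-parallel: each vertex is updated from the values of its in-neighbors.
  // Edges are (src, dst) pairs; the new value of a vertex is the sum of the
  // values of all vertices that point to it (0 if nothing points to it).
  def neighborSum(values: Map[Long, Int], edges: Seq[(Long, Long)]): Map[Long, Int] = {
    val incoming: Map[Long, Int] =
      edges.groupBy(_._2).map { case (dst, es) => dst -> es.map(e => values(e._1)).sum }
    values.keys.map(id => id -> incoming.getOrElse(id, 0)).toMap
  }
}
```

In the first function a record never looks at another record. In the second, the result for a vertex depends on its neighbors, which is why the way the graph is partitioned matters.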

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

Vertex: RDD[(VertexId, VD)]

Edge: RDD[Edge[ED]]

Triplet: RDD[EdgeTriplet[VD, ED]]

Graph: Graph[VD, ED]

The table view is exposed as VertexRDD and EdgeRDD.

The graph view is exposed as Graph.

In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:

import org.apache.spark.graphx.Graph

class VertexProperty()

case class UserProperty(val name: String) extends VertexProperty

case class ProductProperty(val name: String, val price: Double) extends VertexProperty

// The graph might then have the type:

var graph: Graph[VertexProperty, String] = null

Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.

Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:

import scala.reflect.ClassTag

import org.apache.spark._

import org.apache.spark.graphx.impl._

import org.apache.spark.rdd._

import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.{VertexRDD,EdgeRDD}

abstract class Graph[VD: ClassTag, ED: ClassTag] {

val vertices: VertexRDD[VD]

val edges: EdgeRDD[ED]

}

The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations.

Key abstractions of GraphX:

Vertex: RDD[(VertexId, VD)], optimized as VertexRDD[VD]

Edge: RDD[Edge[ED]], optimized as EdgeRDD[ED]

Triplet: RDD[EdgeTriplet[VD, ED]]

Graph: Graph[VD, ED]

The table view is exposed as VertexRDD and EdgeRDD.

The graph view is exposed as Graph.

Source code of VertexRDD and EdgeRDD:

import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext, SparkException}

import org.apache.spark.rdd.RDD

abstract class VertexRDD[VD]

(sc: SparkContext, deps: Seq[Dependency[_]])

extends RDD[(org.apache.spark.graphx.VertexId, VD)](sc, deps)

abstract class EdgeRDD[ED]

(sc: SparkContext, deps: Seq[Dependency[_]])

extends RDD[org.apache.spark.graphx.Edge[ED]](sc, deps)

Additionally:

Every operation on a Graph is ultimately carried out as RDD operations on its associated table view, so a graph computation is really a series of RDD transformations. A Graph therefore inherits the properties of an RDD: it is immutable, distributed, and fault-tolerant. Transforming one graph into another always creates a new graph, because a Graph, like an RDD, cannot be changed in place.

Example Property Graph

Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:

The resulting graph should have the following type signature:

Graph[(String, String), String]

VD = (String, String)

ED = String

Step 1: Import necessary packages

import org.apache.spark._

import org.apache.spark.graphx._

import org.apache.spark.rdd.RDD

// SparkSession is needed to build the session in Step 2

import org.apache.spark.sql.SparkSession

Step 2: Unless you are using spark-shell or a Jupyter notebook with the Spylon kernel, you will probably need to create the Spark session and context yourself.

val spark = SparkSession

.builder

.appName("graphx app")

.master("local[*]")

.config("spark.sql.warehouse.dir", "file:///tmp")

.getOrCreate()

import spark.implicits._

//and SparkContext sc

val sc = spark.sparkContext

There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:

// Assume the SparkContext has already been constructed

// Create an RDD for the vertices

val users: RDD[(VertexId, (String, String))] =

sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),

(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges

val relationships: RDD[Edge[String]] =

sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),

Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case a relationship refers to a missing user

val defaultUser = ("John Doe", "Missing")

// Build the initial Graph

val graph = Graph(users, relationships, defaultUser)

Step 3: Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.

We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.

// Count all users which are postdocs

val verticeCount=graph.vertices

.filter { case (id, (name, pos)) => pos == "postdoc" }

.count

println(verticeCount)

and

// Count all the edges where src > dst

val edgeCount=graph.edges

.filter(e => e.srcId > e.dstId).count

println(edgeCount)

Please note:

graph.vertices returns a VertexRDD[(String,String)]

graph.vertices

/*

res3: org.apache.spark.graphx.VertexRDD[(String, String)] = VertexRDDImpl[11] at RDD at VertexRDD.scala:57

*/

graph.edges returns an EdgeRDD[String]

graph.edges

/*

res4: org.apache.spark.graphx.EdgeRDD[String] = EdgeRDDImpl[13] at RDD at EdgeRDD.scala:41

*/

Step 4: In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:

-- do it by SQL on PostgreSQL

-- Since attr field of vertices in this example

-- is (String,String)

-- Define complex type graphx_vertice_type1

CREATE TYPE graphx_vertice_type1 AS (

name text,

title text

);

--create table vertices, notice column attr has

--type graphx_vertice_type1

--created earlier

CREATE TABLE public.vertices

(

id bigint,

attr graphx_vertice_type1

)

TABLESPACE spark_tbs;

-- show table vertices created

dv6=# \d public.vertices

Table "public.vertices"

Column | Type | Modifiers

--------+----------------------+-----------

id | bigint |

attr | graphx_vertice_type1 |

Tablespace: "spark_tbs"

-- Now populate vertices tables with vertex

-- information

dv6=# insert into vertices values

(3, ROW('rxin','student')),

(7, ROW('jgonzal', 'postdoc')),

(5, ROW('franklin','prof')),

(2, ROW('istoica','prof'));

-- list vertices table rows

dv6=# select * from vertices;

id | attr

----+-------------------

3 | (rxin,student)

7 | (jgonzal,postdoc)

5 | (franklin,prof)

2 | (istoica,prof)

(4 rows)

-- create edges table

CREATE TABLE public.edges

(

srcId bigint,

dstId bigint,

attr text

)

TABLESPACE spark_tbs;

-- insert edge information into table edges

insert into edges values

(3,7,'collab'),

(5,3,'advisor'),

(2,5,'colleague'),

(5,7,'pi');

-- show edges table rows

dv6=# select * from edges;

srcid | dstid | attr

-------+-------+-----------

3 | 7 | collab

5 | 3 | advisor

2 | 5 | colleague

5 | 7 | pi

(4 rows)

-- Now construct SQL query for triplet

SELECT src.id src_id, src.attr src_attr,

e.attr,dst.id dst_id , dst.attr dst_attr

FROM edges AS e LEFT JOIN vertices AS src

ON e.srcId = src.Id

LEFT JOIN vertices AS dst

ON e.dstId = dst.Id;

src_id | src_attr | attr | dst_id | dst_attr

--------+-----------------+-----------+--------+-------------------

5 | (franklin,prof) | advisor | 3 | (rxin,student)

2 | (istoica,prof) | colleague | 5 | (franklin,prof)

3 | (rxin,student) | collab | 7 | (jgonzal,postdoc)

5 | (franklin,prof) | pi | 7 | (jgonzal,postdoc)

(4 rows)

The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.

graph.triplets.map(triplet =>

triplet.srcAttr._1 +

" is the " + triplet.attr + " of " +

triplet.dstAttr._1).foreach(println)

/*

Output:

rxin is the collab of jgonzal

franklin is the advisor of rxin

istoica is the colleague of franklin

franklin is the pi of jgonzal

*/

The triplet view of a graph describes the relationships between vertices and edges.
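To make the join behind the triplet view concrete, here is a minimal sketch that imitates it with plain Scala collections, using the same users and relationships as the example graph. `TripletSketch`, `SimpleEdge`, and `SimpleTriplet` are hypothetical names chosen for this illustration; GraphX itself performs the join on distributed RDDs.

```scala
object TripletSketch {
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)
  final case class SimpleTriplet(
    srcAttr: (String, String), attr: String, dstAttr: (String, String))

  // Vertex table: id -> (name, position)
  val users: Map[Long, (String, String)] = Map(
    3L -> ("rxin", "student"), 7L -> ("jgonzal", "postdoc"),
    5L -> ("franklin", "prof"), 2L -> ("istoica", "prof"))

  // Edge table
  val relationships: Seq[SimpleEdge] = Seq(
    SimpleEdge(3L, 7L, "collab"), SimpleEdge(5L, 3L, "advisor"),
    SimpleEdge(2L, 5L, "colleague"), SimpleEdge(5L, 7L, "pi"))

  // Join each edge with the attributes of its two endpoints.
  val triplets: Seq[SimpleTriplet] =
    relationships.map(e => SimpleTriplet(users(e.srcId), e.attr, users(e.dstId)))

  // Render one sentence per triplet, as in the GraphX example.
  val sentences: Seq[String] =
    triplets.map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
}
```

Unlike the distributed version, this local sketch keeps the edges in their original order, so the first sentence is always the one for the edge from vertex 3 to vertex 7.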

*The triplet view is one of the most attractive features of Spark GraphX. It is available automatically whenever a graph is created from new vertices and/or new edges. Note the word "created" rather than "modified": vertices, edges, triplets, and even the Graph itself are immutable, so "modifying" any of them means creating new ones altogether.*

*Note: vertices and edges stored in database tables are mutable through insert, update, and delete. RDDs such as vertices and edges are immutable: "modifying" an RDD means creating a new RDD, and the original RDD is unchanged.*

The same sentences can be produced in SQL from the triplet query shown earlier:

with x as (SELECT src.id src_id, src.attr src_attr,

e.attr,dst.id dst_id , dst.attr dst_attr

FROM edges AS e LEFT JOIN vertices AS src

ON e.srcId = src.Id

LEFT JOIN vertices AS dst

ON e.dstId = dst.Id)

select (src_attr).name || ' is ' || attr || ' of ' || (dst_attr).name inference_from_triplet

from x;

inference_from_triplet

----------------------------------

franklin is advisor of rxin

istoica is colleague of franklin

rxin is collab of jgonzal

franklin is pi of jgonzal

(4 rows)

graph.triplets returns an RDD[EdgeTriplet[(String, String), String]]

graph.triplets

/*

Output:

res8: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[26] at mapPartitions at GraphImpl.scala:48

*/

If a graph is too large to be processed on a single computer, it must be cut into pieces, with each piece stored on a different machine.

For example, a social media company can have millions of users and hundreds of millions of "friend" relationships among them, which gives a graph with millions of vertices and hundreds of millions of edges. Such a graph has to be split into many partitions that run on many worker nodes (computers).

There are two ways to divide a graph into multiple parts.

Edge cut: each vertex is stored once, but some edges are cut and span two machines. The advantage is lower storage; the disadvantage is that an edge-based computation on an edge whose two vertices sit on different machines must transmit data across machines, which can cause heavy network traffic.

Vertex cut: each edge is stored only once and appears on exactly one machine. Vertices with many neighbors are replicated to multiple machines, which increases storage overhead and introduces data synchronization issues. The advantage is that network traffic can be significantly lower than with an edge cut.
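The two costs can be measured on a toy example. The following plain-Scala sketch is an illustration only, with hypothetical names (`CutSketch`, `cutEdges`, `replicas`): it counts how many edges cross machines under an edge cut, and how many copies of each vertex exist under a vertex cut.

```scala
object CutSketch {
  type Edge = (Long, Long)

  // Edge cut: vertices are assigned to machines. An edge is "cut" when its
  // endpoints live on different machines, so it needs network traffic.
  def cutEdges(edges: Seq[Edge], vertexToMachine: Map[Long, Int]): Int =
    edges.count { case (s, d) => vertexToMachine(s) != vertexToMachine(d) }

  // Vertex cut: edges are assigned to machines. A vertex is replicated on
  // every machine that holds at least one of its edges.
  def replicas(edges: Seq[Edge], edgeToMachine: Edge => Int): Map[Long, Int] =
    edges
      .flatMap { case e @ (s, d) => Seq(s -> edgeToMachine(e), d -> edgeToMachine(e)) }
      .groupBy(_._1)
      .map { case (v, ms) => v -> ms.map(_._2).distinct.size }
}
```

For a triangle on vertices 1, 2, 3 with vertex 3 placed alone on a second machine, two of the three edges are cut. If instead the edge (1, 2) goes to one machine and the other two edges to another, vertices 1 and 2 each need two replicas while vertex 3 needs one.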

Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, users can easily switch to 2D-partitioning or other heuristics included in GraphX.

GraphX uses three RDDs to store graph data across the machines of the cluster:

VertexTable (id, data): id is the vertex id and data is the vertex attribute

EdgeTable (pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, and data is the edge attribute

RoutingTable (id, pid): id is the vertex id and pid is the partition id
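How the routing table relates to the edge table can be sketched with plain Scala collections. This is a conceptual illustration and not GraphX's internal representation; `RoutingSketch` and `EdgeRow` are hypothetical names.

```scala
object RoutingSketch {
  // (pid, src, dst, data) rows, as in the EdgeTable described above.
  final case class EdgeRow(pid: Int, src: Long, dst: Long, data: String)

  // Derive (id, pid) routing entries: a vertex is routed to every edge
  // partition that references it as a source or as a destination.
  def routingTable(edgeTable: Seq[EdgeRow]): Set[(Long, Int)] =
    edgeTable.flatMap(e => Seq(e.src -> e.pid, e.dst -> e.pid)).toSet
}
```

With two edges stored in partitions 0 and 1 that both touch vertex 3, the routing table contains an entry for vertex 3 in each partition, which tells the system where that vertex's attribute has to be shipped.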

A graph's distributed storage is controlled through the partitionBy method, where the user specifies a partition strategy (PartitionStrategy) and the number of partitions. The partitioning strategy assigns edges to each EdgePartition and vertices to each VertexPartition. There are currently four strategies, shown below with their open-source implementations:

**EdgePartition1D**

Assigns edges to partitions using only the source vertex ID, colocating edges with the same source.

case object EdgePartition1D extends

PartitionStrategy {

override def getPartition(src: VertexId,

dst: VertexId, numParts: PartitionID):

PartitionID = {

val mixingPrime: VertexId = 1125899906842597L

(math.abs(src * mixingPrime) % numParts).toInt

}

}

Create a random graph with 12 vertices stored in 4 partitions, where each partition can run on a separate machine in the Spark cluster, using the EdgePartition1D partition strategy:

import org.apache.spark.graphx._

import org.apache.spark.graphx.impl._

import org.apache.spark.graphx.lib._

import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =

GraphGenerators.logNormalGraph(sc,

numVertices = 12)

.mapVertices( (id, _) => id.toDouble )

.partitionBy(PartitionStrategy

.EdgePartition1D,4)

graph.edges.count

/*

res282: Long = 74

*/

//each printed line is one partition; the Edge() entries show its contents

graph.edges.mapPartitions{case (rows) =>

{Iterator((rows.mkString))}}

.collect.foreach(println)

/*

Edge(0,0,1)Edge(0,1,1)Edge(0,3,1)Edge(0,7,1)Edge(0,10,1)Edge(4,3,1)Edge(4,7,1)Edge(4,7,1)Edge(8,1,1)Edge(8,5,1)Edge(8,7,1)Edge(8,10,1)Edge(8,11,1)Edge(8,11,1)

Edge(1,1,1)Edge(1,1,1)Edge(1,3,1)Edge(1,5,1)Edge(1,7,1)Edge(1,10,1)Edge(5,0,1)Edge(5,1,1)Edge(5,3,1)Edge(5,4,1)Edge(5,5,1)Edge(5,5,1)Edge(9,2,1)Edge(9,3,1)Edge(9,3,1)Edge(9,3,1)Edge(9,8,1)Edge(9,9,1)Edge(9,10,1)Edge(9,11,1)

Edge(2,0,1)Edge(2,2,1)Edge(2,3,1)Edge(2,4,1)Edge(2,6,1)Edge(2,7,1)Edge(2,8,1)Edge(2,8,1)Edge(2,11,1)Edge(2,11,1)Edge(2,11,1)Edge(6,6,1)Edge(6,8,1)Edge(6,11,1)Edge(10,0,1)Edge(10,2,1)Edge(10,2,1)Edge(10,8,1)Edge(10,9,1)Edge(10,9,1)Edge(10,10,1)Edge(10,11,1)

Edge(3,2,1)Edge(3,2,1)Edge(3,3,1)Edge(3,6,1)Edge(3,7,1)Edge(3,9,1)Edge(3,9,1)Edge(3,11,1)Edge(7,0,1)Edge(7,0,1)Edge(7,5,1)Edge(7,7,1)Edge(11,0,1)Edge(11,0,1)Edge(11,7,1)Edge(11,7,1)Edge(11,7,1)Edge(11,8,1)

The edges are not evenly balanced across the partitions.

*/
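The imbalance follows directly from the formula: only the source id decides the partition, so every edge of a high out-degree source lands in the same place. The sketch below reproduces the formula from the EdgePartition1D listing in plain Scala, without Spark; `Partition1DSketch` and `edgesPerPartition` are hypothetical helper names.

```scala
object Partition1DSketch {
  // Same constant as in the EdgePartition1D listing above.
  val mixingPrime: Long = 1125899906842597L

  // The partition is chosen from the source vertex id only.
  def partition1D(src: Long, numParts: Int): Int =
    (math.abs(src * mixingPrime) % numParts).toInt

  // Number of edges that land in each partition.
  def edgesPerPartition(edges: Seq[(Long, Long)], numParts: Int): Map[Int, Int] =
    edges
      .groupBy { case (src, _) => partition1D(src, numParts) }
      .map { case (pid, es) => pid -> es.size }
}
```

With 4 partitions, sources 0, 4, and 8 all map to partition 0 and sources 1, 5, and 9 to partition 1, which matches the grouping visible in the printed partitions.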

**EdgePartition2D**

This strategy uses both the source vertex id and the destination vertex id. It assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, which bounds the number of machines a vertex is replicated to at 2 * sqrt(numParts).

case object EdgePartition2D extends

PartitionStrategy {

override def getPartition(src: VertexId,

dst: VertexId, numParts: PartitionID):

PartitionID = {

val ceilSqrtNumParts: PartitionID =

math.ceil(math.sqrt(numParts)).toInt

val mixingPrime: VertexId = 1125899906842597L

if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {

// Use old method for perfect squared to ensure

// we get same results

val col: PartitionID =

(math.abs(src * mixingPrime) % ceilSqrtNumParts)

.toInt

val row: PartitionID =

(math.abs(dst * mixingPrime) % ceilSqrtNumParts)

.toInt

(col * ceilSqrtNumParts + row) % numParts

} else

{

// Otherwise use new method

val cols = ceilSqrtNumParts

val rows = (numParts + cols - 1) / cols

val lastColRows = numParts - rows * (cols - 1)

val col =

(math.abs(src * mixingPrime) % numParts / rows)

.toInt

val row =

(math.abs(dst * mixingPrime) %

(if (col < cols - 1) rows else lastColRows)).toInt

col * rows + row

}

}

}

Here numParts is the number of partitions. The implementation distinguishes two cases: the number of partitions is a perfect square, or it is not. When the number of partitions is a perfect square, the following method is used:

if (numParts == ceilSqrtNumParts * ceilSqrtNumParts)

{

// Use old method for perfect squared to ensure we get same results

val col: PartitionID =

(math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt

val row: PartitionID =

(math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt

(col * ceilSqrtNumParts + row) % numParts

}

When the number of partitions is not a perfect square:

val cols = ceilSqrtNumParts

val rows = (numParts + cols - 1) / cols

val lastColRows = numParts - rows * (cols - 1)

val col =

(math.abs(src * mixingPrime) % numParts / rows)

.toInt

val row =

(math.abs(dst * mixingPrime)

% (if (col < cols - 1) rows else lastColRows))

.toInt

col * rows + row

**For illustration:**

Suppose a graph with 12 vertices is to be partitioned over 9 machines. It can be represented by the following sparse matrix:

__________________________________

v0 | P0 * | P1 | P2 |

v1 | **** | * | |

v2 | ******* | * | **** |

v3 | ***** | * * | |

----------------------------------

v4 | P3 * | P4 *** | P5 ** * |

v5 | * * | * | |

v6 | | * | **** |

v7 | * * * | * * | |

----------------------------------

v8 | P6 | P7 | P8 * |

v9 | | * | |

v10 | | * | * * |

v11 | * <-E | *** | * |

----------------------------------

The edge denoted by 'E' connects 'v11' with 'v1' and is assigned to processor (partition) P6. To get the processor number we divide the matrix into 'sqrt(numParts)' by 'sqrt(numParts)' blocks. Notice that edges adjacent to 'v11' can only be in the first column of blocks '(P0, P3, P6)' or the last row of blocks '(P6, P7, P8)'. As a consequence we can guarantee that 'v11' will need to be replicated to at most '2 * sqrt(numParts)' machines.

Notice that 'P0' has many edges and as a consequence this partitioning would lead to poor work balance. To improve balance we first multiply each vertex id by a large prime to shuffle the vertex locations.

When the number of partitions requested is not a perfect square we use a slightly different method where the last column can have a different number of rows than the others while still maintaining the same size per block.
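The replication bound can be checked on a small case. The sketch below covers only the perfect-square branch of the EdgePartition2D listing and runs without Spark. `Partition2DSketch` and `partitionsTouching` are hypothetical names, and the helper assumes the worst case in which every possible edge among the vertices exists.

```scala
object Partition2DSketch {
  // Same constant as in the EdgePartition2D listing above.
  val mixingPrime: Long = 1125899906842597L

  // Perfect-square case only: numParts must equal side * side.
  def partition2D(src: Long, dst: Long, numParts: Int): Int = {
    val side = math.ceil(math.sqrt(numParts)).toInt
    require(side * side == numParts, "this sketch covers perfect squares only")
    val col = (math.abs(src * mixingPrime) % side).toInt
    val row = (math.abs(dst * mixingPrime) % side).toInt
    (col * side + row) % numParts
  }

  // Partitions that hold at least one edge adjacent to v, assuming every
  // possible edge among vertices 0 until n exists (a worst case).
  def partitionsTouching(v: Long, n: Int, numParts: Int): Set[Int] = {
    val ids = (0 until n).map(_.toLong)
    val outgoing = ids.map(dst => partition2D(v, dst, numParts))
    val incoming = ids.map(src => partition2D(src, v, numParts))
    (outgoing ++ incoming).toSet
  }
}
```

For 12 vertices and 9 partitions, the edges adjacent to v11 stay within one block column and one block row, so the set returned by `partitionsTouching(11L, 12, 9)` never exceeds 2 * sqrt(9) = 6 partitions.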

Create a random graph with 12 vertices stored in 4 partitions, where each partition can run on a separate machine in the Spark cluster, using the EdgePartition2D partition strategy:

import org.apache.spark.graphx._

import org.apache.spark.graphx.impl._

import org.apache.spark.graphx.lib._

import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =

GraphGenerators.logNormalGraph(sc, numVertices = 12)

.mapVertices( (id, _) => id.toDouble )

.partitionBy(PartitionStrategy.EdgePartition2D,4)

graph.edges.count

/*

res284: Long = 68

*/

graph.edges.mapPartitions{case (rows) =>

{Iterator((rows.mkString))}}.collect

.foreach(println)

/*

Edge(0,0,1)Edge(0,2,1)Edge(0,2,1)Edge(0,6,1)Edge(0,10,1)Edge(2,4,1)Edge(2,8,1)Edge(4,0,1)Edge(4,4,1)Edge(4,6,1)Edge(4,10,1)Edge(6,8,1)Edge(8,0,1)Edge(8,6,1)Edge(8,10,1)Edge(10,2,1)Edge(10,6,1)Edge(10,8,1)Edge(10,10,1)

Edge(0,1,1)Edge(0,1,1)Edge(0,3,1)Edge(0,5,1)Edge(0,7,1)Edge(0,9,1)Edge(2,1,1)Edge(2,3,1)Edge(2,9,1)Edge(4,1,1)Edge(4,3,1)Edge(4,5,1)Edge(4,5,1)Edge(4,5,1)Edge(4,9,1)Edge(4,11,1)Edge(6,11,1)Edge(8,1,1)Edge(8,7,1)Edge(10,1,1)Edge(10,3,1)Edge(10,5,1)Edge(10,7,1)Edge(10,9,1)Edge(10,11,1)

Edge(1,4,1)Edge(1,8,1)Edge(1,8,1)Edge(3,0,1)Edge(3,0,1)Edge(5,0,1)Edge(5,2,1)Edge(5,4,1)Edge(7,0,1)Edge(7,2,1)Edge(7,4,1)Edge(9,0,1)

Edge(1,5,1)Edge(3,9,1)Edge(5,3,1)Edge(5,7,1)Edge(5,11,1)Edge(5,11,1)Edge(7,3,1)Edge(9,5,1)Edge(9,9,1)Edge(9,11,1)Edge(11,7,1)Edge(11,9,1)

*/

**RandomVertexCut**

This strategy is simple: it assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a random vertex cut that colocates all same-direction edges between two vertices.

case object RandomVertexCut extends

PartitionStrategy {

override def getPartition(src: VertexId,

dst: VertexId,

numParts: PartitionID): PartitionID = {

math.abs((src, dst).hashCode()) % numParts

}

}

Create a random graph with 12 vertices stored in 4 partitions, where each partition can run on a separate machine in the Spark cluster, using the RandomVertexCut partition strategy:

import org.apache.spark.graphx._

import org.apache.spark.graphx.impl._

import org.apache.spark.graphx.lib._

import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =

GraphGenerators.logNormalGraph(sc, numVertices = 12)

.mapVertices( (id, _) => id.toDouble )

.partitionBy(PartitionStrategy.RandomVertexCut,4)

graph.edges.count

/*

res286: Long = 80

*/

graph.edges.mapPartitions{case (rows) => {Iterator((rows.mkString))}}.collect.foreach(println)

/*

Edge(0,10,1)Edge(1,10,1)Edge(1,10,1)Edge(2,1,1)Edge(2,6,1)Edge(4,10,1)Edge(4,10,1)Edge(5,7,1)Edge(6,3,1)Edge(6,6,1)Edge(6,6,1)Edge(6,6,1)Edge(6,10,1)Edge(6,11,1)Edge(7,7,1)Edge(9,9,1)Edge(10,2,1)Edge(10,10,1)Edge(10,10,1)Edge(10,10,1)Edge(11,2,1)Edge(11,2,1)Edge(11,7,1)Edge(11,11,1)

Edge(1,1,1)Edge(4,5,1)Edge(4,7,1)Edge(4,9,1)Edge(5,3,1)Edge(6,4,1)Edge(6,9,1)Edge(7,1,1)Edge(7,2,1)Edge(7,2,1)Edge(7,5,1)Edge(7,5,1)Edge(7,5,1)Edge(8,4,1)Edge(8,8,1)Edge(8,10,1)

Edge(1,4,1)Edge(2,7,1)Edge(3,0,1)Edge(4,11,1)Edge(5,1,1)Edge(6,1,1)Edge(6,8,1)Edge(6,8,1)Edge(7,4,1)Edge(7,4,1)Edge(7,4,1)Edge(8,1,1)Edge(8,2,1)Edge(9,7,1)Edge(9,10,1)Edge(10,9,1)Edge(10,9,1)Edge(11,1,1)

Edge(0,7,1)Edge(2,2,1)Edge(2,5,1)Edge(2,5,1)Edge(2,5,1)Edge(4,1,1)Edge(4,2,1)Edge(4,4,1)Edge(4,4,1)Edge(4,6,1)Edge(5,0,1)Edge(5,10,1)Edge(7,6,1)Edge(8,11,1)Edge(10,6,1)Edge(10,6,1)Edge(10,8,1)Edge(10,8,1)Edge(11,4,1)Edge(11,9,1)Edge(11,10,1)Edge(11,10,1)

*/

**CanonicalRandomVertexCut**

Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.

case object CanonicalRandomVertexCut

extends PartitionStrategy {

override def getPartition(src: VertexId,

dst: VertexId, numParts: PartitionID): PartitionID = {

if (src < dst) {

math.abs((src, dst).hashCode()) % numParts

} else {

math.abs((dst, src).hashCode()) % numParts

}

}

}
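The difference between the two hash-based strategies is easiest to see on a pair of opposite edges. The following plain-Scala sketch copies the two formulas from the listings above into standalone functions; the object name `VertexCutSketch` is hypothetical, and no Spark dependency is needed.

```scala
object VertexCutSketch {
  // Hash of the ordered pair (src, dst): direction matters.
  def randomVertexCut(src: Long, dst: Long, numParts: Int): Int =
    math.abs((src, dst).hashCode()) % numParts

  // Hash of the pair in canonical order (smaller id first):
  // both directions of an edge get the same partition.
  def canonicalRandomVertexCut(src: Long, dst: Long, numParts: Int): Int =
    if (src < dst) randomVertexCut(src, dst, numParts)
    else randomVertexCut(dst, src, numParts)
}
```

With the canonical variant, the edges 3 -> 7 and 7 -> 3 are guaranteed to share a partition. With the plain variant they may or may not, depending on the two hash values.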

Create a random graph with 12 vertices stored in 4 partitions, where each partition can run on a separate machine in the Spark cluster, using the CanonicalRandomVertexCut partition strategy:

import org.apache.spark.graphx._

import org.apache.spark.graphx.impl._

import org.apache.spark.graphx.lib._

import org.apache.spark.graphx.util._

val graph: Graph[Double, Int] =

GraphGenerators.logNormalGraph(sc, numVertices = 12)

.mapVertices( (id, _) => id.toDouble )

.partitionBy(PartitionStrategy.CanonicalRandomVertexCut,4)

graph.edges.count

/*

res289: Long = 89

*/

graph.edges.mapPartitions{case (rows) => {Iterator((rows.mkString))}}.collect.foreach(println)

/*

Edge(0,4,1)Edge(0,9,1)Edge(0,9,1)Edge(1,10,1)Edge(1,10,1)Edge(2,11,1)Edge(4,0,1)Edge(4,10,1)Edge(5,0,1)Edge(6,2,1)Edge(6,5,1)Edge(6,11,1)Edge(7,7,1)Edge(7,7,1)Edge(7,7,1)Edge(9,0,1)Edge(9,8,1)Edge(9,8,1)Edge(9,9,1)Edge(10,1,1)Edge(10,4,1)Edge(10,4,1)Edge(10,4,1)Edge(11,11,1)

Edge(0,2,1)Edge(0,8,1)Edge(0,8,1)Edge(1,6,1)Edge(1,6,1)Edge(3,1,1)Edge(3,1,1)Edge(3,1,1)Edge(3,5,1)Edge(3,5,1)Edge(4,2,1)Edge(4,5,1)Edge(4,7,1)Edge(4,8,1)Edge(5,3,1)Edge(6,1,1)Edge(6,9,1)Edge(8,0,1)Edge(8,0,1)Edge(9,4,1)Edge(9,6,1)Edge(9,7,1)Edge(10,8,1)Edge(11,9,1)Edge(11,9,1)

Edge(0,6,1)Edge(0,6,1)Edge(1,4,1)Edge(1,8,1)Edge(2,3,1)Edge(2,7,1)Edge(2,9,1)Edge(3,7,1)Edge(3,7,1)Edge(4,1,1)Edge(8,6,1)Edge(9,2,1)Edge(10,7,1)Edge(10,11,1)Edge(11,4,1)Edge(11,10,1)Edge(11,10,1)

Edge(0,11,1)Edge(1,9,1)Edge(1,11,1)Edge(1,11,1)Edge(2,2,1)Edge(2,5,1)Edge(3,8,1)Edge(3,9,1)Edge(3,10,1)Edge(5,2,1)Edge(5,9,1)Edge(5,11,1)Edge(6,4,1)Edge(6,4,1)Edge(6,7,1)Edge(7,1,1)Edge(7,1,1)Edge(8,3,1)Edge(8,5,1)Edge(9,3,1)Edge(11,0,1)Edge(11,1,1)Edge(11,5,1)

*/

fromEdges

Builds a graph from an RDD of edges; every vertex mentioned by an edge receives the default value.

def fromEdges[VD: ClassTag, ED: ClassTag](

edges: RDD[Edge[ED]],

defaultValue: VD,

edgeStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY,

vertexStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY):

Graph[VD, ED] = {

GraphImpl(edges, defaultValue,

edgeStorageLevel, vertexStorageLevel)

}

fromEdgeTuples

Builds a graph from an RDD of (source, destination) vertex id pairs; every edge receives the attribute 1.

def fromEdgeTuples[VD: ClassTag](

rawEdges: RDD[(VertexId, VertexId)],

defaultValue: VD,

uniqueEdges: Option[PartitionStrategy] = None,

edgeStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY,

vertexStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY): Graph[VD, Int] =

{

val edges = rawEdges.map(p => Edge(p._1, p._2, 1))

val graph = GraphImpl(edges, defaultValue,

edgeStorageLevel, vertexStorageLevel)

uniqueEdges match {

case Some(p) => graph.partitionBy(p)

.groupEdges((a, b) => a + b)

case None => graph

}

}

Note that both graph creation methods delegate to the apply method of the GraphImpl object.
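What fromEdgeTuples produces can be imitated with plain Scala collections. This is a sketch of the semantics only, under the assumption that a Boolean flag stands in for the `uniqueEdges` option; `FromEdgeTuplesSketch` and its members are hypothetical names, and the real method works on RDDs.

```scala
object FromEdgeTuplesSketch {
  // Every endpoint becomes a vertex carrying the same default attribute.
  def vertices[VD](rawEdges: Seq[(Long, Long)], defaultValue: VD): Map[Long, VD] =
    rawEdges.flatMap { case (s, d) => Seq(s, d) }.distinct.map(_ -> defaultValue).toMap

  // Each tuple becomes an edge with attribute 1. When uniqueEdges is true,
  // duplicate (src, dst) pairs are merged and their attributes are summed,
  // mirroring groupEdges((a, b) => a + b) in the listing above.
  def edges(rawEdges: Seq[(Long, Long)], uniqueEdges: Boolean): Seq[((Long, Long), Int)] = {
    val withOnes = rawEdges.map(e => e -> 1)
    if (uniqueEdges)
      withOnes.groupBy(_._1).map { case (e, es) => e -> es.map(_._2).sum }.toSeq.sortBy(_._1)
    else withOnes
  }
}
```

For the input (1, 2), (1, 2), (2, 3), merging yields a single edge 1 -> 2 with attribute 2, so the edge attribute ends up counting how many times the pair occurred.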

Assume the data file looks like this:

cat followers.txt

3 6

3 5

3 4

3 2

2 1

4 1

1 2

6 3

7 3

7 6

6 7

3 7

Build a graph using the edgeListFile method of the GraphLoader object:

import org.apache.spark.graphx.GraphLoader

val followerGraph = GraphLoader

.edgeListFile(sc, "followers.txt")

followerGraph.triplets.foreach(println)

/*

((3,1),(7,1),1)

((6,1),(3,1),1)

((6,1),(7,1),1)

((7,1),(3,1),1)

((7,1),(6,1),1)

((1,1),(2,1),1)

((2,1),(1,1),1)

((3,1),(2,1),1)

((3,1),(4,1),1)

((3,1),(5,1),1)

((3,1),(6,1),1)

((4,1),(1,1),1)

*/

followerGraph.vertices.foreach(println)

/*

(1,1)

(3,1)

(4,1)

(6,1)

(2,1)

(7,1)

(5,1)

*/

followerGraph.edges.foreach(println)

/*

Edge(1,2,1)

Edge(2,1,1)

Edge(3,2,1)

Edge(3,4,1)

Edge(3,5,1)

Edge(3,6,1)

Edge(4,1,1)

Edge(3,7,1)

Edge(6,3,1)

Edge(6,7,1)

Edge(7,3,1)

Edge(7,6,1)

*/

//Notice both vertex attribute and

//edge attribute are set to 1

//by edgeListFile method

To see why the vertex and edge attributes are both set to 1, here is the open-source implementation of the edgeListFile method:

def edgeListFile(

sc: SparkContext,

path: String,

canonicalOrientation: Boolean = false,

numEdgePartitions: Int = -1,

edgeStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY,

vertexStorageLevel: StorageLevel =

StorageLevel.MEMORY_ONLY)

: Graph[Int, Int] =

{

val startTimeNs = System.nanoTime()

// Parse the edge data table directly into

// edge partitions

val lines =

if (numEdgePartitions > 0) {

sc.textFile(path, numEdgePartitions)

.coalesce(numEdgePartitions)

} else {

sc.textFile(path)

}

val edges = lines.mapPartitionsWithIndex {

(pid, iter) =>

val builder = new EdgePartitionBuilder[Int, Int]

iter.foreach { line =>

if (!line.isEmpty && line(0) != '#') {

val lineArray = line.split("\\s+")

if (lineArray.length < 2) {

throw new IllegalArgumentException("Invalid line: " + line)

}

val srcId = lineArray(0).toLong

val dstId = lineArray(1).toLong

if (canonicalOrientation && srcId > dstId) {

builder.add(dstId, srcId, 1)

} else {

builder.add(srcId, dstId, 1)

}

}

}

Iterator((pid, builder.toEdgePartition))

}.persist(edgeStorageLevel)

.setName("GraphLoader.edgeListFile - edges (%s)".format(path))

edges.count()

logInfo(s"It took ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" +

" to load the edges")

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,

vertexStorageLevel = vertexStorageLevel)

} // end of edgeListFile
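The per-line parsing rules in this method can be isolated in a small standalone function. The sketch below mirrors the checks in the listing (skip empty lines and `#` comments, split on whitespace, optionally reorder the ids) but returns an `Option` instead of feeding an edge partition builder; `EdgeListParseSketch` and `parseLine` are hypothetical names.

```scala
object EdgeListParseSketch {
  // Returns None for empty lines and '#' comments, Some((src, dst)) otherwise.
  def parseLine(line: String, canonicalOrientation: Boolean = false): Option[(Long, Long)] =
    if (line.isEmpty || line(0) == '#') None
    else {
      val fields = line.split("\\s+")
      if (fields.length < 2) {
        throw new IllegalArgumentException("Invalid line: " + line)
      }
      val src = fields(0).toLong
      val dst = fields(1).toLong
      // With canonical orientation the smaller id always comes first.
      if (canonicalOrientation && src > dst) Some((dst, src)) else Some((src, dst))
    }
}
```

A line such as `7 3` is therefore read as the edge 7 -> 3 by default and as 3 -> 7 when canonical orientation is requested; in both cases the real method then stores it with the edge attribute 1.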

Graph computing is about transforming one graph into a new graph by manipulating the attributes of its vertices and edges.

Use the fromEdgeTuples method of the Graph object to create a new graph with a different vertex attribute, say 2 instead of 1 across the board:

val followerGraph2=Graph.fromEdgeTuples(followerGraph.triplets.map(x=>(x.srcId,x.dstId)),2)

followerGraph2.vertices.foreach(println)

/*

(1,2)

(3,2)

(7,2)

(5,2)

(4,2)

(6,2)

(2,2)

*/

We can also change the edge attribute across the board, for example from 1 to 3, using the fromEdges method of the EdgeRDD object:

// Map over the existing edges directly; no DataFrame round trip is needed

val edges = EdgeRDD.fromEdges[Int, Int](

followerGraph2.edges.map(e => Edge(e.srcId, e.dstId, 3)))

edges.foreach(println)

/*

Edge(1,2,3)

Edge(2,1,3)

Edge(3,2,3)

Edge(3,4,3)

Edge(3,5,3)

Edge(3,6,3)

Edge(4,1,3)

Edge(3,7,3)

Edge(6,3,3)

Edge(6,7,3)

Edge(7,3,3)

Edge(7,6,3)

*/
