Bulk Synchronous Parallel with Google Pregel: GraphX Implementation and Use Cases

What is Bulk Synchronous Parallel?

The bulk synchronous parallel (BSP) abstract computer is a bridging model for designing parallel algorithms.

BSP computer

A BSP computer consists of:
components capable of processing and/or local memory transactions (i.e., processors)
a network that routes messages between pairs of such components
a hardware facility that allows for the synchronization of all or a subset of components.

Pregel from Google

There are many graph-parallel computing frameworks. One of them is Google's Pregel, which is based on the BSP (bulk synchronous parallel) model.
In BSP, a computation consists of a series of global supersteps, and each superstep has three phases:
Concurrent computation
Each vertex runs the user-defined computation concurrently, up to the number of CPU cores available on each machine.
Communication
Each vertex receives the messages sent to it in the previous superstep and sends messages that are delivered in the next superstep.
Barrier synchronization
No vertex starts the next superstep until every vertex has finished the current one. A vertex can decide to stop sending messages, and the iteration stops when no messages remain to be sent.
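The superstep cycle above can be made concrete with a toy, single-process simulation in plain Scala. No Spark is involved. `Proc`, `Compute` and `superstep` are hypothetical names made up for this sketch. In each superstep every processor computes on its local value and the messages it received, then emits new messages. A simulated barrier then groups all messages by target, and only after that can the next superstep read them. The example sums four local values in two supersteps.

```scala
// Toy single-process BSP simulation (hypothetical names; not a Spark or Pregel API).
final case class Proc(id: Int, local: Int)

// (processor, messages received last superstep) => (new state, outgoing (targetId, value) messages)
type Compute = (Proc, Seq[Int]) => (Proc, Seq[(Int, Int)])

// One superstep: every processor computes (conceptually in parallel), then the barrier
// delivers all outgoing messages at once, grouped by target processor id.
def superstep(procs: Vector[Proc], inbox: Map[Int, Seq[Int]], compute: Compute): (Vector[Proc], Map[Int, Seq[Int]]) = {
  val results = procs.map(p => compute(p, inbox.getOrElse(p.id, Seq.empty)))
  val outbox = results.flatMap(_._2).groupBy(_._1).map { case (to, msgs) => to -> msgs.map(_._2) }
  (results.map(_._1), outbox)
}

val procs = Vector(Proc(0, 3), Proc(1, 6), Proc(2, 2), Proc(3, 1))

// Superstep 1: each processor sends its local value to processor 0.
val (afterStep1, inbox1) = superstep(procs, Map.empty, (p, _) => (p, Seq(0 -> p.local)))

// Superstep 2: processor 0 sums what it received; nobody sends, so the computation halts.
val (afterStep2, inbox2) = superstep(afterStep1, inbox1, (p, msgs) =>
  (if (p.id == 0) p.copy(local = msgs.sum) else p, Seq.empty[(Int, Int)]))

println(afterStep2.head.local) // 12
println(inbox2.isEmpty)        // true
```

Processor 0 cannot see any message until the barrier at the end of superstep 1 has run. That delay is the defining property of BSP.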

Google's Pregel implementation in Apache Spark GraphX

Unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices. Message construction is done in parallel using a user-defined messaging function. These constraints allow additional optimization within GraphX.
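The constraint that messages travel only along edges can be shown with a plain-Scala sketch of a single message phase. No Spark is involved. `VertexId` and `Triplet` are stand-ins for GraphX's `VertexId` and `EdgeTriplet`, and `messagePhase` is a hypothetical helper. `sendMsg` runs once per edge and sees both endpoint attributes. It may only address the edge's two endpoints. `mergeMsg` then folds all messages bound for the same vertex into one. The edges are made up, and `sendMsg` reuses the max-propagation rule from the example later in this article.

```scala
// Stand-ins for GraphX types (hypothetical; no Spark dependency).
type VertexId = Long
final case class Triplet[VD, ED](srcId: VertexId, dstId: VertexId, srcAttr: VD, dstAttr: VD, attr: ED)

// One message phase: sendMsg per edge triplet, then mergeMsg per destination vertex.
def messagePhase[VD, ED, A](
    attrs: Map[VertexId, VD],
    edges: Seq[(VertexId, VertexId, ED)],
    sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A): Map[VertexId, A] = {
  val sent = edges.flatMap { case (src, dst, e) =>
    val msgs = sendMsg(Triplet(src, dst, attrs(src), attrs(dst), e)).toSeq
    // Enforce the GraphX-style constraint: a message may only target the edge's endpoints
    require(msgs.forall { case (to, _) => to == src || to == dst }, "message sent off the edge")
    msgs
  }
  sent.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).reduce(mergeMsg) }
}

val attrs = Map(1L -> 3.0, 2L -> 6.0, 3L -> 2.0, 4L -> 1.0)
val edges = Seq((1L, 3L, 1), (2L, 3L, 1), (2L, 4L, 1), (3L, 4L, 1))

// Send the source attribute to the destination only if it is larger; merge with max.
val inbox = messagePhase[Double, Int, Double](attrs, edges,
  t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty,
  (a, b) => math.max(a, b))
println(inbox)
```

Vertex 3 receives 3.0 and 6.0, and vertex 4 receives 6.0 and 2.0. After merging, each holds a single message of 6.0.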

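The pregel method appears in the summary of the abstract class Graph below. In the Spark source it is implemented in class GraphOps, which every Graph can use through an implicit conversion.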

```scala
abstract class Graph[VD: ClassTag, ED: ClassTag] {

  def pregel[A: ClassTag]
      (initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)
      (
        vprog: (VertexId, VD, A) => VD,
        sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
        mergeMsg: (A, A) => A
      )
    : Graph[VD, ED]
}
```

The pregel API is implemented in class GraphOps:

```scala
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction. The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex. The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam A the Pregel message type
   *
   * @param initialMsg the message each vertex will receive at the on
   * the first iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value. On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message. On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A. ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }
}
```
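The activeDirection parameter decides on which edges sendMsg runs in the next round. The plain-Scala sketch below filters a list of directed edges the way the Pregel scaladoc describes for Out, Either and Both. The In case follows GraphX's EdgeDirection.In, "edges arriving at a vertex". The `Dir` type and `activeEdges` function are hypothetical, not Spark code. `active` stands for the vertices that received a message in the previous round.

```scala
// Hypothetical stand-in for GraphX's EdgeDirection (no Spark dependency).
sealed trait Dir
object Dir {
  case object Out extends Dir
  case object In extends Dir
  case object Either extends Dir
  case object Both extends Dir
}

// Keep only the edges on which sendMsg would run this round.
def activeEdges(edges: Seq[(Long, Long)], active: Set[Long], dir: Dir): Seq[(Long, Long)] =
  edges.filter { case (src, dst) =>
    dir match {
      case Dir.Out    => active(src)                 // out-edges of active vertices
      case Dir.In     => active(dst)                 // in-edges of active vertices
      case Dir.Either => active(src) || active(dst)  // either endpoint active (the default)
      case Dir.Both   => active(src) && active(dst)  // both endpoints active
    }
  }

val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 3L))
val active = Set(1L, 2L)
for (d <- Seq(Dir.Out, Dir.In, Dir.Either, Dir.Both))
  println(s"$d: ${activeEdges(edges, active, d)}")
```

Edge (4, 3) never runs here because neither endpoint received a message. This is how GraphX skips most of the graph once only a few vertices are still changing.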

The detailed implementation is in the Pregel object:

```scala
/**
 * Implements a Pregel-like bulk-synchronous message-passing API.
 *
 * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
 * edges, enables the message sending computation to read both vertex attributes, and constrains
 * messages to the graph structure. These changes allow for substantially more efficient
 * distributed execution while also exposing greater flexibility for graph-based computation.
 *
 * @example We can use the Pregel abstraction to implement PageRank:
 * {{{
 * val pagerankGraph: Graph[Double, Double] = graph
 *   // Associate the degree with each vertex
 *   .outerJoinVertices(graph.outDegrees) {
 *     (vid, vdata, deg) => deg.getOrElse(0)
 *   }
 *   // Set the weight on the edges based on the degree
 *   .mapTriplets(e => 1.0 / e.srcAttr)
 *   // Set the vertex attributes to the initial pagerank values
 *   .mapVertices((id, attr) => 1.0)
 *
 * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
 *   resetProb + (1.0 - resetProb) * msgSum
 * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
 *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
 * def messageCombiner(a: Double, b: Double): Double = a + b
 * val initialMessage = 0.0
 * // Execute Pregel for a fixed number of iterations.
 * Pregel(pagerankGraph, initialMessage, numIter)(
 *   vertexProgram, sendMessage, messageCombiner)
 * }}}
 *
 */
object Pregel extends Logging {

  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction. The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex. The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam VD the vertex data type
   * @tparam ED the edge data type
   * @tparam A the Pregel message type
   *
   * @param graph the input graph.
   *
   * @param initialMsg the message each vertex will receive at the first
   * iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run. The default is
   * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
   * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
   * *both* vertices received a message.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value. On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message. On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A. ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
    require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    val checkpointInterval = graph.vertices.sparkContext.getConf
      .getInt("spark.graphx.pregel.checkpointInterval", -1)
    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
    val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
      checkpointInterval, graph.vertices.sparkContext)
    graphCheckpointer.update(g)

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
      checkpointInterval, graph.vertices.sparkContext)
    messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
    var activeMessages = messages.count()

    // Loop
    var prevG: Graph[VD, ED] = null
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog)
      graphCheckpointer.update(g)

      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
      // and the vertices of g).
      messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
      activeMessages = messages.count()

      logInfo("Pregel finished iteration " + i)

      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist()
      prevG.unpersistVertices()
      prevG.edges.unpersist()
      // count the iteration
      i += 1
    }
    messageCheckpointer.unpersistDataSet()
    graphCheckpointer.deleteAllCheckpoints()
    messageCheckpointer.deleteAllCheckpoints()
    g
  } // end of apply

} // end of class Pregel
```
The superstep implementation is this while loop. The iteration stops either when there are no more messages to send or when the iteration counter reaches maxIterations:
```scala
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages and update the vertices.
  prevG = g
  g = g.joinVertices(messages)(vprog)
  graphCheckpointer.update(g)

  val oldMessages = messages
  // Send new messages, skipping edges where neither side received a message. We must cache
  // messages so it can be materialized on the next line, allowing us to uncache the previous
  // iteration.
  messages = GraphXUtils.mapReduceTriplets(
    g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
  // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
  // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
  // and the vertices of g).
  messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
  activeMessages = messages.count()

  logInfo("Pregel finished iteration " + i)

  // Unpersist the RDDs hidden by newly-materialized RDDs
  oldMessages.unpersist()
  prevG.unpersistVertices()
  prevG.edges.unpersist()
  // count the iteration
  i += 1
}
```
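To see the control flow of this loop without Spark's caching and checkpointing, here is a single-machine sketch in plain Scala. `miniPregel` and `Edge` are hypothetical names. The sketch copies only the loop's structure:

- superstep 0 runs vprog with initialMsg on every vertex;
- afterwards only vertices that received a message run vprog;
- sendMsg runs only on edges that touch such a vertex, as with EdgeDirection.Either;
- the loop stops when no messages remain or maxIterations is reached.

It has none of the distribution, partitioning or persistence logic. The example computes single-source shortest paths from vertex 1 on a made-up graph.

```scala
// Hypothetical edge type for this sketch (no Spark dependency).
final case class Edge[ED](src: Long, dst: Long, attr: ED)

def miniPregel[VD, ED, A](
    attrs0: Map[Long, VD], edges: Seq[Edge[ED]], initialMsg: A, maxIterations: Int = Int.MaxValue)(
    vprog: (Long, VD, A) => VD,
    sendMsg: (Edge[ED], VD, VD) => Iterator[(Long, A)],   // (edge, srcAttr, dstAttr)
    mergeMsg: (A, A) => A): (Map[Long, VD], Int) = {
  require(maxIterations > 0)
  // Superstep 0: every vertex runs vprog on initialMsg
  var g = attrs0.map { case (id, a) => id -> vprog(id, a, initialMsg) }
  // Run sendMsg on edges with an active endpoint, then merge messages per target vertex
  def computeMessages(active: Long => Boolean): Map[Long, A] =
    edges.filter(e => active(e.src) || active(e.dst))
      .flatMap(e => sendMsg(e, g(e.src), g(e.dst)))
      .groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).reduce(mergeMsg) }
  var messages = computeMessages(_ => true)   // first round: all edges
  var i = 0
  while (messages.nonEmpty && i < maxIterations) {
    // Receive: only vertices with a message run vprog (like joinVertices)
    g = g.map { case (id, a) => id -> messages.get(id).fold(a)(m => vprog(id, a, m)) }
    messages = computeMessages(messages.keySet)   // vertices that just received messages
    i += 1
  }
  (g, i)
}

val edges = Seq(Edge(1L, 2L, 4.0), Edge(1L, 3L, 1.0), Edge(3L, 2L, 2.0), Edge(2L, 4L, 5.0))
val init = Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity, 4L -> Double.PositiveInfinity)

def sssp(maxIter: Int) = miniPregel(init, edges, Double.PositiveInfinity, maxIter)(
  (_, d, msg) => math.min(d, msg),                               // vprog: keep the shorter distance
  (e, srcD, dstD) =>                                              // sendMsg: relax the edge
    if (srcD + e.attr < dstD) Iterator((e.dst, srcD + e.attr)) else Iterator.empty,
  (a, b) => math.min(a, b))                                      // mergeMsg

val (dist, supersteps) = sssp(Int.MaxValue)
println(dist.toSeq.sortBy(_._1))
println(supersteps)
```

With maxIterations = 1 the loop stops early, and vertex 2 keeps the longer direct distance 4.0 instead of 3.0. That is why maxIterations caps the work but can also cut the answer short.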
GraphX implements Google's Pregel model in the package org.apache.spark.graphx.
The underlying logic is complicated. Invoking the GraphX Pregel API is not too difficult, but it is not trivial either.
Everything starts from the pregel definition in class GraphOps:
```scala
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {
  Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
```

Arguments needed to invoke the GraphX pregel API

Invoking pregel comes down to supplying its arguments correctly. What are they?
First, you provide the value arguments:
initialMsg: A, the message every vertex receives in superstep 0. A is a type parameter, so the message can be of any type. The ClassTag context bound keeps A's runtime type information available to Spark.
maxIterations: Int, defaults to Int.MaxValue
activeDirection: EdgeDirection, defaults to EdgeDirection.Either
Only initialMsg is mandatory, because the other two have default values.
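Second, you define and provide 3 function arguments:
vprog: the vertex program. This user-defined function takes (vertex id, current attribute, merged message) and returns the new attribute. It runs on every vertex in superstep 0 (with initialMsg) and afterwards only on vertices that received messages.
sendMsg: a function applied to each active edge triplet. It returns an Iterator of (VertexId, message) pairs, so it can send zero or more messages, but only to the source or destination vertex of that edge.
mergeMsg: a function that combines two messages addressed to the same vertex into one. It must be commutative and associative. The combined message is then passed to vprog, which updates the vertex attribute.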
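Why must mergeMsg be commutative and associative? GraphX may combine the messages for a vertex in any order and grouping, for example partly within each partition and then across partitions. The plain-Scala check below merges the same messages in every order, with both left and right grouping. `orderIndependent` is a hypothetical helper, not part of GraphX. math.max always gives the same answer, and subtraction does not.

```scala
// True if merging msgs gives one result for every ordering and for both groupings.
def orderIndependent[A](msgs: Seq[A], merge: (A, A) => A): Boolean = {
  val results = msgs.permutations.flatMap { p =>
    Seq(p.reduceLeft(merge), p.reduceRight(merge)) // ((a op b) op c) vs (a op (b op c))
  }.toSet
  results.size == 1
}

val msgs = Seq(3.0, 6.0, 2.0)
println(orderIndependent(msgs, (a: Double, b: Double) => math.max(a, b))) // true
println(orderIndependent(msgs, (a: Double, b: Double) => a - b))          // false
```

Trying every permutation only works for a handful of messages. Treat it as a quick test of a candidate mergeMsg, not a proof.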
Once these arguments are ready, you can invoke the pregel method. It returns a new Graph whose vertex attributes are the values computed by the vertex program (vprog).

Example: Populate every vertex with the maximum vertex attribute in the graph

Assume a graph with 4 vertices whose attributes, marked inside the circles in the figure, are 3, 6, 2 and 1.
The goal is for every vertex to end up with the maximum vertex attribute in the graph as its own attribute.

How it works:

In superstep 0, each vertex sends its attribute as a message to its adjacent vertices (those directly linked to it by an edge).
In each subsequent superstep:
Each vertex receives one or more messages containing attribute values. It ignores values that are not greater than its own attribute. Otherwise, it replaces its attribute with the largest received value.
Each vertex checks its adjacent vertices' attributes. If they are all >= its own attribute, it stops sending messages.
The iteration exits once all vertices have stopped sending messages.
A grey vertex in the figure has stopped sending messages. It can still receive messages: mergeMsg combines them, and vprog applies the result to its attribute.
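The figure's edges cannot be recovered here, so the sketch below assumes a hypothetical 4-vertex graph. It has links 1-2, 2-3, 3-4 and 1-3, each stored in both directions, and the attributes 3, 6, 2 and 1. It is plain Scala, not GraphX. It applies the rules above one superstep at a time:

- send a value only to smaller neighbors;
- merge received values with max;
- update the vertex with max.

The sketch records the attributes after each superstep until no vertex sends a message.

```scala
// Hypothetical links (the figure's exact edges are unknown); store both directions.
val links = Seq(1 -> 2, 2 -> 3, 3 -> 4, 1 -> 3)
val edges = links ++ links.map(_.swap)

def trace(start: Map[Int, Int]): List[Map[Int, Int]] = {
  // sendMsg: a vertex sends its value along an edge only if it is larger than the neighbor's.
  def messages(attrs: Map[Int, Int]): Map[Int, Int] =
    edges.collect { case (s, d) if attrs(s) > attrs(d) => d -> attrs(s) }
      .groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).max } // mergeMsg = max
  @annotation.tailrec
  def loop(attrs: Map[Int, Int], acc: List[Map[Int, Int]]): List[Map[Int, Int]] = {
    val msgs = messages(attrs)
    if (msgs.isEmpty) acc.reverse // nobody sends: every vertex has stopped, so iteration exits
    else {
      val next = attrs.map { case (v, a) => v -> math.max(a, msgs.getOrElse(v, a)) } // vprog = max
      loop(next, next :: acc)
    }
  }
  loop(start, List(start))
}

trace(Map(1 -> 3, 2 -> 6, 3 -> 2, 4 -> 1)).zipWithIndex.foreach { case (s, i) =>
  println(s"state $i: " + s.toSeq.sorted.map(_._2).mkString(", "))
}
// state 0: 3, 6, 2, 1
// state 1: 6, 6, 6, 2
// state 2: 6, 6, 6, 6
```

Vertex 4 is two hops from vertex 2, which holds the maximum, so it needs a second superstep. In general, the number of supersteps grows with the distance the maximum has to travel.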
Now let's write the Scala code that invokes the Spark GraphX pregel API to implement this specification.
Create a random graph with 10 vertices:
```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// sc is the SparkContext provided by spark-shell.
// Each vertex attribute is its vertex id as a Double; edges are split into 4 partitions (2D partitioning).
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 10)
    .mapVertices((id, _) => id.toDouble)
    .partitionBy(PartitionStrategy.EdgePartition2D, 4)
graph.vertices.collect.foreach(println)
/*
Note that the largest vertex attribute is 9.0
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)
*/
```
Prepare the arguments for the GraphX pregel API:
```scala
// Define the 3 functions passed as arguments to method pregel

// vprog: keep the larger of the current attribute and the merged incoming message
val vProg = { (id: VertexId, attr: Double, msg: Double) => math.max(attr, msg) }

// sendMsg: push the source attribute to the destination only if it is larger
val sendMsg = { (triplet: EdgeTriplet[Double, Int]) =>
  if (triplet.srcAttr > triplet.dstAttr) {
    Iterator((triplet.dstId, triplet.srcAttr))
  } else {
    Iterator.empty
  }
}

// mergeMsg: combine two messages for the same vertex by keeping the larger one
val reduceMsg = { (a: Double, b: Double) => math.max(a, b) }
```
In this example, I set initialMsg to Double.NegativeInfinity, the smallest possible Double value. In superstep 0 every vertex runs vProg with initialMsg, and math.max(attr, Double.NegativeInfinity) returns attr unchanged. The initial message therefore does not disturb the search for the maximum attribute, which is then propagated to all other vertices in the graph.
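Here is a quick check of why the choice of initialMsg matters, in plain Scala with made-up attribute values. Superstep 0 evaluates math.max(attr, initialMsg) for every vertex. Double.NegativeInfinity leaves every attribute untouched, while an innocent-looking 0.0 would overwrite any negative attribute.

```scala
val attrs = Seq(-5.0, 0.0, 3.5)
// What superstep 0 computes for each vertex with the two candidate initial messages
val withNegInf = attrs.map(a => math.max(a, Double.NegativeInfinity))
val withZero = attrs.map(a => math.max(a, 0.0))
println(withNegInf) // List(-5.0, 0.0, 3.5)
println(withZero)   // List(0.0, 0.0, 3.5)
```

In general, initialMsg should be the identity element of the vertex program: a value that leaves any attribute unchanged.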
Invoke pregel API
```scala
graph.pregel(Double.NegativeInfinity)(
  vProg,
  sendMsg,
  reduceMsg
).vertices.collect.foreach(println)

/*
Output: every vertex attribute is populated with the largest vertex attribute, 9.0 in this example

(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)
*/
```
The following code does the same with anonymous functions:
```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// sc is the SparkContext provided by spark-shell
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 10)
    .mapVertices((id, _) => id.toDouble)
    .partitionBy(PartitionStrategy.EdgePartition2D, 4)
graph.vertices.collect.foreach(println)
/*
(4,4.0)
(0,0.0)
(6,6.0)
(8,8.0)
(2,2.0)
(1,1.0)
(3,3.0)
(7,7.0)
(9,9.0)
(5,5.0)
*/

val maxGraph = graph.pregel(Double.NegativeInfinity)(
  (id, dist, newDist) => math.max(dist, newDist), // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr > triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.max(a, b) // Merge Message
)
maxGraph.vertices.collect.foreach(println)
/*
(4,9.0)
(0,9.0)
(6,9.0)
(8,9.0)
(2,9.0)
(1,9.0)
(3,9.0)
(7,9.0)
(9,9.0)
(5,9.0)
*/
```

Example: Trucking Company

Here is a real-world example. Suppose you run a long-haul trucking company that serves a region with a number of cities.
As part of a cost analysis, you want to find the shortest distance from any city to any other city in your service map.
Consider an imaginary graph: vertices are cities, and edges are highways connecting the cities. Each edge attribute is the number of miles between the two cities the edge connects.
Pregel implementation:
The Pregel API call returns a new graph, so you want Pregel to return a graph whose vertices look like this:
(VertexId, Map(k -> v))
The vertex attribute is a map of key-value (k, v) pairs, where
k = destination VertexId
v = number of miles to the destination vertex
Define helper methods:
createMap builds a map from any number of (VertexId, miles) pairs. Called with no arguments, it returns an empty map:
```scala
def createMap(x: (VertexId, Int)*) = Map(x: _*)
```
incrementDistance adds the length of the next edge to every distance in the map:
```scala
def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }
```
addRoute merges two maps and keeps the shorter distance for each destination. A destination missing from one map counts as Int.MaxValue there. addRoute serves as the mergeMsg argument in the Pregel call and is also used by the vertex program:
```scala
def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map {
    k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap
```
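The three helpers can be checked without Spark. The snippet below copies them from above. It redefines VertexId as Long, which is what GraphX's VertexId alias stands for, only so the snippet stands alone. The mileages are made up. addRoute acts as a per-destination minimum, so the order in which maps are merged does not matter. That property is what makes it usable as mergeMsg.

```scala
// Stand-alone copies of the helpers; VertexId redefined as Long only for this snippet.
type VertexId = Long
type busMap = Map[VertexId, Int]

def createMap(x: (VertexId, Int)*): busMap = Map(x: _*)
def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }
def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map {
    k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// A neighbor is 100 miles from landmark 7; taking a 40-mile edge to that neighbor makes it 140.
val viaEdge = incrementDistance(createMap(7L -> 100), 40)
// Another known route: 200 miles to landmark 7 and 50 miles to landmark 9.
val neighbor = createMap(7L -> 200, 9L -> 50)
// Merged: the shorter 140 for landmark 7, plus 50 for landmark 9.
println(addRoute(viaEdge, neighbor))
```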
Define the initialMsg required by Pregel, an empty map:
```scala
// Define the initialMessage needed by Pregel as an argument
val initialMessage = createMap()
```
Define vprog (the vertex program) needed by Pregel as a function argument. This is the logic executed in each superstep: it merges the incoming message into the vertex's own map.
```scala
def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
  addRoute(attr, msg)
}
```
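Define sendMsg as needed by Pregel. For an edge src -> dst, it adds the edge's miles to every distance dst knows and sends the result to src. It only sends if that would add or shorten a route in src's map. Because information flows from dst back to src, each vertex ends up with the distances from itself to the landmarks along the edge directions.
```scala
def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
  // The edge attribute holds the miles; toString.toInt converts it from the unknown edge type
  val newAttr = incrementDistance(edge.dstAttr, edge.attr.toString.toInt)
  if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
    Iterator((edge.srcId, newAttr))
  else
    // Vertices can decide not to send message with empty iterator
    Iterator.empty
}
```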
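The send decision can be isolated in plain Scala. `messageToSrc` is a hypothetical helper that mirrors the if/else in sendMessage for one edge src -> dst of a given length. It returns the message that would be sent to src, or None when merging it would not change src's map. The helpers are condensed copies of the ones above, and the mileages are made up.

```scala
type busMap = Map[Long, Int]
def incrementDistance(m: busMap, attr: Int): busMap = m.map { case (v, d) => v -> (d + attr) }
def addRoute(a: busMap, b: busMap): busMap =
  (a.keySet ++ b.keySet).map(k => k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))).toMap

// Same rule as sendMessage: offer dst's distances plus this edge to src, only if src's map would change.
def messageToSrc(srcAttr: busMap, dstAttr: busMap, miles: Int): Option[busMap] = {
  val newAttr = incrementDistance(dstAttr, miles)
  if (srcAttr != addRoute(newAttr, srcAttr)) Some(newAttr) else None
}

// dst is landmark 2 (0 miles from itself); the edge is 120 miles long.
println(messageToSrc(Map.empty, Map(2L -> 0), 120))   // src knows nothing yet: message sent
println(messageToSrc(Map(2L -> 100), Map(2L -> 0), 120)) // src already has a shorter route: None
println(messageToSrc(Map(2L -> 300), Map(2L -> 0), 120)) // the edge gives a shorter route: message sent
```

When no edge produces a message, every vertex is inactive and the Pregel loop ends. That is why the comparison with src's current map is needed.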
Here is the complete code. run takes the landmarks (destination vertices) as a parameter. Passing every vertex id yields the shortest distances between all pairs of cities:
```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Define Map as data type: destination VertexId -> miles
type busMap = Map[VertexId, Int]

def createMap(x: (VertexId, Int)*): busMap = Map(x: _*)

def incrementDistance(busmap: busMap, attr: Int): busMap = busmap.map { case (v, d) => v -> (d + attr) }

def addRoute(busmap1: busMap, busmap2: busMap): busMap =
  (busmap1.keySet ++ busmap2.keySet).map {
    k => k -> math.min(busmap1.getOrElse(k, Int.MaxValue), busmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// landmarks: the destination vertices whose distances every vertex should learn
def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[busMap, ED] = {
  // A landmark starts 0 miles from itself; every other vertex starts with an empty map
  val sssp = graph.mapVertices { (vid, attr) =>
    if (landmarks.contains(vid))
      createMap(vid -> 0)
    else
      createMap()
  }

  // Define initialMessage needed by Pregel as argument
  val initialMessage = createMap()

  // Define vProg, aka vertexProgram, needed by Pregel as functional argument: the logic executed in the supersteps
  def vertexProgram(id: VertexId, attr: busMap, msg: busMap): busMap = {
    addRoute(attr, msg)
  }

  // Define sendMsg as needed by Pregel as functional argument
  def sendMessage(edge: EdgeTriplet[busMap, _]): Iterator[(VertexId, busMap)] = {
    val newAttr = incrementDistance(edge.dstAttr, edge.attr.toString.toInt)
    if (edge.srcAttr != addRoute(newAttr, edge.srcAttr))
      Iterator((edge.srcId, newAttr))
    else
      // Vertices can decide not to send message with empty iterator
      Iterator.empty
  }

  // Invoke Pregel; addRoute doubles as mergeMsg
  Pregel(sssp, initialMessage)(vertexProgram, sendMessage, addRoute)
}

// Need random generator to generate random miles for each edge
val r = scala.util.Random

// Create a random graph with 5 vertices for simplicity in the writing
// Each edge attribute stores the length of the edge in miles (10 to 999)
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 5)
    .mapVertices((id, attr) => id.toDouble)
    .mapEdges(e => r.nextInt(990) + 10)

// Use every vertex as a landmark to get shortest distances between all pairs of cities
val landmarks = graph.vertices.map(_._1).collect.toSeq

// Invoke run: each vertex attribute in the returned graph maps a destination VertexId to the miles to reach it
for ((vid, distances) <- run(graph, landmarks).vertices.collect.sortBy(_._1)) {
  for ((k, v) <- distances.toSeq.sortBy(_._1) if vid != k) {
    println("From VertexId " + vid + " to " + k + " shortest distance is " + v + " Miles")
  }
}

/*
Output
From VertexId 0 to 1 shortest distance is 243 Miles
From VertexId 0 to 2 shortest distance is 422 Miles
From VertexId 0 to 3 shortest distance is 1102 Miles
From VertexId 0 to 4 shortest distance is 787 Miles
From VertexId 1 to 0 shortest distance is 874 Miles
From VertexId 1 to 2 shortest distance is 1296 Miles
From VertexId 1 to 3 shortest distance is 859 Miles
From VertexId 1 to 4 shortest distance is 1661 Miles
From VertexId 2 to 0 shortest distance is 1746 Miles
From VertexId 2 to 1 shortest distance is 872 Miles
From VertexId 2 to 3 shortest distance is 813 Miles
From VertexId 2 to 4 shortest distance is 374 Miles
From VertexId 3 to 0 shortest distance is 933 Miles
From VertexId 3 to 1 shortest distance is 59 Miles
From VertexId 3 to 2 shortest distance is 682 Miles
From VertexId 3 to 4 shortest distance is 1056 Miles
From VertexId 4 to 0 shortest distance is 1372 Miles
From VertexId 4 to 1 shortest distance is 498 Miles
From VertexId 4 to 2 shortest distance is 1121 Miles
From VertexId 4 to 3 shortest distance is 439 Miles
*/
```
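The printed distances come from a random graph, so they are hard to check by eye. One way to gain confidence in the vertex program, sendMsg and mergeMsg logic is a plain-Scala cross-check with no Spark. First, run the same distance-map propagation on a small, made-up directed road graph. Then compare the result with Floyd-Warshall all-pairs shortest paths. Floyd-Warshall is a different, well-known algorithm, swapped in here only as a reference. The roads and mileages are invented for illustration, and `propagate` and `floydWarshall` are hypothetical helpers.

```scala
type DistMap = Map[Long, Int]

// Made-up directed roads: (from, to, miles)
val roads = Seq((0L, 1L, 243), (1L, 3L, 859), (3L, 1L, 59), (0L, 2L, 422), (2L, 3L, 813), (3L, 0L, 933))
val cities = roads.flatMap(r => Seq(r._1, r._2)).distinct.sorted

def addRoute(a: DistMap, b: DistMap): DistMap =
  (a.keySet ++ b.keySet).map(k => k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))).toMap

// Pregel-style propagation: every city is a landmark, 0 miles from itself; supersteps run until no messages.
def propagate(): Map[Long, DistMap] = {
  var attrs: Map[Long, DistMap] = cities.map(c => c -> Map(c -> 0)).toMap
  var changed = true
  while (changed) {
    // sendMsg: dst's map plus the road length, offered to src only if it changes src's map
    val msgs = roads.flatMap { case (src, dst, miles) =>
      val offer = attrs(dst).map { case (k, d) => k -> (d + miles) }
      if (attrs(src) != addRoute(offer, attrs(src))) Some(src -> offer) else None
    }
    changed = msgs.nonEmpty
    // mergeMsg + vprog: fold every offer into the receiving city's map
    attrs = msgs.groupBy(_._1).foldLeft(attrs) { case (acc, (v, ms)) =>
      acc.updated(v, ms.map(_._2).foldLeft(acc(v))(addRoute))
    }
  }
  attrs
}

// Reference: Floyd-Warshall over the same roads, keeping only reachable pairs.
def floydWarshall(): Map[(Long, Long), Long] = {
  val n = cities.size
  val idx = cities.zipWithIndex.toMap
  val inf = Long.MaxValue / 4 // large, but safe to add twice
  val d = Array.tabulate(n, n)((i, j) => if (i == j) 0L else inf)
  for ((s, t, m) <- roads) d(idx(s))(idx(t)) = math.min(d(idx(s))(idx(t)), m.toLong)
  for (k <- 0 until n; i <- 0 until n; j <- 0 until n)
    if (d(i)(k) + d(k)(j) < d(i)(j)) d(i)(j) = d(i)(k) + d(k)(j)
  (for (i <- 0 until n; j <- 0 until n if d(i)(j) < inf) yield (cities(i), cities(j)) -> d(i)(j)).toMap
}

val fw = floydWarshall()
val viaPropagation = propagate().toSeq.flatMap { case (src, m) =>
  m.map { case (dst, miles) => (src, dst) -> miles.toLong }
}.toMap
println(viaPropagation == fw) // true
```

The same comparison could be run on the GraphX result by collecting its vertex maps. This sketch sticks to plain Scala so it runs without a Spark cluster.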
Pregel in Spark GraphX is not an easy API to use. Getting a Pregel call right takes time and some trial and error, but it is worth the effort.
As always, the code used in this article is in my GitHub repo.