EdgeContext Class
1
abstract class EdgeContext[VD, ED, A] extends AnyRef
Copied!

Instance constructor:

new EdgeContext()

Abstract value members:

1
abstract def attr: ED
2
The attribute associated with the edge.
3
​
4
abstract def dstAttr: VD
5
The vertex attribute of the edge's destination vertex.
6
​
7
abstract def dstId: VertexId
8
The vertex id of the edge's destination vertex.
9
​
10
abstract def sendToDst(msg: A): Unit
11
Sends a message to the destination vertex.
12
​
13
abstract def sendToSrc(msg: A): Unit
14
Sends a message to the source vertex.
15
​
16
abstract def srcAttr: VD
17
The vertex attribute of the edge's source vertex.
18
​
19
abstract def srcId: VertexId
20
The vertex id of the edge's source vertex.
Copied!

Example:

1
import org.apache.spark.graphx._
2
import org.apache.spark.graphx.impl._
3
import org.apache.spark.graphx.lib._
4
import org.apache.spark.graphx.util._
5
​
6
​
7
// $example on$
8
// Create a graph with "age" as the vertex property.
9
// Here we use a random graph for simplicity.
10
// Generate a 100-vertex graph, then replace every vertex attribute with that
// vertex's own id converted to Double (this value plays the role of "age").
val graph: Graph[Double, Int] =
  GraphGenerators
    .logNormalGraph(sc, numVertices = 100)
    .mapVertices((vertexId, _) => vertexId.toDouble)
12
​
13
/**
 * Map side of the aggregation, invoked with the context of a single edge.
 *
 * When the source vertex attribute is greater than the destination vertex
 * attribute, sends the destination a `(1, srcAttr)` message; otherwise sends
 * nothing.
 *
 * @param triplet edge context exposing both endpoint attributes and the
 *                message senders
 */
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit = {
  val sourceIsOlder = triplet.srcAttr > triplet.dstAttr
  // The leading 1 is a counter; the second component carries the source's age.
  if (sourceIsOlder) triplet.sendToDst((1, triplet.srcAttr))
}
21
/**
 * Reduce side of the aggregation: combines two messages component-wise.
 *
 * @param a first `(count, ageSum)` message
 * @param b second `(count, ageSum)` message
 * @return a pair holding the sum of both counts and the sum of both age totals
 */
def mergeMsg(a: (Int, Double), b: (Int, Double)): (Int, Double) = {
  val (countA, ageSumA) = a
  val (countB, ageSumB) = b
  (countA + countB, ageSumA + ageSumB)
}
22
// Run the aggregation: sendMsg emits per-edge messages and mergeMsg combines
// the messages that arrive at the same vertex.
val olderFollowers: VertexRDD[(Int, Double)] =
  graph.aggregateMessages[(Int, Double)](sendMsg, mergeMsg)

// Print five of the resulting (vertexId, (count, ageSum)) entries.
val sample = olderFollowers.take(5)
sample.foreach(println)
25
​
26
/*
27
Output:
28
(34,(36,2433.0))
29
(52,(20,1615.0))
30
(96,(2,195.0))
31
(4,(49,2511.0))
32
(16,(43,2754.0))
33
​
34
*/
35
​
Copied!
Reference:
Last modified 1yr ago