TripletFields Class

class TripletFields extends Serializable

Instance constructors:

new TripletFields(useSrc: Boolean, useDst: Boolean, useEdge: Boolean)
new TripletFields()

Example:

1
import org.apache.spark.graphx
2
import org.apache.spark.graphx.impl._
3
import org.apache.spark.graphx.lib._
4
import org.apache.spark.graphx.util._
5
​
6
​
7
// $example on$
8
// Create a graph with "age" as the vertex property.
9
// Here we use a random graph for simplicity.
10
val graph: Graph[Double, Int] =
11
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
12
​
13
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit ={
14
// (triplet:EdgeContext[Double, Int, (Int, Double)]) => { // Map Function
15
if (triplet.srcAttr > triplet.dstAttr) {
16
// Send message to destination vertex containing counter and age
17
triplet.sendToDst((1, triplet.srcAttr))
18
}
19
// }
20
}
21
def mergeMsg(a:(Int, Double), b:(Int, Double)): (Int, Double) = (a._1+b._1,a._2+b._2)
22
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](sendMsg, mergeMsg,TripletFields.Src)
23
​
24
olderFollowers.take(5).foreach(println)
25
​
26
/*
27
Output:
28
(34,(30,1532.0))
29
(52,(19,738.0))
30
(96,(24,966.0))
31
(4,(20,943.0))
32
(16,(25,1250.0))
33
​
34
​
35
*/
Copied!
Last modified 1yr ago
Copy link