// Empty, serializable class named TripletFields; it declares no members.
// NOTE(review): this name coincides with the GraphX `TripletFields` type, and the script
// later passes a `TripletFields.<field>` selector to aggregateMessages, which this class
// does not define. Confirm whether this declaration is intentional.
class TripletFields extends Serializable
import org.apache.spark.graphx
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._
// $example on$
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
// Build a 100-vertex log-normal graph, then replace every vertex attribute with the
// vertex's own id converted to Double; that value plays the role of "age" below.
val graph: Graph[Double, Int] =
  GraphGenerators
    .logNormalGraph(sc, numVertices = 100)
    .mapVertices((vertexId, _) => vertexId.toDouble)
/**
 * Map step for `aggregateMessages`: inspects a single edge and, when the source
 * vertex's attribute is strictly greater than the destination's, sends the message
 * `(1, srcAttr)` to the destination vertex. Otherwise nothing is sent.
 *
 * @param triplet edge context exposing the source and destination vertex attributes
 *                and the `sendToDst` callback
 */
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit = {
  val followerAge = triplet.srcAttr
  // Only a follower older than the followed vertex contributes a (counter, age) message.
  if (followerAge > triplet.dstAttr) {
    triplet.sendToDst((1, followerAge))
  }
}
/**
 * Reduce step for `aggregateMessages`: combines two messages component-wise.
 *
 * @param a first message as (counter, age total)
 * @param b second message as (counter, age total)
 * @return a pair holding the sum of both counters and the sum of both age totals
 */
def mergeMsg(a: (Int, Double), b: (Int, Double)): (Int, Double) = {
  val (countA, ageA) = a
  val (countB, ageB) = b
  (countA + countB, ageA + ageB)
}
// For every destination vertex, aggregate the number of older followers and the sum of
// their ages by running sendMsg over each edge and folding the messages with mergeMsg.
//
// sendMsg compares triplet.srcAttr with triplet.dstAttr, so BOTH endpoint attributes must
// be made available in the EdgeContext. TripletFields.All requests that; restricting the
// fields to the source side only would leave the destination attribute unpopulated and
// the age comparison would be evaluated against an unset value.
val olderFollowers: VertexRDD[(Int, Double)] =
  graph.aggregateMessages[(Int, Double)](sendMsg, mergeMsg, TripletFields.All)

// Print a small sample: (vertexId, (olderFollowerCount, totalAgeOfOlderFollowers)).
olderFollowers.take(5).foreach(println)
/*
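Sample output, one line per vertex as (vertexId, (olderFollowerCount, totalFollowerAge)).
The graph is randomly generated, so the actual values differ from run to run: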
(34,(30,1532.0))
(52,(19,738.0))
(96,(24,966.0))
(4,(20,943.0))
(16,(25,1250.0))
*/