TripletFields Class

class TripletFields extends Serializable

Instance constructors:

new TripletFields(useSrc: Boolean, useDst: Boolean, useEdge: Boolean)

new TripletFields()

Example:

import org.apache.spark.graphx
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._


// $example on$
    // Create a graph with "age" as the vertex property.
    // Here we use a random graph for simplicity.
// Each vertex's attribute (its "age") is simply its own vertex id as a Double.
val graph: Graph[Double, Int] =
  GraphGenerators
    .logNormalGraph(sc, numVertices = 100)
    .mapVertices((vertexId, _) => vertexId.toDouble)

/**
 * Map step of the aggregation: when the source vertex's attribute is greater
 * than the destination's, sends `(1, srcAttr)` to the destination vertex
 * (a counter of one plus the source's age).
 *
 * Reads both `triplet.srcAttr` and `triplet.dstAttr`; sends nothing otherwise.
 *
 * @param triplet edge context giving access to the endpoint attributes and
 *                the message-sending methods
 */
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit =
  if (triplet.srcAttr > triplet.dstAttr) triplet.sendToDst((1, triplet.srcAttr))
/**
 * Reduce step of the aggregation: combines two `(count, ageSum)` messages by
 * adding them component-wise.
 *
 * @param a first partial result
 * @param b second partial result
 * @return the pair of summed counts and summed ages
 */
def mergeMsg(a: (Int, Double), b: (Int, Double)): (Int, Double) = {
  val (countA, ageSumA) = a
  val (countB, ageSumB) = b
  (countA + countB, ageSumA + ageSumB)
}
// For every vertex, count the followers that are older than it and sum their
// ages: sendMsg emits (1, srcAttr) along qualifying edges, mergeMsg adds the
// pairs arriving at the same destination.
//
// sendMsg compares triplet.srcAttr with triplet.dstAttr, so BOTH vertex
// attributes must be requested. TripletFields.Src omits the destination
// attribute, which leaves dstAttr undefined inside sendMsg and makes the
// "older than" filter meaningless. The edge attribute is never read, so it
// is left out.
val olderFollowers: VertexRDD[(Int, Double)] =
  graph.aggregateMessages[(Int, Double)](
    sendMsg,
    mergeMsg,
    new TripletFields(true, true, false) // useSrc, useDst, useEdge
  )

// Print a small sample of (vertexId, (olderFollowerCount, summedAges)).
olderFollowers.take(5).foreach(println)

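/*
Sample output (the graph is randomly generated, so values vary between runs).
NOTE: these figures were captured with TripletFields.Src, which does not
provide destination attributes to sendMsg, so the srcAttr > dstAttr filter
had no effect (e.g. vertex 96 lists 24 "older" followers whose ages average
about 40).

(34,(30,1532.0))
(52,(19,738.0))
(96,(24,966.0))
(4,(20,943.0))
(16,(25,1250.0))

*/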

Last updated