EdgeContext Class
abstract class EdgeContext[VD, ED, A] extends AnyRef
Instance constructor:
new EdgeContext()
Abstract value members:
abstract def attr: ED
The attribute associated with the edge.
abstract def dstAttr: VD
The vertex attribute of the edge's destination vertex.
abstract def dstId: VertexId
The vertex id of the edge's destination vertex.
abstract def sendToDst(msg: A): Unit
Sends a message to the destination vertex.
abstract def sendToSrc(msg: A): Unit
Sends a message to the source vertex.
abstract def srcAttr: VD
The vertex attribute of the edge's source vertex.
abstract def srcId: VertexId
The vertex id of the edge's source vertex.
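Because every member listed above is abstract and the constructor is public, a send function can be exercised without a cluster by passing it a small hand-written subclass. The following is a minimal sketch, assuming spark-graphx is on the classpath; `RecordingContext` and `sendWeight` are hypothetical names introduced for illustration and are not part of GraphX.

```scala
import org.apache.spark.graphx.{EdgeContext, VertexId}
import scala.collection.mutable.ArrayBuffer

// Hypothetical test double (not part of GraphX): it records messages in
// local buffers instead of routing them to vertices.
class RecordingContext(
    val srcId: VertexId,
    val dstId: VertexId,
    val srcAttr: Double,
    val dstAttr: Double,
    val attr: Int) extends EdgeContext[Double, Int, Double] {
  val toSrc = ArrayBuffer.empty[Double]
  val toDst = ArrayBuffer.empty[Double]
  override def sendToSrc(msg: Double): Unit = { toSrc += msg; () }
  override def sendToDst(msg: Double): Unit = { toDst += msg; () }
}

// Hypothetical send function: forwards the edge attribute to whichever
// endpoint has the smaller vertex attribute.
def sendWeight(ctx: EdgeContext[Double, Int, Double]): Unit = {
  if (ctx.srcAttr < ctx.dstAttr) ctx.sendToSrc(ctx.attr.toDouble)
  else ctx.sendToDst(ctx.attr.toDouble)
}

val ctx = new RecordingContext(1L, 2L, srcAttr = 10.0, dstAttr = 20.0, attr = 3)
sendWeight(ctx)
// The source attribute (10.0) is smaller, so only toSrc receives a message.
```

In a real job the context objects are created by GraphX itself inside aggregateMessages; a subclass like this is only useful for checking send logic in isolation.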
Example:
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util._
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
// Map function: called once per edge.
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit = {
  if (triplet.srcAttr > triplet.dstAttr) {
    // Send message to destination vertex containing counter and age
    triplet.sendToDst((1, triplet.srcAttr))
  }
}
// Reduce function: add up the counters and the ages.
def mergeMsg(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1 + b._1, a._2 + b._2)
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](sendMsg, mergeMsg)
olderFollowers.take(5).foreach(println)
/*
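Output, as (vertexId, (olderFollowerCount, summedAges)); values vary between runs because the graph is randomly generated: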
(34,(36,2433.0))
(52,(20,1615.0))
(96,(2,195.0))
(4,(49,2511.0))
(16,(43,2754.0))
*/
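Since the example above runs on a random graph, its numbers cannot be checked by hand. The sketch below repeats the same send and merge logic on a small hand-built graph and then divides the summed ages by the counters to get the average age of older followers. The vertex ids, ages, edges, master URL and application name are all assumptions made for illustration; in spark-shell, `sc` already exists and the two setup lines can be dropped.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}

// Assumed local setup; spark-shell already provides `sc`.
val conf = new SparkConf().setMaster("local[1]").setAppName("older-followers-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical data: vertex attribute = age, edge src -> dst = "src follows dst".
val ages = sc.parallelize(Seq[(VertexId, Double)](
  (1L, 30.0), (2L, 20.0), (3L, 40.0), (4L, 25.0)))
val follows = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(4L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 4L, 1)))
val smallGraph: Graph[Double, Int] = Graph(ages, follows)

// Same logic as sendMsg / mergeMsg, written inline.
val older: VertexRDD[(Int, Double)] = smallGraph.aggregateMessages[(Int, Double)](
  ctx => if (ctx.srcAttr > ctx.dstAttr) ctx.sendToDst((1, ctx.srcAttr)),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)

// Divide the summed ages by the counter to get the average per vertex.
val avgAge: Map[VertexId, Double] = older
  .mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })
  .collect()
  .toMap

// Vertex 2 has three older followers (ages 30, 40, 25), vertex 4 has one (age 30).
// Vertices 1 and 3 receive no messages and are absent from the result.
avgAge.toSeq.sortBy(_._1).foreach(println)
```

Only vertices that received at least one message appear in the resulting VertexRDD, so no division by zero can occur in the averaging step.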