EdgeContext Class

abstract class EdgeContext[VD, ED, A] extends AnyRef

Represents an edge along with its neighboring vertices and allows sending messages along the edge. It is used in Graph.aggregateMessages. VD is the vertex attribute type, ED is the edge attribute type, and A is the message type.

Instance constructor:

new EdgeContext()

Abstract value members:

abstract def attr: ED 
The attribute associated with the edge.

abstract def dstAttr: VD 
The vertex attribute of the edge's destination vertex.

abstract def dstId: VertexId 
The vertex id of the edge's destination vertex.

abstract def sendToDst(msg: A): Unit 
Sends a message to the destination vertex.

abstract def sendToSrc(msg: A): Unit 
Sends a message to the source vertex.

abstract def srcAttr: VD 
The vertex attribute of the edge's source vertex.

abstract def srcId: VertexId 
The vertex id of the edge's source vertex.
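
The members above can be exercised together in a small sketch. It assumes a local SparkContext and a hypothetical three-vertex toy graph (the labels, weights, and the object and method names are illustrative, not part of the original page). Each edge sends its weight to both endpoints, so every vertex ends up with an (outgoing total, incoming total) pair.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object EdgeContextMembersSketch {
  // Sums edge weights per vertex as (outgoing total, incoming total), sorted by vertex id.
  def weightTotals(sc: SparkContext): Array[(VertexId, (Int, Int))] = {
    // Hypothetical toy graph: vertex attribute is a label, edge attribute is a weight.
    val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 5), Edge(1L, 3L, 7), Edge(2L, 3L, 1)))
    val graph: Graph[String, Int] = Graph(vertices, edges)

    val totals: VertexRDD[(Int, Int)] = graph.aggregateMessages[(Int, Int)](
      ctx => {
        // Skip self-loops (none exist in this toy graph) so a weight is not counted twice.
        if (ctx.srcId != ctx.dstId) {
          ctx.sendToSrc((ctx.attr, 0)) // the source sees this edge as outgoing
          ctx.sendToDst((0, ctx.attr)) // the destination sees this edge as incoming
        }
      },
      // Merge function: add the pairs arriving at the same vertex.
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    totals.collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EdgeContextMembersSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try weightTotals(sc).foreach(println)
    finally sc.stop()
  }
}
```

For this toy graph the sketch should print (1,(12,0)), (2,(1,5)) and (3,(0,8)), one per line. Because the function reads only attr, srcId and dstId, the vertex attributes never influence the result.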

Example:

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
// sc is assumed to be an existing SparkContext (for example, the one spark-shell provides).
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)

// Map function: called once per edge; it may send a message to either endpoint.
def sendMsg(triplet: EdgeContext[Double, Int, (Int, Double)]): Unit = {
  if (triplet.srcAttr > triplet.dstAttr) {
    // Send message to destination vertex containing counter and age
    triplet.sendToDst((1, triplet.srcAttr))
  }
}

// Reduce function: add the counters and the ages that arrive at the same vertex.
def mergeMsg(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1 + b._1, a._2 + b._2)

// For each vertex: (number of older followers, total age of those followers).
val olderFollowers: VertexRDD[(Int, Double)] =
  graph.aggregateMessages[(Int, Double)](sendMsg, mergeMsg)

olderFollowers.take(5).foreach(println)

/*
Sample output (the graph is randomly generated, so values differ between runs):
(34,(36,2433.0))
(52,(20,1615.0))
(96,(2,195.0))
(4,(49,2511.0))
(16,(43,2754.0))
*/
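
The (count, total age) pairs produced above are usually reduced one step further to an average. The following is a minimal sketch of that step, assuming a local SparkContext and a small graph with hypothetical, fixed ages so that the result is reproducible. The object and method names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object AverageOlderFollowerAgeSketch {
  // Returns (vertex id, average age of the followers older than that vertex), sorted by id.
  def averageOlderFollowerAge(sc: SparkContext): Array[(VertexId, Double)] = {
    // Hypothetical ages, fixed so the result does not change between runs.
    val ages = sc.parallelize(Seq((1L, 30.0), (2L, 20.0), (3L, 40.0), (4L, 50.0)))
    // An edge src -> dst means "src follows dst"; the edge attribute is unused here.
    val follows = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(4L, 2L, 1), Edge(2L, 1L, 1), Edge(4L, 3L, 1)))
    val graph: Graph[Double, Int] = Graph(ages, follows)

    // Same send and merge logic as the example above, written inline.
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      ctx => if (ctx.srcAttr > ctx.dstAttr) ctx.sendToDst((1, ctx.srcAttr)),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    // Divide total age by follower count. Vertices that received no message are absent.
    val averages: VertexRDD[Double] =
      olderFollowers.mapValues((id, value) => value match {
        case (count, totalAge) => totalAge / count
      })
    averages.collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AverageOlderFollowerAgeSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try averageOlderFollowerAge(sc).foreach(println)
    finally sc.stop()
  }
}
```

With these ages, vertex 2 has three older followers (30, 40 and 50) and vertex 3 has one (50), so the sketch should print (2,40.0) and (3,50.0). Vertices 1 and 4 have no older followers and do not appear in the result.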

Reference:

https://github.com/apache/spark/blob/v2.4.5/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
