Skip to content

Commit

Permalink
pass mergeFunc to VertexPartitionBase, where merge is handled
Browse files Browse the repository at this point in the history
  • Loading branch information
larryxiao committed Aug 19, 2014
1 parent 20d80a3 commit 52dc7f7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
7 changes: 3 additions & 4 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,9 @@ object VertexRDD {
def apply[VD: ClassTag](
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
): VertexRDD[VD] = {
val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
case Some(p) => verticesDedup
case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
}
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ object ShippableVertexPartition {

/**
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
* table, filling in missing vertices mentioned in the routing table using `defaultVal`.
* table, filling in missing vertices mentioned in the routing table using `defaultVal`,
* and merging duplicate vertex atrribute with mergeFunc.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD)
: ShippableVertexPartition[VD] = {
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
new ShippableVertexPartition(index, values, mask, routingTable)
}

Expand Down

0 comments on commit 52dc7f7

Please sign in to comment.