Skip to content

Commit

Permalink
Add Shortest-path computations to graphx.lib with unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
andy327 committed Feb 17, 2014
1 parent 5af4477 commit 7496d6b
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import org.apache.spark.graphx._
import com.twitter.algebird.{ Min, Monoid }

object ShortestPaths {
  /** Map of landmark vertex id -> minimum known distance (in hops) to that landmark. */
  type SPMap = Map[VertexId, Min[Int]]

  /** Builds a shortest-paths map from the given (landmark id, distance) pairs. */
  def SPMap(x: (VertexId, Min[Int])*): SPMap = Map(x: _*)

  /** Returns a copy of `spmap` in which every distance is one hop larger. */
  def increment(spmap: SPMap): SPMap = spmap.map { case (v, Min(d)) => v -> Min(d + 1) }

  /**
   * Compute the shortest paths to each landmark for each vertex and
   * return a graph whose vertex attributes are maps of landmarks to
   * their shortest-path lengths (number of edges).
   *
   * Distances follow edge direction: an edge `src -> dst` lets `src`
   * reach every landmark that `dst` can reach, at one extra hop.
   * Landmarks that cannot be reached from a vertex are absent from
   * that vertex's map.
   *
   * @tparam VD the vertex attribute type of the input graph (ignored)
   * @tparam ED the edge attribute type of the input graph (ignored)
   *
   * @param graph the graph for which to compute the shortest paths
   * @param landmarks the list of landmark vertex ids
   * @param m1 manifest for `VD`, required by the graph transformations
   * @param m2 manifest for `ED`, required by the graph transformations
   * @param spMapMonoid the monoid used to merge two shortest-paths maps;
   *        with Algebird's standard `Map`/`Min` instances this keeps the
   *        smaller distance for every landmark
   *
   * @return a graph with vertex attributes containing a map of the
   * shortest paths to each landmark; each edge attribute holds the
   * initial map of the edge's source vertex and is not used by the
   * computation
   */
  def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId])
      (implicit m1: Manifest[VD], m2: Manifest[ED], spMapMonoid: Monoid[SPMap]): Graph[SPMap, SPMap] = {

    // Build the set once: Seq.contains would rescan the landmark list for every vertex.
    val landmarkSet = landmarks.toSet

    // Landmarks start at distance 0 from themselves; every other vertex starts with no
    // known distances.
    val spGraph = graph
      .mapVertices { (vid, attr) =>
        if (landmarkSet.contains(vid)) SPMap(vid -> Min(0))
        else SPMap()
      }
      .mapTriplets { edge => edge.srcAttr }

    // The empty map is passed as the initial message, so the first vertexProgram call
    // merges nothing new into the vertex attribute.
    val initialMessage = SPMap()

    // Merge the incoming distances into the vertex's current map.
    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      spMapMonoid.plus(attr, msg)
    }

    // Propagate against the edge direction: the source can reach whatever the destination
    // reaches with one more hop. A message is only sent when it would actually change the
    // source's map, which is what lets the iteration terminate.
    def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = {
      val newAttr = increment(edge.dstAttr)
      if (edge.srcAttr != spMapMonoid.plus(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    // Combine two messages addressed to the same vertex.
    def messageCombiner(s1: SPMap, s2: SPMap): SPMap = {
      spMapMonoid.plus(s1, s2)
    }

    Pregel(spGraph, initialMessage)(
      vertexProgram, sendMessage, messageCombiner)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import org.scalatest.FunSuite

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._

class ShortestPathsSuite extends FunSuite with LocalSparkContext {

  test("Shortest Path Computations") {
    withSpark { sc =>
      // Expected result: for every vertex, the hop count to landmarks 1 and 4.
      // Written with Int literals; compared against the Long-keyed results via ===.
      val expected = Set(
        (1, Map(1 -> 0, 4 -> 2)),
        (2, Map(1 -> 1, 4 -> 2)),
        (3, Map(1 -> 2, 4 -> 1)),
        (4, Map(1 -> 2, 4 -> 0)),
        (5, Map(1 -> 1, 4 -> 1)),
        (6, Map(1 -> 3, 4 -> 1)))

      // Each pair is inserted in both directions, so the graph is effectively undirected.
      val oneWay = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6))
      val bothWays = oneWay.flatMap { pair => Seq(pair, pair.swap) }
      val edgeRdd = sc.parallelize(bothWays).map { case (a, b) => (a.toLong, b.toLong) }

      val graph = Graph.fromEdgeTuples(edgeRdd, 1)
      val landmarks = Seq(1, 4).map(_.toLong)

      // Unwrap each Min[Int] so the result can be compared with plain Int distances.
      val computed = ShortestPaths.run(graph, landmarks).vertices.collect.map {
        case (vertex, spMap) => (vertex, spMap.mapValues(_.get))
      }

      assert(computed.toSet === expected)
    }
  }

}
5 changes: 3 additions & 2 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ object SparkBuild extends Build {
case None => DEFAULT_YARN
case Some(v) => v.toBoolean
}
lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"

// Conditionally include the yarn sub-project
lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
Expand Down Expand Up @@ -322,7 +322,8 @@ object SparkBuild extends Build {
def graphxSettings = sharedSettings ++ Seq(
name := "spark-graphx",
libraryDependencies ++= Seq(
"org.jblas" % "jblas" % "1.2.3"
"org.jblas" % "jblas" % "1.2.3",
"com.twitter" %% "algebird-core" % "0.3.0"
)
)

Expand Down

0 comments on commit 7496d6b

Please sign in to comment.