diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala
new file mode 100644
index 0000000000000..20f893d73b1d5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala
@@ -0,0 +1,116 @@
+package org.apache.spark.examples.graphx
+
+import org.apache.spark._
+import org.apache.spark.graph._
+import org.apache.spark.graph.algorithms._
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.conf.Configuration
+import org.apache.mahout.text.wikipedia._
+import org.apache.spark.rdd.RDD
+import java.util.Calendar
+import scala.math.Ordering.Implicits._
+
+
+object AnalyzeWikipedia extends Logging {
+
+  def main(args: Array[String]) = {
+
+    val host = args(0)
+    val fname = args(1)
+    // val numparts = {
+    //   if (args.length >= 3) {
+    //     args(2).toInt
+    //   } else {
+    //     64
+    //   }
+    // }
+    // val preformattedFname = args(2)
+
+    val serializer = "org.apache.spark.serializer.KryoSerializer"
+    System.setProperty("spark.serializer", serializer)
+    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+    val sc = new SparkContext(host, "AnalyzeWikipedia")
+    // val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
+
+    // val conf = new Configuration
+    // conf.set("key.value.separator.in.input.line", " ");
+    // conf.set("xmlinput.start", "<page>");
+    // conf.set("xmlinput.end", "</page>");
+
+    // val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+    //   .map(stringify)
+
+    // println("XML pages: " + xmlRDD.count)
+    // // .repartition(numparts)
+
+    // val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+    //   .filter { art => art.relevant }
+
+    // println("Relevant pages: " + wikiRDD.count)
+
+    // val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    // val justVids = wikiRDD.map { art => art.vertexID }
+    // // println("taking top vids")
+    // // val topvids = justVids.top(10)
+    // // sc.stop()
+    // // System.exit(0)
+
+    // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+    // println("Edges: " + edges.count)
+    // println("Creating graph: " + Calendar.getInstance().getTime())
+
+    // val g = Graph(vertices, edges)
+    // val g = Graph.fromEdges(edges, 1)
+    // val g = Graph(edges, 1)
+    val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
+      minEdgePartitions = 128).cache()
+    println("Triplets: " + g.triplets.count)
+
+    println("starting pagerank " + Calendar.getInstance().getTime())
+    val startTime = System.currentTimeMillis
+    val pr = PageRank.run(g, 20)
+
+    println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
+    println("Pagerank runtime: " + ((System.currentTimeMillis - startTime) / 1000.0) + " seconds")
+    val prAndTitle = g.outerJoinVertices(pr.vertices)({ (id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0)) })
+    println("finished join.")
+
+    val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+    println("Top articles:\n" + topArticles.deep.mkString("\n"))
+    // for (v <- topArticles) {
+    //   println(v)
+    // }
+
+    val article_name = "JohnsHopkinsUniversity"
+    // Find relevant vertices: mark every edge that touches the target article with weight 1.0.
+    // Graphs are immutable, so the transformed graph must be captured and used below.
+    val marked = g.mapTriplets(e => {
+      if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
+      else { e.attr }
+    })
+    // Contract the marked edges, concatenating the titles of the merged vertices.
+    val coarsenedGraph = marked.contractEdges({ e => e.attr == 1.0 }, { et => et.srcAttr + " " + et.dstAttr },
+      { (v1: String, v2: String) => v1 + "\n" + v2 })
+
+    // filter only vertices whose title contains JHU
+    val relevant = coarsenedGraph.vertices.filter { case (vid: Vid, data: String) => data contains article_name }.collect
+    println("Articles matching " + article_name)
+    println(relevant.deep.mkString("New Article\n"))
+
+    sc.stop()
+  }
+
+  def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
+    tup._2.toString
+  }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala
new file mode 100644
index 0000000000000..a8cdba65b1cef
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala
@@ -0,0 +1,174 @@
+package org.apache.spark.examples.graphx
+
+import org.apache.spark._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graph.algorithms._
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.conf.Configuration
+import org.apache.mahout.text.wikipedia._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import java.util.Calendar
+import scala.math.Ordering.Implicits._
+import org.apache.spark.Logging
+import scala.collection.mutable
+
+
+object PrePostProcessWikipedia extends Logging {
+
+  def main(args: Array[String]) = {
+
+    val host = args(0)
+    val process = args(1)
+
+    val serializer = "org.apache.spark.serializer.KryoSerializer"
+    System.setProperty("spark.serializer", serializer)
+    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+    val sc = new SparkContext(host, "ETL")
+
+    val start = System.currentTimeMillis
+    process match {
+      case "pre" => {
+        val fname = args(2)
+        val outbase = args(3)
+        preProcess(sc, fname, outbase)
+      }
+      case "post" => {
+        val rankPath = args(2)
+        val attributePath = args(3)
+        val result = postProcess(sc, rankPath, attributePath)
+        logWarning(result)
+      }
+      case "graphx" => {
+        val rawData = args(2)
+        // graphx() logs its own results and returns Unit
+        graphx(sc, rawData)
+      }
+      case "prep" => {
+        val rawData = args(2)
+        val outBase = args(3)
+        prep(sc, rawData, outBase)
+      }
+
+      case _ => throw new IllegalArgumentException("Please provide a valid process")
+    }
+    logWarning(process + "\tTIMEX: " + (System.currentTimeMillis - start) / 1000.0)
+    sc.stop()
+    System.exit(0)
+  }
+
+
+  def prep(sc: SparkContext, rawData: String, outBase: String) {
+
+    val conf = new Configuration
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+
+    val vertPath = outBase + "_vertices"
+    val rankPath = outBase + "_ranks"
+    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+      .map(stringify)
+    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+      .filter { art => art.relevant }.repartition(128)
+    val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    val verticesToSave = vertices.map { v => v._1 + "\t" + v._2 }
+    verticesToSave.saveAsTextFile(vertPath)
+    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+    val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
+    val pr = PageRank.runStandalone(g, 0.01)
+    val prToSave = pr.map { v => v._1 + "\t" + v._2 }
+    prToSave.saveAsTextFile(rankPath)
+  }
+
+  def graphx(sc: SparkContext, rawData: String) {
+
+    val conf = new Configuration
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+
+    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+      .map(stringify)
+    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+      .filter { art => art.relevant }.repartition(128)
+    val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+    val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
+    val resultG = pagerankConnComponentsAlt(4, g)
+    logWarning(s"Final graph has ${resultG.triplets.count} EDGES, ${resultG.vertices.count} VERTICES")
+//    val pr = PageRank.run(g, 20)
+//    val prAndTitle = g
+//      .outerJoinVertices(pr)({ (id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0)) })
+//    val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+//    top20.mkString("\n")
+  }
+
+  def pagerankConnComponentsAlt(numRepetitions: Int, g: Graph[String, Double]): Graph[String, Double] = {
+    var currentGraph = g
+    for (i <- 0 to numRepetitions) {
+      val pr = PageRank.run(currentGraph, 20)
+      val prAndTitle = currentGraph
+        .outerJoinVertices(pr)({ (id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0)) })
+      val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+      logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
+      val top20verts = top20.map(_._1).toSet
+      // filter out the top 20 vertices
+      val newGraph = currentGraph.subgraph(vpred = (v, d) => !top20verts.contains(v))
+      val ccGraph = ConnectedComponents.run(newGraph)
+      // count the distinct component ids to get the number of connected components
+      val numCCs = ccGraph.vertices.aggregate(new mutable.HashSet[Vid]())(
+        (set, vtuple) => set += vtuple._2,
+        (set1, set2) => set1 union set2).size
+      logWarning(s"Number of connected components for iteration $i: $numCCs")
+      // TODO will this result in too much memory overhead???
+      currentGraph = newGraph
+    }
+    currentGraph
+  }
+
+  // parse the Wikipedia XML dump and save the vertex and edge tables as text files
+  def preProcess(sc: SparkContext, rawData: String, outBase: String) = {
+
+    val conf = new Configuration
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+
+    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+      .map(stringify)
+    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+      .filter { art => art.relevant }.repartition(128)
+    val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    val verticesToSave = vertices.map { v => v._1 + "\t" + v._2 }
+    verticesToSave.saveAsTextFile(outBase + "_vertices")
+    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+    val edgesToSave = edges.map(e => e.srcId + "\t" + e.dstId)
+    edgesToSave.saveAsTextFile(outBase + "_edges")
+  }
+
+
+  def postProcess(sc: SparkContext, rankPath: String, attrPath: String): String = {
+    val ranks = GraphLoader.loadVertices(sc, rankPath).map { v => (v._1, v._2.toDouble) }
+    val attrs = GraphLoader.loadVertices(sc, attrPath)
+
+    // slightly cheating, but not really
+    val ranksAndAttrs = ranks.join(attrs)
+    val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (Vid, (Double, String))) => entry._2._1))
+    top20.mkString("\n")
+  }
+
+
+  def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
+    tup._2.toString
+  }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala
new file mode 100644
index 0000000000000..e182462ae4cde
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala
@@ -0,0 +1,98 @@
+package org.apache.spark.examples.graphx
+
+import java.util.regex.Pattern
+import org.apache.spark.graph._
+import java.util.regex.Matcher
+import scala.util.matching.Regex
+import scala.collection.mutable
+import scala.xml._
+import org.apache.spark.serializer.KryoRegistrator
+// import org.apache.spark.util.collection.OpenHashSet
+import scala.collection.immutable.HashSet
+import java.security.MessageDigest
+import java.nio.ByteBuffer
+
+
+class WikiArticle(wtext: String) extends Serializable {
+  @transient val links: Array[String] = WikiArticle.parseLinks(wtext)
+  @transient val neighbors = links.map(WikiArticle.titleHash).distinct
+  @transient lazy val redirect: Boolean = !WikiArticle.redirectPattern.findFirstIn(wtext).isEmpty
+  @transient lazy val stub: Boolean = !WikiArticle.stubPattern.findFirstIn(wtext).isEmpty
+  @transient lazy val disambig: Boolean = !WikiArticle.disambigPattern.findFirstIn(wtext).isEmpty
+  @transient lazy val tiXML = WikiArticle.titlePattern.findFirstIn(wtext).getOrElse("")
+  val title: String = {
+    try {
+      XML.loadString(tiXML).text
+    } catch {
+      case e: Throwable => "" // don't use null because we get null pointer exceptions
+    }
+  }
+  val relevant: Boolean = !(redirect || stub || disambig || title == "")
+  val vertexID: Vid = WikiArticle.titleHash(title)
+  val edges: HashSet[Edge[Double]] = {
+    val temp = neighbors.map { n => Edge(vertexID, n, 1.0) }
+    val set = new HashSet[Edge[Double]]() ++ temp
+    set
+  }
+  // val edges: HashSet[(Vid, Vid)] = {
+  //   val temp = neighbors.map { n => (vertexID, n) }
+  //   val set = new HashSet[(Vid, Vid)]() ++ temp
+  //   set
+  // }
+}
+
+object WikiArticle {
+  @transient val titlePattern = "<title>(.*)<\\/title>".r
+  @transient val redirectPattern = "#REDIRECT\\s+\\[\\[(.*?)\\]\\]".r
+  @transient val disambigPattern = "\\{\\{disambig\\}\\}".r
+  @transient val stubPattern = "\\-stub\\}\\}".r
+  @transient val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE)
+
+  private def parseLinks(wt: String): Array[String] = {
+    val linkBuilder = new mutable.ArrayBuffer[String]()
+    val matcher: Matcher = linkPattern.matcher(wt)
+    while (matcher.find()) {
+      val temp: Array[String] = matcher.group(1).split("\\|")
+      if (temp != null && temp.length > 0) {
+        val link: String = temp(0)
+        if (!link.contains(":")) {
+          linkBuilder += link
+        }
+      }
+    }
+    linkBuilder.toArray
+  }
+
+  // substitute underscores for spaces and make lowercase
+  private def canonicalize(title: String): String = {
+    title.trim.toLowerCase.replace(" ", "_")
+  }
+
+  // Hash of the canonical article name. Used for the vertex ID.
+  // TODO this should be a 64bit hash
+  private def titleHash(title: String): Vid = { math.abs(WikiArticle.myHashcode(canonicalize(title))) }
+
+  private def myHashcode(s: String): Long = {
+    // var h: Long = 1125899906842597L // prime
+    // var h: Long = 4294967291L // prime
+    // var h = 29
+    // val len: Int = s.length
+    // for (i <- 0 until len) {
+    //   h = 31*h + s.charAt(i)
+    // }
+    // h
+    // s.hashCode()
+
+    // take the first 8 bytes of the MD5 digest as a Long
+    val md: MessageDigest = MessageDigest.getInstance("MD5")
+    md.update(s.getBytes)
+    val result: Array[Byte] = md.digest()
+    val longResult = ByteBuffer.wrap(result).getLong
+    // shift the result right by 10 bits
+    val retval = longResult >> 10
+    retval
+  }
+
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1d029bf009e8c..5e343cd0f60f4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -314,8 +314,7 @@ object GraphImpl {
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     val edgeRDD = createEdgeRDD(edges).cache()
 
     // Get the set of all vids
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 74bad66cfd018..ef8f8f8a8c716 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -302,6 +302,7 @@ object SparkBuild extends Build {
         "com.twitter" %% "algebird-core" % "0.1.11",
         "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm),
         "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm),
+        "org.apache.mahout" % "mahout-integration" % "0.8",
         "org.apache.cassandra" % "cassandra-all" % "1.2.6"
           exclude("com.google.guava", "guava")
           exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
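For reference, the examples in this patch repeat one core pattern: run PageRank, join the ranks back onto the title vertices, and take the top N. Below is a minimal, self-contained sketch of that pattern against the public org.apache.spark.graphx API; it is not part of the patch, and the edge-list path and placeholder titles are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, GraphLoader, VertexId}

object PageRankTopNSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageRankTopNSketch"))

    // Load a "srcId dstId" edge list (hypothetical path) and attach placeholder titles.
    val graph: Graph[String, Int] =
      GraphLoader.edgeListFile(sc, "hdfs:///path/to/wiki_edges")
        .mapVertices((id, _) => s"article-$id")

    // 20 iterations of static PageRank, matching the examples above.
    val ranks = graph.staticPageRank(20).vertices

    // Join ranks back onto titles and take the 20 highest-ranked articles.
    val titleAndRank = graph.outerJoinVertices(ranks) {
      (id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))
    }
    val top20 = titleAndRank.vertices
      .top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
    println(top20.mkString("\n"))

    sc.stop()
  }
}

outerJoinVertices is used rather than joinVertices so that vertices with no rank entry keep a default of 0.0 instead of being dropped from the result.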