From b788bfb0f79441f26ca705ed235c861ba6be0b42 Mon Sep 17 00:00:00 2001 From: Dan Crankshaw Date: Tue, 25 Feb 2014 11:10:16 +0000 Subject: [PATCH] added more debug logging. --- .../spark/graphx/PrePostProcessWiki.scala | 18 ++++++++++++++---- .../org/apache/spark/graphx/WikiArticle.scala | 6 ++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala b/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala index 26302d22be2ea..8b59e5619a739 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala @@ -77,8 +77,9 @@ object PrePostProcessWikipedia extends Logging { val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], hadoopconf) .map(t => t._2.toString) - val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) } - .filter { art => art.relevant }.repartition(128) + val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) } + + val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128) val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) } val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2} verticesToSave.saveAsTextFile(vertPath) @@ -101,8 +102,15 @@ object PrePostProcessWikipedia extends Logging { .map(t => t._2.toString) // xmlRDD.count logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.") - val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) } - .filter { art => art.relevant }.repartition(128) + + val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache + val numRedirects = allArtsRDD.filter { art => art.redirect }.count + val numStubs = allArtsRDD.filter { art => art.stub }.count + val numDisambig = allArtsRDD.filter { art => art.disambig }.count + val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count + logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound") + + val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128) logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.") val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) } val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges } @@ -126,6 +134,7 @@ object PrePostProcessWikipedia extends Logging { var currentGraph = g logWarning("starting iterations") for (i <- 0 to numRepetitions) { + val startTime = System.currentTimeMillis logWarning("starting pagerank") val pr = PageRank.run(currentGraph, 20) pr.vertices.count @@ -167,6 +176,7 @@ object PrePostProcessWikipedia extends Logging { //(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size logWarning(s"Number of connected components for iteration $i: $numCCs") + logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}") // TODO will this result in too much memory overhead??? currentGraph = newGraph } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala index a55b4871185dd..86eb40d8ccce8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala @@ -24,10 +24,10 @@ class WikiArticle(wtext: String) extends Serializable { try { XML.loadString(tiXML).text } catch { - case e => "NOTFOUND" // don't use null because we get null pointer exceptions + case e => WikiArticle.notFoundString // don't use null because we get null pointer exceptions } } - val relevant: Boolean = !(redirect || stub || disambig || title == "NOTFOUND" || title == null) + val relevant: Boolean = !(redirect || stub || disambig || title == WikiArticle.notFoundString || title == null) val vertexID: VertexId = WikiArticle.titleHash(title) val edges: HashSet[Edge[Double]] = { val temp = neighbors.map { n => Edge(vertexID, n, 1.0) } @@ -48,6 +48,8 @@ object WikiArticle { @transient val stubPattern = "\\-stub\\}\\}".r @transient val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE) + val notFoundString = "NOTFOUND" + private def parseLinks(wt: String): Array[String] = { val linkBuilder = new mutable.ArrayBuffer[String]() val matcher: Matcher = linkPattern.matcher(wt)