From e6fc93bbba3a23ffc24d75de802107bd97b22412 Mon Sep 17 00:00:00 2001
From: Dan Crankshaw
Date: Mon, 24 Mar 2014 03:29:19 +0000
Subject: [PATCH] Updated pipeline benchmark to handle other systems.

---
 .../spark/graphx/WikiPipelineBenchmark.scala  | 243 +++++++-----------
 1 file changed, 87 insertions(+), 156 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
index da8312b8fbef6..e7ecf8e23a497 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
@@ -14,7 +14,6 @@ import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
 
 object WikiPipelineBenchmark extends Logging {
 
-
   def main(args: Array[String]) = {
 
     val host = args(0)
@@ -24,31 +23,29 @@ object WikiPipelineBenchmark extends Logging {
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
 
-    val sc = new SparkContext(host, "ETL", sparkconf)
+    val sc = new SparkContext(host, "WikiPipeline", sparkconf)
 
     val start = System.currentTimeMillis
     process match {
-      case "pre" => {
-        val fname = args(2)
-        val outbase = args(3)
-        preProcess(sc, fname, outbase)
-      }
-      case "post" => {
-        val rankPath = args(2)
-        val attributePath = args(3)
-        val result = postProcess(sc, rankPath, attributePath)
-        logWarning(result)
-      }
       case "graphx" => {
        val rawData = args(2)
         val numIters = args(3).toInt
         val result = benchmarkGraphx(sc, rawData, numIters)
-//        logWarning(result)
+        logWarning(result.toString)
       }
-      case "prep" => {
+
+      case "extract" => {
         val rawData = args(2)
         val outBase = args(3)
-        prep(sc, rawData, outBase)
+        val (vertices, edges) = extractLinkGraph(sc, rawData)
+        writeGraphAsText(outBase, vertices, edges, 0)
+      }
+
+      case "analyze" => {
+        val outBase = args(2)
+        val iter = args(3).toInt
+        pipelinePostProcessing(sc, outBase, iter)
       }
 
       case _ => throw new IllegalArgumentException("Please provide a valid process")
@@ -59,93 +56,8 @@ object WikiPipelineBenchmark extends Logging {
 
   }
 
-
-
-  def prep(sc: SparkContext, rawData: String, outBase: String) {
-
-    val hadoopconf = new org.apache.hadoop.conf.Configuration
-    hadoopconf.set("key.value.separator.in.input.line", " ");
-    hadoopconf.set("xmlinput.start", "<page>");
-    hadoopconf.set("xmlinput.end", "</page>");
-
-    val vertPath = outBase + "_vertices"
-    val rankPath = outBase + "_ranks"
-
-    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], hadoopconf)
-      .map(t => t._2.toString)
-
-    val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }
-
-    val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
-    val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
-    val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
-    verticesToSave.saveAsTextFile(vertPath)
-    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-    val g = Graph(vertices, edges) //TODO what to do about partitionStrategy???
-    val pr = PageRank.run(g, 20)
-    val prToSave = pr.vertices.map {v => v._1 + "\t"+ v._2}
-    prToSave.saveAsTextFile(rankPath)
-  }
-
-// def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = {
-//   val conf = new Configuration
-//   conf.set("key.value.separator.in.input.line", " ")
-//   conf.set("xmlinput.start", "<page>")
-//   conf.set("xmlinput.end", "</page>")
-//
-//   logWarning("about to load xml rdd")
-//   val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
-//     .map(t => t._2.toString)
-//   // xmlRDD.count
-//   logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
-//   val repartXMLRDD = xmlRDD.repartition(128)
-//   logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")
-//
-//   val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
-//   logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")
-//
-//   val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
-//   logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
-//
-// }
-
   def benchmarkGraphx(sc: SparkContext, rawData: String, numIters: Int) {
-
-    val conf = new Configuration
-    conf.set("key.value.separator.in.input.line", " ")
-    conf.set("xmlinput.start", "<page>")
-    conf.set("xmlinput.end", "</page>")
-
-    logWarning("about to load xml rdd")
-    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
-      .map(t => t._2.toString)
-    // xmlRDD.count
-    logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
-    val repartXMLRDD = xmlRDD.repartition(128)
-    logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")
-
-    val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
-    logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")
-
-    val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
-    logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
-
-
-    // val repartAllArtsRDD = allArtsRDD.repartition(128)
-    // logWarning(s"Total articles: Found ${repartAllArtsRDD.count} PARTITIONED articles.")
-    // val wikiRDD = unpartWikiRDD.repartition(128).cache
-    // val wikiRDD = unpartWikiRDD.coalesce(128, false).cache
-    // logWarning(s"WikiRDD partitions size: ${wikiRDD.partitions.size}")
-
-    // val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
-
-    // val wikiRDDCount = wikiRDD.count
-    // logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
-    // logWarning("Counting differently")
-
-
-    val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
-    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+    val (vertices, edges) = extractLinkGraph(sc, rawData)
     logWarning("creating graph")
     val g = Graph(vertices, edges)
     val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
@@ -154,7 +66,6 @@ object WikiPipelineBenchmark extends Logging {
     val resultG = pagerankConnComponentsAlt(numIters, cleanG)
     logWarning(s"ORIGINAL graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
     logWarning(s"FINAL graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
-
   }
 
   def pagerankConnComponentsAlt(numRepetitions: Int, g: Graph[String, Double]): Graph[String, Double] = {
@@ -164,12 +75,27 @@ object WikiPipelineBenchmark extends Logging {
       currentGraph.cache
       val startTime = System.currentTimeMillis
       logWarning("starting pagerank")
+      // GRAPH VIEW
+      val ccGraph = ConnectedComponents.run(currentGraph).cache
+      val zeroVal = new JTreeSet[VertexId]()
+      val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
+        s.add(vtuple._2)
+        s
+      }
+      val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
+        s1.addAll(s2)
+        s1
+      }
+      // TABLE VIEW
+      val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()
+      logWarning(s"Number of connected components for iteration $i: $numCCs")
       val pr = PageRank.run(currentGraph, 20).cache
       pr.vertices.count
       logWarning("Pagerank completed")
+      // TABLE VIEW
       val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
       prAndTitle.vertices.count
-      logWarning("join completed.")
+      // logWarning("join completed.")
       val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
       logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
       val top20verts = top20.map(_._1).toSet
@@ -178,72 +104,77 @@ object WikiPipelineBenchmark extends Logging {
         !top20verts.contains(v)
       }
       val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
-      val ccGraph = ConnectedComponents.run(newGraph).cache
-//       val zeroVal = new mutable.HashSet[VertexId]()
-//       val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
-//         s.add(vtuple._2)
-//         s
-//       }
-//       val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
-//       val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)
-
-
-      val zeroVal = new JTreeSet[VertexId]()
-      val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
-        s.add(vtuple._2)
-        s
-      }
-      val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
-        s1.addAll(s2)
-        s1
-      }
-      val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()
-
-
-      //(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})
-
-      //(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
-      logWarning(s"Number of connected components for iteration $i: $numCCs")
+      newGraph.vertices.count
       logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
-      // TODO will this result in too much memory overhead???
       currentGraph = newGraph
     }
     currentGraph
   }
 
-  // parse wikipedia xml dump and
-  def preProcess(sc: SparkContext, rawData: String, outBase: String) = {
+  def writeGraphAsText[V](basePath: String, vertices: RDD[(VertexId, V)], edges: RDD[Edge[Double]], iter: Int = 0) {
+    val verticesToSave = vertices.map {v => s"${v._1}\t${v._2}"}
+    val edgesToSave = edges.map {e => s"${e.srcId}\t${e.dstId}"}
+    verticesToSave.saveAsTextFile(s"${basePath}_vertices_$iter")
+    edgesToSave.saveAsTextFile(s"${basePath}_edges_$iter")
+  }
+
+  // assumes vertex attr is string, can be parsed afterwards
+  def readEdgesFromText(sc: SparkContext, path: String): RDD[(VertexId, VertexId)] = {
+    sc.textFile(path, 128).map { line =>
+      val lineSplits = line.split("\\s+")
+      (lineSplits(0).toLong, lineSplits(1).toLong)
+    }
+  }
+
+  def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = {
     val conf = new Configuration
-    conf.set("key.value.separator.in.input.line", " ");
-    conf.set("xmlinput.start", "<page>");
-    conf.set("xmlinput.end", "</page>");
+    conf.set("key.value.separator.in.input.line", " ")
+    conf.set("xmlinput.start", "<page>")
+    conf.set("xmlinput.end", "</page>")
+
+    logWarning("about to load xml rdd")
     val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
       .map(t => t._2.toString)
-    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
-                        .filter { art => art.relevant }.repartition(128)
+    // xmlRDD.count
+    logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
+    val repartXMLRDD = xmlRDD.repartition(128)
+    logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")
+
+    val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
+    logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")
+
+    val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
+    logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
     val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
-    val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
-    verticesToSave.saveAsTextFile(outBase + "_vertices")
     val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-    val edgesToSave = edges.map(e => e.srcId + "\t" + e.dstId)
-    edgesToSave.saveAsTextFile(outBase + "_edges")
+    (vertices, edges)
   }
 
-  def postProcess(sc: SparkContext, rankPath: String, attrPath: String): String = {
-    val ranks = GraphLoader.loadVertices(sc, rankPath).map {v => (v._1, v._2.toDouble)}
-    val attrs = GraphLoader.loadVertices(sc, attrPath)
-
-    // slightly cheating, but not really
-    val ranksAndAttrs = ranks.join(attrs)
-    val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
-    top20.mkString("\n")
+  def pipelinePostProcessing(sc: SparkContext, basePath: String, iter: Int) {
+    val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_prs")
+      .map {v => (v._1, v._2.toDouble) }
+    val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_ccs")
+      .map {v => (v._1, v._2.toLong) }
+    val edges = readEdgesFromText(sc, s"${basePath}_edges_$iter")
+    val artNames = GraphLoader.loadVertices(sc, s"${basePath}_vertices_$iter")
+    val rankAndTitle = artNames.join(pageranks)
+    val top20 = rankAndTitle.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
+    logWarning(s"Top20 for iteration $iter:\n${top20.mkString("\n")}")
+    val zeroVal = new JTreeSet[VertexId]()
+    val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
+      s.add(vtuple._2)
+      s
+    }
+    val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
+      s1.addAll(s2)
+      s1
+    }
+    val numCCs = connComponents.aggregate(zeroVal)(seqOp, combOp).size()
+    logWarning(s"Number of connected components for iteration $iter: $numCCs")
+    val top20verts = top20.map(_._1).toSet
+    val newVertices = artNames.filter { case (v, d) => !top20verts.contains(v) }
+    // rebuild Edge records for the surviving pairs; the weight is a placeholder since only src/dst are written out
+    val newEdges = edges.filter { case (s, d) => !(top20verts.contains(s) || top20verts.contains(d)) }
+      .map { case (s, d) => Edge(s, d, 1.0) }
+    writeGraphAsText(basePath, newVertices, newEdges, iter + 1)
   }
 }
-
-class MakeString(tup: (LongWritable, Text)) {
-  val str = tup._2.toString
-}
-
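
Usage sketch (illustrative only, not part of the diff above). It assumes main reads the mode string from args(1), which falls outside the hunks shown, and the master URL and HDFS paths below are placeholders:

  // Hypothetical invocations; args(0) = Spark master, args(1) = mode, args(2..3) = mode-specific arguments.

  // Self-contained GraphX benchmark: parse the dump, then repeatedly run PageRank (20 iterations)
  // and connected components, dropping the 20 top-ranked articles after each round.
  WikiPipelineBenchmark.main(Array("spark://master:7077", "graphx", "hdfs:///wiki/dump.xml", "3"))

  // Pipeline mode: "extract" writes ${outBase}_vertices_0 and ${outBase}_edges_0; once another
  // system has produced ${outBase}_prs and ${outBase}_ccs, "analyze" joins ranks with titles,
  // logs the top 20 and the component count, and writes the iteration-1 vertex and edge files.
  WikiPipelineBenchmark.main(Array("spark://master:7077", "extract", "hdfs:///wiki/dump.xml", "hdfs:///wiki/out"))
  WikiPipelineBenchmark.main(Array("spark://master:7077", "analyze", "hdfs:///wiki/out", "0"))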