
Commit

Beginning of new pipeline, not compiled yet.
dcrankshaw committed Feb 19, 2014
1 parent 2983132 commit d3bbfd0
Showing 5 changed files with 390 additions and 2 deletions.
@@ -0,0 +1,116 @@
package org.apache.spark.examples.graphx

import org.apache.spark._
import org.apache.spark.graph._
import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import java.util.Calendar
import scala.math.Ordering.Implicits._


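// Loads a pre-processed Wikipedia link graph, runs PageRank, and then contracts the edges
// around a single target article to inspect its neighborhood.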
object AnalyzeWikipedia extends Logging {

def main(args: Array[String]) = {




val host = args(0)
val fname = args(1)
// val numparts = {
// if (args.length >= 3) {
// args(2).toInt
// } else {
// 64
// }
// }
// val preformattedFname = args(2)

val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

val sc = new SparkContext(host, "AnalyzeWikipedia")
// val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))


// val conf = new Configuration
// conf.set("key.value.separator.in.input.line", " ");
// conf.set("xmlinput.start", "<page>");
// conf.set("xmlinput.end", "</page>");

// val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// .map(stringify)

// println("XML pages: " + xmlRDD.count)
// // .repartition(numparts)

// val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
// .filter { art => art.relevant }

// println("Relevant pages: " + wikiRDD.count)

// val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
// val justVids = wikiRDD.map { art => art.vertexID }
// // println("taking top vids")
// // val topvids = justVids.top(10)
// // sc.stop()
// // System.exit(0)

// // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// println("Edges: " + edges.count)
// println("Creating graph: " + Calendar.getInstance().getTime())

// val g = Graph(vertices, edges)
// val g = Graph.fromEdges(edges, 1)
// val g = Graph(edges, 1)
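// Load the graph from pre-generated edge and vertex list files (fname + "_edges",
// fname + "_vertices"), e.g. the output of the pre-processing job in PrePostProcessWikipedia.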
val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
minEdgePartitions = 128).cache()
println("Triplets: " + g.triplets.count)

println("starting pagerank " + Calendar.getInstance().getTime())
val startTime = System.currentTimeMillis
val pr = PageRank.run(g, 20)

println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
println("finished join.")

val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
println("Top articles:\n" + topArticles.deep.mkString("\n"))
// for(v <- topArticles) {
// println(v)
// }
val article_name = "JohnsHopkinsUniversity"

// Find the relevant neighborhood: mark every edge that touches the target article
// with weight 1.0 so that contractEdges below collapses exactly those edges.
val markedGraph = g.mapTriplets(e => {
if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
else { e.attr }
})
val coarsenedGraph = markedGraph.contractEdges({ e => e.attr == 1.0 },
{ et => et.srcAttr + " " + et.dstAttr },
{ (v1: String, v2: String) => v1 + "\n" + v2 })

// filter only vertices whose title contains JHU
val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
println("Articles matching " + article_name)
println(relevant.deep.mkString("New Article\n"))

sc.stop()
}


def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
tup._2.toString
}



}
@@ -0,0 +1,174 @@
package org.apache.spark.examples.graphx

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import java.util.Calendar
import scala.math.Ordering.Implicits._
import org.apache.spark.Logging
import scala.collection.mutable


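// Driver for the Wikipedia pipeline: "pre" and "prep" turn the raw XML dump into graph
// inputs, "graphx" runs the analysis in memory, and "post" reports on saved results.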
object PrePostProcessWikipedia extends Logging {

def main(args: Array[String]) = {

val host = args(0)
val process = args(1)

val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

val sc = new SparkContext(host, "ETL")

val start = System.currentTimeMillis
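// Dispatch on the requested processing stage given on the command line.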
process match {
case "pre" => {
val fname = args(2)
val outbase = args(3)
preProcess(sc, fname, outbase)
}
case "post" => {
val rankPath = args(2)
val attributePath = args(3)
val result = postProcess(sc, rankPath, attributePath)
logWarning(result)
}
case "graphx" => {
val rawData = args(2)
val result = graphx(sc, rawData)
logWarning(result)
}
case "prep" => {
val rawData = args(2)
val outBase = args(3)
prep(sc, rawData, outBase)
}

case _ => throw new IllegalArgumentException("Please provide a valid process")
}
logWarning(process + "\tTIMEX: " + (System.currentTimeMillis - start)/1000.0)
sc.stop()
System.exit(0)

}



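// "prep": parse the XML dump, save the vertex list, run standalone PageRank, and save the ranks.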
def prep(sc: SparkContext, rawData: String, outBase: String) {

val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ");
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");

val vertPath = outBase + "_vertices"
val rankPath = outBase + "_ranks"
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(vertPath)
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
val pr = PageRank.runStandalone(g, 0.01)
val prToSave = pr.map {v => v._1 + "\t"+ v._2}
prToSave.saveAsTextFile(rankPath)
}

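// "graphx": parse the XML dump, build the link graph in memory, and run the iterative
// PageRank / connected-components analysis.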
def graphx(sc: SparkContext, rawData: String) {

val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ");
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");

val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
val resultG = pagerankConnComponentsAlt(4, g)
logWarning(s"Final graph has ${resultG.triplets.count} EDGES, ${resultG.vertices.count} VERTICES")
// val pr = PageRank.run(g, 20)
// val prAndTitle = g
// .outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
// top20.mkString("\n")

}

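// Iteratively: run PageRank, log the top 20 articles, remove them from the graph, and
// count the connected components of the remaining graph.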
def pagerankConnComponentsAlt(numRepetitions: Int, g: Graph[String, Double]): Graph[String, Double] = {
var currentGraph = g
for (i <- 0 to numRepetitions) {
val pr = PageRank.run(currentGraph, 20)
val prAndTitle = currentGraph
.outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
val top20Verts = top20.map(_._1).toSet
// filter out the top 20 vertices before the next iteration
val newGraph = currentGraph.subgraph(vpred = ((v, d) => !top20Verts.contains(v)))
val ccGraph = ConnectedComponents.run(newGraph)
val numCCs = ccGraph.vertices.aggregate(new mutable.HashSet[Vid]())(((set, vtuple) => set += vtuple._2), ((set1, set2) => set1 union set2)).size
logWarning(s"Number of connected components for iteration $i: $numCCs")
// TODO will this result in too much memory overhead???
currentGraph = newGraph
}
currentGraph
}

// parse the wikipedia xml dump and save the vertex and edge lists as tab-separated text files
def preProcess(sc: SparkContext, rawData: String, outBase: String) = {

val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ");
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");

val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(outBase + "_vertices")
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val edgesToSave = edges.map(e => e.srcId + "\t" + e.dstId)
edgesToSave.saveAsTextFile(outBase + "_edges")

}


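// "post": join saved PageRank scores with saved vertex titles and return the top 20 articles.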
def postProcess(sc: SparkContext, rankPath: String, attrPath: String): String = {
val ranks = GraphLoader.loadVertices(sc, rankPath).map {v => (v._1, v._2.toDouble)}
val attrs = GraphLoader.loadVertices(sc, attrPath)

// slightly cheating, but not really
val ranksAndAttrs = ranks.join(attrs)
val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (Vid, (Double, String))) => entry._2._1))
top20.mkString("\n")
}



def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
tup._2.toString
}



}
@@ -0,0 +1,98 @@
package org.apache.spark.examples.graphx

import java.util.regex.Pattern
import org.apache.spark.graph._
import java.util.regex.Matcher
import scala.util.matching.Regex
import scala.collection.mutable
import scala.xml._
import org.apache.spark.serializer.KryoRegistrator
// import org.apache.spark.util.collection.OpenHashSet
import scala.collection.immutable.HashSet
import java.security.MessageDigest
import java.nio.ByteBuffer


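// Wraps a single <page> element of the Wikipedia XML dump: extracts the title and outgoing
// wiki links, and flags redirect, stub, and disambiguation pages as irrelevant.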
class WikiArticle(wtext: String) extends Serializable {
@transient val links: Array[String] = WikiArticle.parseLinks(wtext)
@transient val neighbors = links.map(WikiArticle.titleHash).distinct
@transient lazy val redirect: Boolean = !WikiArticle.redirectPattern.findFirstIn(wtext).isEmpty
@transient lazy val stub: Boolean = !WikiArticle.stubPattern.findFirstIn(wtext).isEmpty
@transient lazy val disambig: Boolean = !WikiArticle.disambigPattern.findFirstIn(wtext).isEmpty
@transient lazy val tiXML = WikiArticle.titlePattern.findFirstIn(wtext).getOrElse("")
val title: String = {
try {
XML.loadString(tiXML).text
} catch {
// use the empty string rather than null to avoid NullPointerExceptions downstream
case e: Throwable => ""
}
}
// an article is relevant only if it has a title and is not a redirect, stub, or disambiguation page
val relevant: Boolean = !(redirect || stub || disambig || title.isEmpty)
val vertexID: Vid = WikiArticle.titleHash(title)
val edges: HashSet[Edge[Double]] = {
val temp = neighbors.map { n => Edge(vertexID, n, 1.0) }
val set = new HashSet[Edge[Double]]() ++ temp
set
}
// val edges: HashSet[(Vid, Vid)] = {
// val temp = neighbors.map { n => (vertexID, n) }
// val set = new HashSet[(Vid, Vid)]() ++ temp
// set
// }
}

object WikiArticle {
@transient val titlePattern = "<title>(.*)<\\/title>".r
@transient val redirectPattern = "#REDIRECT\\s+\\[\\[(.*?)\\]\\]".r
@transient val disambigPattern = "\\{\\{disambig\\}\\}".r
@transient val stubPattern = "\\-stub\\}\\}".r
@transient val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE)

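// Extract the targets of all [[...]] wiki links, keeping only the part before any '|'
// and skipping namespaced links (those containing ':').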
private def parseLinks(wt: String): Array[String] = {
val linkBuilder = new mutable.ArrayBuffer[String]()
val matcher: Matcher = linkPattern.matcher(wt)
while (matcher.find()) {
val temp: Array[String] = matcher.group(1).split("\\|")
if (temp.nonEmpty) {
val link: String = temp(0)
if (!link.contains(":")) {
linkBuilder += link
}
}
}
linkBuilder.toArray
}

// substitute underscores for spaces and make lowercase
private def canonicalize(title: String): String = {
title.trim.toLowerCase.replace(" ", "_")
}

// Hash of the canonical article name. Used for vertex ID.
// TODO this should be a 64bit hash
private def titleHash(title: String): Vid = { math.abs(WikiArticle.myHashcode(canonicalize(title))) }

private def myHashcode(s: String): Long = {
// var h: Long = 1125899906842597L // prime
// var h: Long = 4294967291L // prime
// var h = 29
// val len: Int = s.length
// for (i<- 0 until len) {
// h = 31*h + s.charAt(i)
// }
// h
// // s.hashCode()
// }

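// Hash the canonical title: take the first 8 bytes of its MD5 digest as a 64-bit value.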
val md: MessageDigest = MessageDigest.getInstance("MD5")
md.update(s.getBytes)
val result: Array[Byte] = md.digest()
val longResult = ByteBuffer.wrap(result).getLong
// shift the result right by 10 bits
val retval = longResult >> 10
retval
}

}


