parseExample.scala
/*
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
*
* See LICENSE file for further information.
*/
package com.cloudera.datascience.lsa

import com.cloudera.datascience.common.XmlInputFormat
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.umd.cloud9.collection.wikipedia.WikipediaPage
import edu.umd.cloud9.collection.wikipedia.language.EnglishWikipediaPage
import java.io.{FileOutputStream, PrintStream}
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

object ParseWikipedia {
  /**
   * Returns a term-document matrix where each element is the TF-IDF of the row's document and
   * the column's term.
   */
  def termDocumentMatrix(docs: RDD[(String, Seq[String])], stopWords: Set[String], numTerms: Int,
      sc: SparkContext): (RDD[Vector], Map[Int, String], Map[Long, String], Map[String, Double]) = {
    val docTermFreqs = docs.mapValues(terms => {
      val termFreqsInDoc = terms.foldLeft(new HashMap[String, Int]()) {
        (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
      }
      termFreqsInDoc
    })
    docTermFreqs.cache()
    val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap()

    val docFreqs = documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
    println("Number of terms: " + docFreqs.size)
    saveDocFreqs("docfreqs.tsv", docFreqs)

    val numDocs = docIds.size
    val idfs = inverseDocumentFrequencies(docFreqs, numDocs)

    // Maps terms to their indices in the vector
    val idTerms = idfs.keys.zipWithIndex.toMap
    val termIds = idTerms.map(_.swap)

    val bIdfs = sc.broadcast(idfs).value
    val bIdTerms = sc.broadcast(idTerms).value

    val vecs = docTermFreqs.map(_._2).map(termFreqs => {
      val docTotalTerms = termFreqs.values.sum
      val termScores = termFreqs.filter {
        case (term, freq) => bIdTerms.contains(term)
      }.map {
        case (term, freq) => (bIdTerms(term), bIdfs(term) * termFreqs(term) / docTotalTerms)
      }.toSeq
      Vectors.sparse(bIdTerms.size, termScores)
    })
    (vecs, termIds, docIds, idfs)
  }
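
  /**
   * Computes the document frequency of each term (the number of documents it appears in) by
   * aggregating the per-document term-frequency maps into a single map on the driver.
   */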
  def documentFrequencies(docTermFreqs: RDD[HashMap[String, Int]]): HashMap[String, Int] = {
    val zero = new HashMap[String, Int]()
    def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
      : HashMap[String, Int] = {
      tfs.keySet.foreach { term =>
        dfs += term -> (dfs.getOrElse(term, 0) + 1)
      }
      dfs
    }
    def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
      : HashMap[String, Int] = {
      for ((term, count) <- dfs2) {
        dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
      }
      dfs1
    }
    docTermFreqs.aggregate(zero)(merge, comb)
  }
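
  /**
   * Computes document frequencies with a distributed reduceByKey and returns the numTerms
   * terms with the highest counts.
   */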
  def documentFrequenciesDistributed(docTermFreqs: RDD[HashMap[String, Int]], numTerms: Int)
    : Array[(String, Int)] = {
    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)
    val ordering = Ordering.by[(String, Int), Int](_._2)
    docFreqs.top(numTerms)(ordering)
  }
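
  /**
   * Keeps at most numToKeep entries with the smallest counts. Note that, despite the name,
   * the lowest-frequency terms are the ones retained.
   */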
  def trimLeastFrequent(freqs: Map[String, Int], numToKeep: Int): Map[String, Int] = {
    freqs.toArray.sortBy(_._2).take(math.min(numToKeep, freqs.size)).toMap
  }
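
  /**
   * Computes the inverse document frequency of each term as log(numDocs / docFreq).
   */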
  def inverseDocumentFrequencies(docFreqs: Array[(String, Int)], numDocs: Int)
    : Map[String, Double] = {
    docFreqs.map { case (term, count) => (term, math.log(numDocs.toDouble / count)) }.toMap
  }
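
  /**
   * Reads a Wikipedia XML dump, splitting it on <page> elements and returning the raw XML
   * string of each page.
   */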
  def readFile(path: String, sc: SparkContext): RDD[String] = {
    val conf = new Configuration()
    conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
    conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
    val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable],
      classOf[Text], conf)
    rawXmls.map(p => p._2.toString)
  }

  /**
   * Returns a (title, content) pair, or None if the page is empty, not an article, a
   * redirect, or a disambiguation page.
   */
  def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
    val page = new EnglishWikipediaPage()
    WikipediaPage.readPage(page, pageXml)
    if (page.isEmpty || !page.isArticle || page.isRedirect ||
        page.getTitle.contains("(disambiguation)")) {
      None
    } else {
      Some((page.getTitle, page.getContent))
    }
  }
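
  /**
   * Creates a Stanford CoreNLP pipeline that tokenizes, splits sentences, tags parts of
   * speech, and lemmatizes.
   */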
  def createNLPPipeline(): StanfordCoreNLP = {
    val props = new Properties()
    props.put("annotators", "tokenize, ssplit, pos, lemma")
    new StanfordCoreNLP(props)
  }
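
  /**
   * Lemmatizes a document, keeping lowercased lemmas that are longer than two characters,
   * consist only of letters, and are not stop words.
   */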
  def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP)
    : Seq[String] = {
    val doc = new Annotation(text)
    pipeline.annotate(doc)
    val lemmas = new ArrayBuffer[String]()
    val sentences = doc.get(classOf[SentencesAnnotation])
    for (sentence <- sentences.asScala;
         token <- sentence.get(classOf[TokensAnnotation]).asScala) {
      val lemma = token.get(classOf[LemmaAnnotation])
      if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
        lemmas += lemma.toLowerCase
      }
    }
    lemmas
  }
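
  /**
   * Returns true if every character in the string is a letter.
   */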
  def isOnlyLetters(str: String): Boolean = {
    // While loop for high performance
    var i = 0
    while (i < str.length) {
      if (!Character.isLetter(str.charAt(i))) {
        return false
      }
      i += 1
    }
    true
  }
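
  /**
   * Loads a set of stop words from a file with one word per line.
   */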
  def loadStopWords(path: String): Set[String] = scala.io.Source.fromFile(path).getLines().toSet
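
  /**
   * Writes each (term, document frequency) pair to a tab-separated file at the given path.
   */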
  def saveDocFreqs(path: String, docFreqs: Array[(String, Int)]): Unit = {
    val ps = new PrintStream(new FileOutputStream(path))
    for ((doc, freq) <- docFreqs) {
      ps.println(s"$doc\t$freq")
    }
    ps.close()
  }
}
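
// A minimal end-to-end sketch showing how the functions above fit together. This object is
// not part of the original file: the Spark master, input path, stop-word file, and term
// count below are placeholder assumptions; adjust them to your environment.
object ParseExampleApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "ParseExample")

    // Hypothetical input locations.
    val pages = ParseWikipedia.readFile("wikidump.xml", sc)
    val stopWords = sc.broadcast(ParseWikipedia.loadStopWords("stopwords.txt")).value

    // Strip the Wikipedia markup, keeping (title, content) pairs for real articles only.
    val plainText = pages.flatMap(ParseWikipedia.wikiXmlToPlainText)

    // Build one CoreNLP pipeline per partition, since the pipeline itself is not serializable.
    val lemmatized = plainText.mapPartitions { iter =>
      val pipeline = ParseWikipedia.createNLPPipeline()
      iter.map { case (title, contents) =>
        (title, ParseWikipedia.plainTextToLemmas(contents, stopWords, pipeline))
      }
    }

    val (termDocMatrix, termIds, docIds, idfs) =
      ParseWikipedia.termDocumentMatrix(lemmatized, stopWords, 50000, sc)
    println("Term-document matrix rows: " + termDocMatrix.count())

    sc.stop()
  }
}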