Skip to content

Commit

Permalink
Add new DataFrame matchbox udfs (#387)
Browse files Browse the repository at this point in the history
- Add DetectLanguageDF
- Add ExtractBoilerpipeTextDF
- Add ExtractDateDF
- Update tests
- Rename existing ExtractDate, ExtractBoilerpipeText, DetectLanguage udfs by appending RDD
- Partially addresses #223
  • Loading branch information
Gursimran Singh authored and ruebot committed Dec 5, 2019
1 parent 560ed2b commit 079cd24
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 41 deletions.
18 changes: 9 additions & 9 deletions src/main/scala/io/archivesunleashed/ArchiveRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
import java.security.MessageDigest

import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils, ArchiveRecordWritable}
import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDate, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDateRDD, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import org.apache.spark.SerializableWritable
import org.archive.io.arc.ARCRecord
import org.archive.io.warc.WARCRecord
Expand Down Expand Up @@ -84,25 +84,25 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends

val getCrawlDate: String = {
if (recordFormat == ArchiveRecordWritable.ArchiveFormat.ARC){
ExtractDate(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDate.DateComponent.YYYYMMDD)
ExtractDateRDD(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDateRDD.DateComponent.YYYYMMDD)
} else {
ExtractDate(
ExtractDateRDD(
ArchiveUtils.get14DigitDate(
ISO8601.parse(r.t.getRecord.asInstanceOf[WARCRecord].getHeader.getDate)),
ExtractDate.DateComponent.YYYYMMDD)
ExtractDateRDD.DateComponent.YYYYMMDD)
}
}

val getCrawlMonth: String = {
if (recordFormat == ArchiveRecordWritable.ArchiveFormat.ARC) {
ExtractDate(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDate.DateComponent.YYYYMM)
ExtractDateRDD(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDateRDD.DateComponent.YYYYMM)
} else {
ExtractDate(
ExtractDateRDD(
ArchiveUtils.get14DigitDate(
ISO8601.parse(r.t.getRecord.asInstanceOf[WARCRecord].getHeader.getDate)),
ExtractDate.DateComponent.YYYYMM)
ExtractDateRDD.DateComponent.YYYYMM)
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/main/scala/io/archivesunleashed/df/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.archivesunleashed

import org.apache.commons.io.IOUtils
import io.archivesunleashed.matchbox.ComputeMD5RDD
import io.archivesunleashed.matchbox.{ComputeMD5RDD}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import java.io.ByteArrayInputStream
Expand Down Expand Up @@ -51,6 +51,12 @@ package object df {

/** DataFrame UDF that passes a column of raw bytes to the matchbox `ComputeImageSize` function. */
val ComputeImageSizeDF = udf((bytes: Array[Byte]) => io.archivesunleashed.matchbox.ComputeImageSize(bytes))

/** DataFrame UDF that runs the matchbox `DetectLanguageRDD` language detector on a string column. */
val DetectLanguageDF = udf((text: String) => io.archivesunleashed.matchbox.DetectLanguageRDD(text))

/** DataFrame UDF that runs the matchbox `ExtractBoilerpipeTextRDD` extractor on a string column. */
val ExtractBoilerpipeTextDF = udf((html: String) => io.archivesunleashed.matchbox.ExtractBoilerpipeTextRDD(html))

/** DataFrame UDF that calls the String-based `ExtractDateRDD.apply(fullDate, dateFormat)` overload
  * on a date column and a date-format column.
  */
val ExtractDateDF = udf(
  (fullDate: String, dateFormat: String) =>
    io.archivesunleashed.matchbox.ExtractDateRDD(fullDate, dateFormat))

/**
* Given a dataframe, serializes binary object and saves to disk
* @param df the input dataframe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.tika.language.detect.LanguageDetector;
import org.apache.tika.language.detect.LanguageResult;

/** Detects language using Apache Tika. */
object DetectLanguage {
object DetectLanguageRDD {

/** Detects the language of a String input.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import de.l3s.boilerpipe.extractors.DefaultExtractor
import java.io.IOException

/** Extract raw text content from an HTML page, minus "boilerplate" content (using boilerpipe). */
object ExtractBoilerpipeText {
object ExtractBoilerpipeTextRDD {
/** Uses boilerpipe to extract raw text content from a page.
*
* ExtractBoilerpipeTextRDD removes boilerplate text (e.g. a copyright statement) from an HTML string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.archivesunleashed.matchbox

/** Gets different parts of a dateString. */
object ExtractDate {
object ExtractDateRDD {
object DateComponent extends Enumeration {
/** An enum specifying years, months, days or a combination. */
type DateComponent = Value
Expand Down Expand Up @@ -49,4 +49,29 @@ object ExtractDate {
""
}
}

/** Extracts a provided date component from a date (for DataFrames).
  *
  * The component is selected by a plain String so that this overload can be
  * called from a DataFrame UDF. Extraction is bounds-safe: a date shorter than
  * the requested range yields whatever characters are available (possibly an
  * empty string) instead of throwing.
  *
  * @param fullDate date formatted as YYYYMMDD, e.g. the value returned by
  *                 `ArchiveRecord.getCrawlDate`; may be null
  * @param dateFormat component to extract: "YYYY", "MM", "DD" or "YYYYMM";
  *                   any other value (including null) selects the leading YYYYMMDD
  * @return the requested component, or "" when `fullDate` is null
  */
def apply(fullDate: String, dateFormat: String): String = {
  val startSS = 0
  val yearSS = 4
  val monthSS = 6
  val daySS = 8
  // Option(...) turns a null date into None, which folds to the empty string.
  Option(fullDate).fold("") { date =>
    // slice clamps its indices to the string length, so malformed or short
    // dates cannot raise StringIndexOutOfBoundsException inside a Spark job.
    dateFormat match {
      case "YYYY" => date.slice(startSS, yearSS)
      case "MM" => date.slice(yearSS, monthSS)
      case "DD" => date.slice(monthSS, daySS)
      case "YYYYMM" => date.slice(startSS, monthSS)
      case _ => date.slice(startSS, daySS)
    }
  }
}
}
12 changes: 6 additions & 6 deletions src/main/scala/io/archivesunleashed/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import java.util.Base64

import io.archivesunleashed.data.{ArchiveRecordInputFormat, ArchiveRecordWritable}
import ArchiveRecordWritable.ArchiveFormat
import io.archivesunleashed.matchbox.{DetectLanguage, DetectMimeTypeTika, ExtractDate,
import io.archivesunleashed.matchbox.{DetectLanguageRDD, DetectMimeTypeTika, ExtractDateRDD,
ExtractDomainRDD, ExtractImageDetails, ExtractImageLinksRDD,
ExtractLinksRDD, GetExtensionMimeRDD, RemoveHTMLRDD}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
import java.net.URI
import java.net.URL
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Expand Down Expand Up @@ -542,7 +542,7 @@ package object archivesunleashed {
* @param component the selected DateComponent enum value
*/
def keepDate(dates: List[String], component: DateComponent = DateComponent.YYYYMMDD): RDD[ArchiveRecord] = {
rdd.filter(r => dates.contains(ExtractDate(r.getCrawlDate, component)))
rdd.filter(r => dates.contains(ExtractDateRDD(r.getCrawlDate, component)))
}

/** Removes all data but selected exact URLs.
Expand Down Expand Up @@ -579,7 +579,7 @@ package object archivesunleashed {
* @param lang a set of ISO 639-2 codes
*/
def keepLanguages(lang: Set[String]): RDD[ArchiveRecord] = {
rdd.filter(r => lang.contains(DetectLanguage(RemoveHTMLRDD(r.getContentString))))
rdd.filter(r => lang.contains(DetectLanguageRDD(RemoveHTMLRDD(r.getContentString))))
}

/** Removes all content that does not pass Regular Expression test.
Expand Down Expand Up @@ -674,7 +674,7 @@ package object archivesunleashed {
* @param lang a set of ISO 639-2 codes
*/
def discardLanguages(lang: Set[String]): RDD[ArchiveRecord] = {
rdd.filter(r => !lang.contains(DetectLanguage(RemoveHTMLRDD(r.getContentString))))
rdd.filter(r => !lang.contains(DetectLanguageRDD(RemoveHTMLRDD(r.getContentString))))
}
}
}
6 changes: 3 additions & 3 deletions src/test/scala/io/archivesunleashed/ArcTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.archivesunleashed

import com.google.common.io.Resources
import io.archivesunleashed.matchbox.{DetectLanguage, DetectMimeTypeTika, ExtractLinksRDD, RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.{DetectLanguageRDD, DetectMimeTypeTika, ExtractLinksRDD, RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -82,7 +82,7 @@ class ArcTest extends FunSuite with BeforeAndAfter {
val languageCounts = RecordLoader.loadArchives(arcPath, sc)
.keepMimeTypes(Set("text/html"))
.map(r => RemoveHTMLRDD(r.getContentString))
.groupBy(content => DetectLanguage(content))
.groupBy(content => DetectLanguageRDD(content))
.map(f => {
(f._1, f._2.size)
})
Expand Down
6 changes: 3 additions & 3 deletions src/test/scala/io/archivesunleashed/RecordRDDTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.archivesunleashed

import com.google.common.io.Resources
import io.archivesunleashed.matchbox.ExtractDate
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -62,7 +62,7 @@ class RecordRDDTest extends FunSuite with BeforeAndAfter {
val base = RecordLoader.loadArchives(arcPath, sc)
val component = DateComponent.YYYY
val r = base
.filter (x => ExtractDate(x.getCrawlDate, component) == testDate)
.filter (x => ExtractDateRDD(x.getCrawlDate, component) == testDate)
.map ( mp => mp.getUrl).take(3)
val r2 = base.keepDate(List(testDate), component)
.map ( mp => mp.getUrl).take(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class ExtractBoilerPipeTextTest extends FunSuite {
var boiler = """Copyright 2017"""

test("Collects boilerpipe") {
assert(ExtractBoilerpipeText(text) == boiler)
assert(ExtractBoilerpipeTextRDD(text) == boiler)
// scalastyle:off null
assert(ExtractBoilerpipeText(null) == "")
assert(ExtractBoilerpipeTextRDD(null) == "")
// scalastyle:on null
assert(ExtractBoilerpipeText("All Rights Reserved.") == "")
assert(ExtractBoilerpipeTextRDD("All Rights Reserved.") == "")
}

test("Removes Header information") {
assert(ExtractBoilerpipeText(header + text) == boiler)
assert(ExtractBoilerpipeTextRDD(header + text) == boiler)
}
}
24 changes: 12 additions & 12 deletions src/test/scala/io/archivesunleashed/matchbox/ExtractDateTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.archivesunleashed.matchbox

import io.archivesunleashed.matchbox.ExtractDate.DateComponent.{DD, MM, YYYY, YYYYMM, YYYYMMDD}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.{DD, MM, YYYY, YYYYMM, YYYYMMDD}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
Expand All @@ -26,13 +26,13 @@ class ExtractDateTest extends FunSuite {

test("simple") {
val date = "20151204"
assert(ExtractDate(date, YYYY) == "2015")
assert(ExtractDate(date, MM) == "12")
assert(ExtractDate(date, DD) == "04")
assert(ExtractDate(date, YYYYMM) == "201512")
assert(ExtractDate(date, YYYYMMDD) == date)
assert(ExtractDateRDD(date, YYYY) == "2015")
assert(ExtractDateRDD(date, MM) == "12")
assert(ExtractDateRDD(date, DD) == "04")
assert(ExtractDateRDD(date, YYYYMM) == "201512")
assert(ExtractDateRDD(date, YYYYMMDD) == date)
// scalastyle:off null
assert(ExtractDate(null, YYYYMMDD) == "")
assert(ExtractDateRDD(null, YYYYMMDD) == "")
// scalastyle:on null
}

Expand All @@ -42,10 +42,10 @@ class ExtractDateTest extends FunSuite {
val yearSS = 4
val monthSS = 6
val daySS = 8
assert(ExtractDate(date, YYYY) == date.substring(startSS, yearSS))
assert(ExtractDate(date, MM) == date.substring(yearSS, monthSS))
assert(ExtractDate(date, DD) == date.substring(monthSS, daySS))
assert(ExtractDate(date, YYYYMM) == date.substring(startSS, monthSS))
assert(ExtractDate(date, YYYYMMDD) == date.substring(startSS, daySS))
assert(ExtractDateRDD(date, YYYY) == date.substring(startSS, yearSS))
assert(ExtractDateRDD(date, MM) == date.substring(yearSS, monthSS))
assert(ExtractDateRDD(date, DD) == date.substring(monthSS, daySS))
assert(ExtractDateRDD(date, YYYYMM) == date.substring(startSS, monthSS))
assert(ExtractDateRDD(date, YYYYMMDD) == date.substring(startSS, daySS))
}
}

0 comments on commit 079cd24

Please sign in to comment.