Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataframe matchbox Implementations #387

Merged
merged 12 commits into from
Dec 5, 2019
18 changes: 9 additions & 9 deletions src/main/scala/io/archivesunleashed/ArchiveRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
import java.security.MessageDigest

import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils, ArchiveRecordWritable}
import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDate, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDateRDD, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import org.apache.spark.SerializableWritable
import org.archive.io.arc.ARCRecord
import org.archive.io.warc.WARCRecord
Expand Down Expand Up @@ -84,25 +84,25 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends

val getCrawlDate: String = {
if (recordFormat == ArchiveRecordWritable.ArchiveFormat.ARC){
ExtractDate(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDate.DateComponent.YYYYMMDD)
ExtractDateRDD(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDateRDD.DateComponent.YYYYMMDD)
} else {
ExtractDate(
ExtractDateRDD(
ArchiveUtils.get14DigitDate(
ISO8601.parse(r.t.getRecord.asInstanceOf[WARCRecord].getHeader.getDate)),
ExtractDate.DateComponent.YYYYMMDD)
ExtractDateRDD.DateComponent.YYYYMMDD)
}
}

val getCrawlMonth: String = {
if (recordFormat == ArchiveRecordWritable.ArchiveFormat.ARC) {
ExtractDate(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDate.DateComponent.YYYYMM)
ExtractDateRDD(r.t.getRecord.asInstanceOf[ARCRecord].getMetaData.getDate,
ExtractDateRDD.DateComponent.YYYYMM)
} else {
ExtractDate(
ExtractDateRDD(
ArchiveUtils.get14DigitDate(
ISO8601.parse(r.t.getRecord.asInstanceOf[WARCRecord].getHeader.getDate)),
ExtractDate.DateComponent.YYYYMM)
ExtractDateRDD.DateComponent.YYYYMM)
}
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/io/archivesunleashed/df/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.archivesunleashed

import org.apache.commons.io.IOUtils
import io.archivesunleashed.matchbox.ComputeMD5RDD
import io.archivesunleashed.matchbox.{ComputeMD5RDD,ExtractDateRDD}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import java.io.ByteArrayInputStream
Expand All @@ -26,6 +26,7 @@ import java.util.Base64
/**
* UDFs for data frames.
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this blank line.

package object df {
// UDFs for use with data frames go here, tentatively. There are a couple of ways we could build UDFs,
// by wrapping matchbox UDFs or by reimplementing them. The following examples illustrate. Obviously, we'll
Expand All @@ -51,6 +52,12 @@ package object df {

// Spark SQL UDF over a byte array, delegating to matchbox ComputeImageSize.
val ComputeImageSizeDF = udf((bytes: Array[Byte]) => io.archivesunleashed.matchbox.ComputeImageSize.apply(bytes))

// Spark SQL UDF over a String, delegating to matchbox DetectLanguageRDD.
val DetectLanguageDF = udf((content: String) => io.archivesunleashed.matchbox.DetectLanguageRDD.apply(content))

// Spark SQL UDF over a String, delegating to matchbox ExtractBoilerpipeTextRDD.
val ExtractBoilerpipeTextDF = udf((content: String) => io.archivesunleashed.matchbox.ExtractBoilerpipeTextRDD.apply(content))

// Spark SQL UDF over (date, format) Strings, delegating to the String-format overload of ExtractDateRDD.
val ExtractDateDF = udf((fullDate: String, dateFormat: String) =>
  io.archivesunleashed.matchbox.ExtractDateRDD.apply(fullDate, dateFormat))

/**
* Given a dataframe, serializes binary object and saves to disk
* @param df the input dataframe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.tika.language.detect.LanguageDetector;
import org.apache.tika.language.detect.LanguageResult;

/** Detects language using Apache Tika. */
object DetectLanguage {
object DetectLanguageRDD {

/** Detects the language of a String input.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import de.l3s.boilerpipe.extractors.DefaultExtractor
import java.io.IOException

/** Extract raw text content from an HTML page, minus "boilerplate" content (using boilerpipe). */
object ExtractBoilerpipeText {
object ExtractBoilerpipeTextRDD {
/** Uses boilerpipe to extract raw text content from a page.
*
* ExtractBoilerpipeTextRDD removes boilerplate text (e.g. a copyright statement) from an HTML string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.archivesunleashed.matchbox

/** Gets different parts of a dateString. */
object ExtractDate {
object ExtractDateRDD {
object DateComponent extends Enumeration {
/** An enum specifying years, months, days or a combination. */
type DateComponent = Value
Expand Down Expand Up @@ -49,4 +49,29 @@ object ExtractDate {
""
}
}

/** Extracts a provided date component from a date (for DataFrames).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's reword this to:

Extracts a provided date component from a date (for DataFrames).

*
* @param fullDate date returned by `WARecord.getCrawlDate`, formatted as YYYYMMDD
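* @param dateFormat the date component to extract, as a String: "YYYY", "MM", "DD" or "YYYYMM";
* any other value returns the first eight characters (YYYYMMDD)
* @return the requested substring of fullDate, or "" when fullDate is null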
*/
def apply(fullDate: String, dateFormat: String): String = {
  // Substring boundaries of the year, month and day fields within a YYYYMMDD string.
  val startSS = 0
  val yearSS = 4
  val monthSS = 6
  val daySS = 8
  // Option(...) turns a null fullDate into None, for which the empty string is returned.
  Option(fullDate).fold("") { date =>
    dateFormat match {
      case "YYYY" => date.substring(startSS, yearSS)
      case "MM" => date.substring(yearSS, monthSS)
      case "DD" => date.substring(monthSS, daySS)
      case "YYYYMM" => date.substring(startSS, monthSS)
      // Any unrecognised format falls back to the leading YYYYMMDD portion.
      case _ => date.substring(startSS, daySS)
    }
  }
}
}
12 changes: 6 additions & 6 deletions src/main/scala/io/archivesunleashed/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import java.util.Base64

import io.archivesunleashed.data.{ArchiveRecordInputFormat, ArchiveRecordWritable}
import ArchiveRecordWritable.ArchiveFormat
import io.archivesunleashed.matchbox.{DetectLanguage, DetectMimeTypeTika, ExtractDate,
import io.archivesunleashed.matchbox.{DetectLanguageRDD, DetectMimeTypeTika, ExtractDateRDD,
ExtractDomainRDD, ExtractImageDetails, ExtractImageLinksRDD,
ExtractLinksRDD, GetExtensionMimeRDD, RemoveHTMLRDD}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
import java.net.URI
import java.net.URL
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Expand Down Expand Up @@ -542,7 +542,7 @@ package object archivesunleashed {
* @param component the selected DateComponent enum value
*/
def keepDate(dates: List[String], component: DateComponent = DateComponent.YYYYMMDD): RDD[ArchiveRecord] = {
rdd.filter(r => dates.contains(ExtractDate(r.getCrawlDate, component)))
rdd.filter(r => dates.contains(ExtractDateRDD(r.getCrawlDate, component)))
}

/** Removes all data but selected exact URLs.
Expand Down Expand Up @@ -579,7 +579,7 @@ package object archivesunleashed {
* @param lang a set of ISO 639-2 codes
*/
def keepLanguages(lang: Set[String]): RDD[ArchiveRecord] = {
rdd.filter(r => lang.contains(DetectLanguage(RemoveHTMLRDD(r.getContentString))))
rdd.filter(r => lang.contains(DetectLanguageRDD(RemoveHTMLRDD(r.getContentString))))
}

/** Removes all content that does not pass Regular Expression test.
Expand Down Expand Up @@ -674,7 +674,7 @@ package object archivesunleashed {
* @param lang a set of ISO 639-2 codes
*/
def discardLanguages(lang: Set[String]): RDD[ArchiveRecord] = {
rdd.filter(r => !lang.contains(DetectLanguage(RemoveHTMLRDD(r.getContentString))))
rdd.filter(r => !lang.contains(DetectLanguageRDD(RemoveHTMLRDD(r.getContentString))))
}
}
}
6 changes: 3 additions & 3 deletions src/test/scala/io/archivesunleashed/ArcTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.archivesunleashed

import com.google.common.io.Resources
import io.archivesunleashed.matchbox.{DetectLanguage, DetectMimeTypeTika, ExtractLinksRDD, RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.{DetectLanguageRDD, DetectMimeTypeTika, ExtractLinksRDD, RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -82,7 +82,7 @@ class ArcTest extends FunSuite with BeforeAndAfter {
val languageCounts = RecordLoader.loadArchives(arcPath, sc)
.keepMimeTypes(Set("text/html"))
.map(r => RemoveHTMLRDD(r.getContentString))
.groupBy(content => DetectLanguage(content))
.groupBy(content => DetectLanguageRDD(content))
.map(f => {
(f._1, f._2.size)
})
Expand Down
6 changes: 3 additions & 3 deletions src/test/scala/io/archivesunleashed/RecordRDDTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.archivesunleashed

import com.google.common.io.Resources
import io.archivesunleashed.matchbox.ExtractDate
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.ExtractDateRDD
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -62,7 +62,7 @@ class RecordRDDTest extends FunSuite with BeforeAndAfter {
val base = RecordLoader.loadArchives(arcPath, sc)
val component = DateComponent.YYYY
val r = base
.filter (x => ExtractDate(x.getCrawlDate, component) == testDate)
.filter (x => ExtractDateRDD(x.getCrawlDate, component) == testDate)
.map ( mp => mp.getUrl).take(3)
val r2 = base.keepDate(List(testDate), component)
.map ( mp => mp.getUrl).take(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class ExtractBoilerPipeTextTest extends FunSuite {
var boiler = """Copyright 2017"""

test("Collects boilerpipe") {
assert(ExtractBoilerpipeText(text) == boiler)
assert(ExtractBoilerpipeTextRDD(text) == boiler)
// scalastyle:off null
assert(ExtractBoilerpipeText(null) == "")
assert(ExtractBoilerpipeTextRDD(null) == "")
// scalastyle:on null
assert(ExtractBoilerpipeText("All Rights Reserved.") == "")
assert(ExtractBoilerpipeTextRDD("All Rights Reserved.") == "")
}

test("Removes Header information") {
assert(ExtractBoilerpipeText(header + text) == boiler)
assert(ExtractBoilerpipeTextRDD(header + text) == boiler)
}
}
24 changes: 12 additions & 12 deletions src/test/scala/io/archivesunleashed/matchbox/ExtractDateTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.archivesunleashed.matchbox

import io.archivesunleashed.matchbox.ExtractDate.DateComponent.{DD, MM, YYYY, YYYYMM, YYYYMMDD}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.{DD, MM, YYYY, YYYYMM, YYYYMMDD}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
Expand All @@ -26,13 +26,13 @@ class ExtractDateTest extends FunSuite {

test("simple") {
val date = "20151204"
assert(ExtractDate(date, YYYY) == "2015")
assert(ExtractDate(date, MM) == "12")
assert(ExtractDate(date, DD) == "04")
assert(ExtractDate(date, YYYYMM) == "201512")
assert(ExtractDate(date, YYYYMMDD) == date)
assert(ExtractDateRDD(date, YYYY) == "2015")
assert(ExtractDateRDD(date, MM) == "12")
assert(ExtractDateRDD(date, DD) == "04")
assert(ExtractDateRDD(date, YYYYMM) == "201512")
assert(ExtractDateRDD(date, YYYYMMDD) == date)
// scalastyle:off null
assert(ExtractDate(null, YYYYMMDD) == "")
assert(ExtractDateRDD(null, YYYYMMDD) == "")
// scalastyle:on null
}

Expand All @@ -42,10 +42,10 @@ class ExtractDateTest extends FunSuite {
val yearSS = 4
val monthSS = 6
val daySS = 8
assert(ExtractDate(date, YYYY) == date.substring(startSS, yearSS))
assert(ExtractDate(date, MM) == date.substring(yearSS, monthSS))
assert(ExtractDate(date, DD) == date.substring(monthSS, daySS))
assert(ExtractDate(date, YYYYMM) == date.substring(startSS, monthSS))
assert(ExtractDate(date, YYYYMMDD) == date.substring(startSS, daySS))
assert(ExtractDateRDD(date, YYYY) == date.substring(startSS, yearSS))
assert(ExtractDateRDD(date, MM) == date.substring(yearSS, monthSS))
assert(ExtractDateRDD(date, DD) == date.substring(monthSS, daySS))
assert(ExtractDateRDD(date, YYYYMM) == date.substring(startSS, monthSS))
assert(ExtractDateRDD(date, YYYYMMDD) == date.substring(startSS, daySS))
}
}