From 68922a2b2fbd53ca25fb20a43861bdac70eba041 Mon Sep 17 00:00:00 2001 From: g285sing Date: Wed, 6 Nov 2019 19:15:53 -0500 Subject: [PATCH 1/6] Issue-368 --- src/main/scala/io/archivesunleashed/ArchiveRecord.scala | 6 +++--- src/main/scala/io/archivesunleashed/df/package.scala | 2 +- .../matchbox/ExtractBoilerpipeText.scala | 2 +- .../scala/io/archivesunleashed/matchbox/RemoveHTML.scala | 2 +- .../{RemoveHttpHeader.scala => RemoveHTTPHeader.scala} | 2 +- src/test/scala/io/archivesunleashed/ArcTest.scala | 2 +- ...oveHttpHeaderTest.scala => RemoveHTTPHeaderTest.scala} | 8 ++++---- 7 files changed, 12 insertions(+), 12 deletions(-) rename src/main/scala/io/archivesunleashed/matchbox/{RemoveHttpHeader.scala => RemoveHTTPHeader.scala} (97%) rename src/test/scala/io/archivesunleashed/matchbox/{RemoveHttpHeaderTest.scala => RemoveHTTPHeaderTest.scala} (85%) diff --git a/src/main/scala/io/archivesunleashed/ArchiveRecord.scala b/src/main/scala/io/archivesunleashed/ArchiveRecord.scala index b3cdee0c..1d240fbb 100644 --- a/src/main/scala/io/archivesunleashed/ArchiveRecord.scala +++ b/src/main/scala/io/archivesunleashed/ArchiveRecord.scala @@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream import java.security.MessageDigest import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils, ArchiveRecordWritable} -import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDate, ExtractDomain, RemoveHttpHeader} +import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDate, ExtractDomain, RemoveHTTPHeader} import org.apache.spark.SerializableWritable import org.archive.io.arc.ARCRecord import org.archive.io.warc.WARCRecord @@ -155,8 +155,8 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends val getBinaryBytes: Array[Byte] = { if (getContentString.startsWith("HTTP/")) { getContentBytes.slice( - getContentString.indexOf(RemoveHttpHeader.headerEnd) - + RemoveHttpHeader.headerEnd.length, getContentBytes.length) + getContentString.indexOf(RemoveHTTPHeader.headerEnd) + + RemoveHTTPHeader.headerEnd.length, getContentBytes.length) } else { getContentBytes } diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala index f6afc320..25889048 100644 --- a/src/main/scala/io/archivesunleashed/df/package.scala +++ b/src/main/scala/io/archivesunleashed/df/package.scala @@ -33,7 +33,7 @@ package object df { val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, "")) - val RemoveHttpHeader = udf(io.archivesunleashed.matchbox.RemoveHttpHeader.apply(_: String)) + val RemoveHTTPHeader = udf(io.archivesunleashed.matchbox.RemoveHTTPHeader.apply(_: String)) val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", "")) diff --git a/src/main/scala/io/archivesunleashed/matchbox/ExtractBoilerpipeText.scala b/src/main/scala/io/archivesunleashed/matchbox/ExtractBoilerpipeText.scala index d7236ddd..2924a102 100644 --- a/src/main/scala/io/archivesunleashed/matchbox/ExtractBoilerpipeText.scala +++ b/src/main/scala/io/archivesunleashed/matchbox/ExtractBoilerpipeText.scala @@ -29,7 +29,7 @@ object ExtractBoilerpipeText { */ def apply(input: String): String = { - removeBoilerplate(RemoveHttpHeader(input)) + removeBoilerplate(RemoveHTTPHeader(input)) } private def removeBoilerplate(input: String): String = { diff --git a/src/main/scala/io/archivesunleashed/matchbox/RemoveHTML.scala b/src/main/scala/io/archivesunleashed/matchbox/RemoveHTML.scala index d20892a6..3903e17d 100644 --- 
a/src/main/scala/io/archivesunleashed/matchbox/RemoveHTML.scala +++ b/src/main/scala/io/archivesunleashed/matchbox/RemoveHTML.scala @@ -28,7 +28,7 @@ object RemoveHTML { */ def apply(content: String): String = { // First remove the HTTP header. - val maybeContent: Option[String] = Option(RemoveHttpHeader(content)) + val maybeContent: Option[String] = Option(RemoveHTTPHeader(content)) maybeContent match { case Some(content) => Jsoup.parse(content).text().replaceAll("[\\r\\n]+", " ") diff --git a/src/main/scala/io/archivesunleashed/matchbox/RemoveHttpHeader.scala b/src/main/scala/io/archivesunleashed/matchbox/RemoveHTTPHeader.scala similarity index 97% rename from src/main/scala/io/archivesunleashed/matchbox/RemoveHttpHeader.scala rename to src/main/scala/io/archivesunleashed/matchbox/RemoveHTTPHeader.scala index 43e30e06..e074698d 100644 --- a/src/main/scala/io/archivesunleashed/matchbox/RemoveHttpHeader.scala +++ b/src/main/scala/io/archivesunleashed/matchbox/RemoveHTTPHeader.scala @@ -16,7 +16,7 @@ package io.archivesunleashed.matchbox /** Remove HTTP headers. */ -object RemoveHttpHeader { +object RemoveHTTPHeader { val headerEnd = "\r\n\r\n" /** Remove HTTP headers. diff --git a/src/test/scala/io/archivesunleashed/ArcTest.scala b/src/test/scala/io/archivesunleashed/ArcTest.scala index 3892874e..433df8de 100644 --- a/src/test/scala/io/archivesunleashed/ArcTest.scala +++ b/src/test/scala/io/archivesunleashed/ArcTest.scala @@ -103,7 +103,7 @@ class ArcTest extends FunSuite with BeforeAndAfter { test("detect mime type tika") { val mimeTypeCounts = RecordLoader.loadArchives(arcPath, sc) - .map(r => RemoveHttpHeader(r.getContentString)) + .map(r => RemoveHTTPHeader(r.getContentString)) .groupBy(content => DetectMimeTypeTika(content.getBytes)) .map(f => { (f._1, f._2.size) diff --git a/src/test/scala/io/archivesunleashed/matchbox/RemoveHttpHeaderTest.scala b/src/test/scala/io/archivesunleashed/matchbox/RemoveHTTPHeaderTest.scala similarity index 85% rename from src/test/scala/io/archivesunleashed/matchbox/RemoveHttpHeaderTest.scala rename to src/test/scala/io/archivesunleashed/matchbox/RemoveHTTPHeaderTest.scala index 443979c1..e128816a 100644 --- a/src/test/scala/io/archivesunleashed/matchbox/RemoveHttpHeaderTest.scala +++ b/src/test/scala/io/archivesunleashed/matchbox/RemoveHTTPHeaderTest.scala @@ -21,14 +21,14 @@ import org.scalatest.FunSuite import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) -class RemoveHttpHeaderTest extends FunSuite { +class RemoveHTTPHeaderTest extends FunSuite { test("simple") { val header = "HTTP/1.1 200 OK\r\n\r\nHello content" val nohttp = "This has no Http" - val removed = RemoveHttpHeader(header) - val unchanged = RemoveHttpHeader(nohttp) + val removed = RemoveHTTPHeader(header) + val unchanged = RemoveHTTPHeader(nohttp) // scalastyle:off null - val error = RemoveHttpHeader(null) + val error = RemoveHTTPHeader(null) // scalastyle:on null assert(removed == "Hello content") assert(unchanged == nohttp) From 0db00939c510b4dd16f3057ce4e1c0b2451522db Mon Sep 17 00:00:00 2001 From: g285sing Date: Mon, 11 Nov 2019 21:45:32 -0500 Subject: [PATCH 2/6] Issue238 --- src/main/scala/io/archivesunleashed/df/package.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala index 25889048..d5671544 100644 --- a/src/main/scala/io/archivesunleashed/df/package.scala +++ b/src/main/scala/io/archivesunleashed/df/package.scala @@ -16,7 +16,7 @@ 
 package io.archivesunleashed
 
 import org.apache.commons.io.IOUtils
-import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDomain, RemoveHTML}
+import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDomain, RemoveHTML, ExtractLinks}
 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.DataFrame
 import java.io.ByteArrayInputStream
@@ -39,6 +39,8 @@ package object df {
 
   var RemoveHTML = udf(io.archivesunleashed.matchbox.RemoveHTML.apply(_:String))
 
+  val ExtractLinks = udf(io.archivesunleashed.matchbox.ExtractLinks.apply(_:String,_:String))
+
   /**
    * Given a dataframe, serializes binary object and saves to disk
    * @param df the input dataframe
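Usage sketch for the new ExtractLinks UDF (illustrative, not part of the patch): matchbox.ExtractLinks(src, html) returns a sequence of (source, destination, anchor text) triples, so exploding the UDF's output yields one row per outgoing hyperlink. Assumes spark-shell, where sc is predefined, plus import spark.implicits._ for the $-notation; the fixture path is hypothetical.

    import io.archivesunleashed._
    import io.archivesunleashed.df._
    import org.apache.spark.sql.functions.explode_outer

    // One output row per hyperlink found on each valid page.
    val links = RecordLoader.loadArchives("example.arc.gz", sc)
      .extractValidPagesDF()
      .select($"url", explode_outer(ExtractLinks($"url", $"content")).as("link"))

    links.show(10, false)
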
From 4e5a066a10bfd430ba32bdc596c1f24896b370be Mon Sep 17 00:00:00 2001
From: g285sing
Date: Tue, 12 Nov 2019 12:59:28 -0500
Subject: [PATCH 4/6] test

---
 .../df/ExtractHyperlinksTest.scala            | 96 +++++++++++++++++++
 1 file changed, 96 insertions(+)
 create mode 100644 src/test/scala/io/archivesunleashed/df/ExtractHyperlinksTest.scala

diff --git a/src/test/scala/io/archivesunleashed/df/ExtractHyperlinksTest.scala b/src/test/scala/io/archivesunleashed/df/ExtractHyperlinksTest.scala
new file mode 100644
index 00000000..05a12419
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/df/ExtractHyperlinksTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright © 2017 The Archives Unleashed Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import com.google.common.io.Resources
+// scalastyle:off underscore.import
+import io.archivesunleashed.df._
+import org.apache.spark.sql.functions._
+// scalastyle:on underscore.import
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+@RunWith(classOf[JUnitRunner])
+class ExtractHyperlinksTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private val master = "local[4]"
+  private val appName = "example-df"
+  private var sc: SparkContext = _
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    sc = new SparkContext(conf)
+  }
+
+  test("extract hyperlinks") {
+    val df = RecordLoader.loadArchives(arcPath, sc)
+      .extractValidPagesDF()
+
+    val dest = udf((vs: Seq[Any]) => vs(0).toString.split(",")(1)) // destination URL from the (source, destination, anchor) tuple
+
+    // We need this in order to use the $-notation.
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    // scalastyle:off
+    import spark.implicits._
+    // scalastyle:on
+
+    val interResults = df.select(RemovePrefixWWW(ExtractDomain($"url")).as("Domain"),
+      $"url".as("url"),
+      $"crawl_date",
+      explode_outer(ExtractLinks($"url", $"content")).as("link")
+    )
+      .filter(lower($"content").contains("keynote")) // filter on the keyword "keynote"
+
+    val results = interResults.select($"url", $"Domain", $"crawl_date", dest(array($"link")).as("destination_page")).head(3)
+
+    // Results should be:
+    // +--------------------------------+-----------+----------+----------------------------------------------------+
+    // |url                             |Domain     |crawl_date|destination_page                                    |
+    // +--------------------------------+-----------+----------+----------------------------------------------------+
+    // |http://www.archive.org/index.php|archive.org|20080430  |http://www.archive.org/create/                      |
+    // |http://www.archive.org/index.php|archive.org|20080430  |http://web.archive.org/collections/web/advanced.html|
+    // |http://www.archive.org/index.php|archive.org|20080430  |http://www.sloan.org                                |
+    // +--------------------------------+-----------+----------+----------------------------------------------------+
+
+    assert(results(0).get(0) == "http://www.archive.org/index.php")
+    assert(results(0).get(1) == "archive.org")
+    assert(results(0).get(2) == "20080430")
+    assert(results(0).get(3) == "http://www.archive.org/create/")
+
+    assert(results(1).get(0) == "http://www.archive.org/index.php")
+    assert(results(1).get(1) == "archive.org")
+    assert(results(1).get(2) == "20080430")
+    assert(results(1).get(3) == "http://web.archive.org/collections/web/advanced.html")
+
+    assert(results(2).get(0) == "http://www.archive.org/index.php")
+    assert(results(2).get(1) == "archive.org")
+    assert(results(2).get(2) == "20080430")
+    assert(results(2).get(3) == "http://www.sloan.org")
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+}
From 8d204d09a25b1a43256b685d37c71cb6782a9635 Mon Sep 17 00:00:00 2001
From: g285sing
Date: Mon, 16 Dec 2019 14:22:32 -0500
Subject: [PATCH 5/6] serializable-api

---
 .../scala/io/archivesunleashed/package.scala | 33 +++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index a9f064ec..f5451ec8 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -31,10 +31,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
 import java.net.URI
 import java.net.URL
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType}
 import org.apache.hadoop.io.LongWritable
-import org.apache.spark.{SerializableWritable, SparkContext}
+import org.apache.spark.{RangePartitioner, SerializableWritable, SparkContext}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
@@ -83,6 +83,32 @@ package object archivesunleashed {
     }
   }
 
+  /**
+   * A wrapper class around DataFrame that allows DataFrames of ARC and WARC records to be queried via a fluent API.
+   *
+   * To load such a DataFrame, use [[RecordLoader]] and call .all() on it.
+   */
+  implicit class WARecordDF(df: DataFrame) extends java.io.Serializable {
+
+    def keepValidPagesDF(): DataFrame = {
+
+      val spark = SparkSession.builder().master("local").getOrCreate()
+      // scalastyle:off
+      import spark.implicits._
+      // scalastyle:on
+
+      df.filter($"crawl_date".isNotNull)
+        .filter(!($"url".rlike(".*robots\\.txt$")) &&
+          ($"mime_type_web_server".rlike("text/html") ||
+            $"mime_type_web_server".rlike("application/xhtml\\+xml") ||
+            $"url".rlike("(?i).*htm$") ||
+            $"url".rlike("(?i).*html$")
+          )
+        )
+        .filter($"HttpStatus" === 200)
+    }
+  }
+
   /**
    * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
    *
@@ -94,7 +120,7 @@
     Call KeepImages OR KeepValidPages on RDD depending upon the requirement before calling this method */
   def all(): DataFrame = {
     val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType,
-      DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes))
+      DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes, r.getHttpStatus))
 
     val schema = new StructType()
       .add(StructField("crawl_date", StringType, true))
@@ -103,6 +129,7 @@
       .add(StructField("mime_type_tika", StringType, true))
       .add(StructField("content", StringType, true))
       .add(StructField("bytes", BinaryType, true))
+      .add(StructField("HttpStatus", StringType, true))
 
     val sqlContext = SparkSession.builder()
     sqlContext.getOrCreate().createDataFrame(records, schema)
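Usage sketch for the new DataFrame API (illustrative, not part of the patch): .all() converts the loaded records into a DataFrame, and keepValidPagesDF() then keeps only records that look like real HTML pages served with a 200 status. Assumes spark-shell, where sc is predefined, plus import spark.implicits._ for the $-notation; the fixture path is hypothetical.

    import io.archivesunleashed._

    // Every record as a DataFrame, narrowed down to valid HTML pages.
    val validPages = RecordLoader.loadArchives("example.arc.gz", sc)
      .all()
      .keepValidPagesDF()

    validPages.select($"url", $"mime_type_web_server", $"HttpStatus").show(5, false)
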
From 87d2b056bba797bd344b3d103e980a4552f12a2c Mon Sep 17 00:00:00 2001
From: g285sing
Date: Mon, 16 Dec 2019 16:27:18 -0500
Subject: [PATCH 6/6] tests

---
 .../io/archivesunleashed/RecordDFTest.scala   | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)
 create mode 100644 src/test/scala/io/archivesunleashed/RecordDFTest.scala

diff --git a/src/test/scala/io/archivesunleashed/RecordDFTest.scala b/src/test/scala/io/archivesunleashed/RecordDFTest.scala
new file mode 100644
index 00000000..0232c538
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/RecordDFTest.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2017 The Archives Unleashed Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import com.google.common.io.Resources
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+@RunWith(classOf[JUnitRunner])
+class RecordDFTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private val badPath = Resources.getResource("arc/badexample.arc.gz").getPath
+  private val master = "local[4]"
+  private val appName = "example-spark"
+  private var sc: SparkContext = _
+  private val archive = "http://www.archive.org/"
+  private val sloan = "http://www.sloan.org"
+  private val regex = raw"Please visit our website at".r
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    conf.set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+  }
+
+  test("keep valid pages") {
+    val expected = "http://www.archive.org/"
+    val base = RecordLoader.loadArchives(arcPath, sc).all()
+      .keepValidPagesDF().take(1)(0)(1)
+    assert(base.toString == expected)
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file
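Usage sketch tying the series together (illustrative, not part of the patch): after patch 1's rename, scripts should call RemoveHTTPHeader rather than the old RemoveHttpHeader. The RDD-side call below mirrors the updated ArcTest; the fixture path is hypothetical and sc is the spark-shell context.

    import io.archivesunleashed._
    import io.archivesunleashed.matchbox._

    // Strip HTTP headers from each record body, then group by the
    // Tika-detected MIME type and count the records per type.
    val mimeTypeCounts = RecordLoader.loadArchives("example.arc.gz", sc)
      .map(r => RemoveHTTPHeader(r.getContentString))
      .groupBy(content => DetectMimeTypeTika(content.getBytes))
      .map(f => (f._1, f._2.size))
      .collect()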