From 3eac5204ba5f0be7569439ac99909e948ab95871 Mon Sep 17 00:00:00 2001 From: lintool Date: Mon, 9 Apr 2018 15:29:15 +0800 Subject: [PATCH 1/4] Initial stab at df. --- pom.xml | 5 +++++ .../io/archivesunleashed/df/package.scala | 10 ++++++++++ .../scala/io/archivesunleashed/package.scala | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+) create mode 100644 src/main/scala/io/archivesunleashed/df/package.scala diff --git a/pom.xml b/pom.xml index a67af8d0..8916f6ce 100644 --- a/pom.xml +++ b/pom.xml @@ -477,6 +477,11 @@ spark-core_2.11 ${spark.version} + + org.apache.spark + spark-sql_2.11 + ${spark.version} + org.apache.spark spark-graphx_2.11 diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala new file mode 100644 index 00000000..f61a7a87 --- /dev/null +++ b/src/main/scala/io/archivesunleashed/df/package.scala @@ -0,0 +1,10 @@ +package io.archivesunleashed + +import org.apache.spark.sql.functions.udf + +/** + * Created by jimmylin on 4/9/18. 
+ */ +package object df { + val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, "")) +} diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala index 4c754815..d5ce6990 100644 --- a/src/main/scala/io/archivesunleashed/package.scala +++ b/src/main/scala/io/archivesunleashed/package.scala @@ -22,6 +22,10 @@ import ArchiveRecordWritable.ArchiveFormat import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML} import io.archivesunleashed.matchbox.ExtractDate.DateComponent import io.archivesunleashed.matchbox.ExtractDate.DateComponent._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + import org.apache.hadoop.io.LongWritable import org.apache.spark.{SerializableWritable, SparkContext} import org.apache.spark.rdd.RDD @@ -75,6 +79,20 @@ package object archivesunleashed { && !r.getUrl.endsWith("robots.txt")) } + def extractValidPagesDF(): DataFrame = { + val records = rdd.keepValidPages() + .map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType, r.getContentString)) + + val schema = new StructType() + .add(StructField("CrawlDate", StringType, true)) + .add(StructField("Url", StringType, true)) + .add(StructField("MimeType", StringType, true)) + .add(StructField("Content", StringType, true)) + + val sqlContext = SparkSession.builder(); + sqlContext.getOrCreate().createDataFrame(records, schema) + } + def keepImages() = { rdd.filter(r => r.getCrawlDate != null From b332fbc254e234abb023940b6ef441b5a9b2fa3c Mon Sep 17 00:00:00 2001 From: lintool Date: Wed, 18 Apr 2018 11:56:07 -0400 Subject: [PATCH 2/4] Initial stab of what link extraction would look like with DFs. 
---
 .../io/archivesunleashed/df/package.scala     |  2 ++
 .../scala/io/archivesunleashed/package.scala  | 17 ++++++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala
index f61a7a87..1380b7c3 100644
--- a/src/main/scala/io/archivesunleashed/df/package.scala
+++ b/src/main/scala/io/archivesunleashed/df/package.scala
@@ -7,4 +7,6 @@ import org.apache.spark.sql.functions.udf
  */
 package object df {
   val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, ""))
+
+  val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))
 }
diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index d5ce6990..5b59c623 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -19,7 +19,7 @@ package io
 
 import io.archivesunleashed.data.{ArchiveRecordWritable, ArchiveRecordInputFormat}
 import ArchiveRecordWritable.ArchiveFormat
-import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
+import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractDomain, RemoveHTML}
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent._
 
@@ -93,6 +93,21 @@ package object archivesunleashed {
     sqlContext.getOrCreate().createDataFrame(records, schema)
   }
 
+  def extractHyperlinksDF(): DataFrame = {
+    val records = rdd.keepValidPages()
+      .flatMap(r => ExtractLinks(r.getUrl, r.getContentString).map(t => (r.getCrawlDate, t._1, t._2, t._3)))
+      .map(t => Row(t._1, t._2, t._3, t._4))
+
+    val schema = new StructType()
+      .add(StructField("CrawlDate", StringType, true))
+      .add(StructField("Src", StringType, true))
+      .add(StructField("Dest", StringType, true))
+
.add(StructField("Anchor", StringType, true)) + + val sqlContext = SparkSession.builder(); + sqlContext.getOrCreate().createDataFrame(records, schema) + } + def keepImages() = { rdd.filter(r => r.getCrawlDate != null From 1bb77e084c1beeb9b62d28d032f424aaf1b14954 Mon Sep 17 00:00:00 2001 From: lintool Date: Fri, 27 Apr 2018 10:17:45 -0400 Subject: [PATCH 3/4] Added test case. --- .../scala/io/archivesunleashed/package.scala | 2 +- .../archivesunleashed/df/SimpleDfTest.scala | 78 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala index b9d97450..074bda2b 100644 --- a/src/main/scala/io/archivesunleashed/package.scala +++ b/src/main/scala/io/archivesunleashed/package.scala @@ -100,7 +100,7 @@ package object archivesunleashed { .add(StructField("MimeType", StringType, true)) .add(StructField("Content", StringType, true)) - val sqlContext = SparkSession.builder(); + val sqlContext = SparkSession.builder() sqlContext.getOrCreate().createDataFrame(records, schema) } diff --git a/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala b/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala new file mode 100644 index 00000000..5a75912a --- /dev/null +++ b/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala @@ -0,0 +1,78 @@ +/* + * Archives Unleashed Toolkit (AUT): + * An open-source platform for analyzing web archives. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.archivesunleashed + +import com.google.common.io.Resources +import io.archivesunleashed.df._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ +import org.apache.spark.{SparkConf, SparkContext} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite} + +@RunWith(classOf[JUnitRunner]) +class SimpleDfTest extends FunSuite with BeforeAndAfter { + private val arcPath = Resources.getResource("arc/example.arc.gz").getPath + private val master = "local[4]" + private val appName = "example-df" + private var sc: SparkContext = _ + + before { + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + sc = new SparkContext(conf) + } + + test("count records") { + val df = RecordLoader.loadArchives(arcPath, sc) + .extractValidPagesDF() + + // We need this in order to use the $-notation + val spark = SparkSession.builder().master("local").getOrCreate() + import spark.implicits._ + + val results = df.select(ExtractDomain($"Url").as("Domain")) + .groupBy("Domain").count().orderBy(desc("count")).head(3) + + // Results should be: + // +------------------+-----+ + // | Domain|count| + // +------------------+-----+ + // | www.archive.org| 132| + // | deadlists.com| 2| + // |www.hideout.com.br| 1| + // +------------------+-----+ + + assert(results(0).get(0) == "www.archive.org") + assert(results(0).get(1) == 132) + + assert(results(1).get(0) == "deadlists.com") + assert(results(1).get(1) == 2) + + assert(results(2).get(0) == 
"www.hideout.com.br") + assert(results(2).get(1) == 1) + } + + after { + if (sc != null) { + sc.stop() + } + } +} From edc830c62ecf4c8cc275af26ed9dbac677ecd07c Mon Sep 17 00:00:00 2001 From: lintool Date: Fri, 27 Apr 2018 10:21:24 -0400 Subject: [PATCH 4/4] Docs. --- src/main/scala/io/archivesunleashed/df/package.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala index 1380b7c3..330c18f9 100644 --- a/src/main/scala/io/archivesunleashed/df/package.scala +++ b/src/main/scala/io/archivesunleashed/df/package.scala @@ -3,9 +3,13 @@ package io.archivesunleashed import org.apache.spark.sql.functions.udf /** - * Created by jimmylin on 4/9/18. + * UDFs for data frames. */ package object df { + // TODO: UDFs for use with data frames go here, tentatively. There are couple of ways we could build UDFs, + // by wrapping matchbox UDFs or by reimplementing them. The following examples illustrate. Obviously, we'll + // need to populate more UDFs over time, but this is a start. + val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, "")) val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))