diff --git a/pom.xml b/pom.xml
index 79a07d4b..c4d09052 100644
--- a/pom.xml
+++ b/pom.xml
@@ -501,6 +501,11 @@
       <artifactId>spark-core_2.11</artifactId>
       <version>${spark.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.11</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-graphx_2.11</artifactId>
diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala
new file mode 100644
index 00000000..330c18f9
--- /dev/null
+++ b/src/main/scala/io/archivesunleashed/df/package.scala
@@ -0,0 +1,16 @@
+package io.archivesunleashed
+
+import org.apache.spark.sql.functions.udf
+
+/**
+ * UDFs for data frames.
+ */
+package object df {
+  // TODO: UDFs for use with data frames go here, tentatively. There are a couple of ways we could build UDFs,
+  // by wrapping matchbox UDFs or by reimplementing them. The following examples illustrate. Obviously, we'll
+  // need to populate more UDFs over time, but this is a start.
+
+  val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, ""))
+
+  val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))
+}
diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index 342af4b4..074bda2b 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -19,9 +19,13 @@ package io
 import io.archivesunleashed.data.{ArchiveRecordWritable, ArchiveRecordInputFormat}
 import ArchiveRecordWritable.ArchiveFormat
 
-import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
+import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractDomain, RemoveHTML}
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
 import org.apache.hadoop.io.LongWritable
 import org.apache.spark.{SerializableWritable, SparkContext}
 import org.apache.spark.rdd.RDD
@@ -86,6 +90,37 @@
         && !r.getUrl.endsWith("robots.txt"))
     }
+
+    /** Creates a DataFrame with the crawl date, URL, MIME type, and content of each valid page. */
+    def extractValidPagesDF(): DataFrame = {
+      val records = rdd.keepValidPages()
+        .map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType, r.getContentString))
+
+      val schema = new StructType()
+        .add(StructField("CrawlDate", StringType, true))
+        .add(StructField("Url", StringType, true))
+        .add(StructField("MimeType", StringType, true))
+        .add(StructField("Content", StringType, true))
+
+      val sqlContext = SparkSession.builder()
+      sqlContext.getOrCreate().createDataFrame(records, schema)
+    }
+
+    /** Creates a DataFrame with one row per extracted link: crawl date, source, destination, and anchor text. */
+    def extractHyperlinksDF(): DataFrame = {
+      val records = rdd.keepValidPages()
+        .flatMap(r => ExtractLinks(r.getUrl, r.getContentString).map(t => (r.getCrawlDate, t._1, t._2, t._3)))
+        .map(t => Row(t._1, t._2, t._3, t._4))
+
+      val schema = new StructType()
+        .add(StructField("CrawlDate", StringType, true))
+        .add(StructField("Src", StringType, true))
+        .add(StructField("Dest", StringType, true))
+        .add(StructField("Anchor", StringType, true))
+
+      val sqlContext = SparkSession.builder()
+      sqlContext.getOrCreate().createDataFrame(records, schema)
+    }
+
     /** Removes all data except images.
       */
     def keepImages() = {
       rdd.filter(r =>
diff --git a/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala b/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala
new file mode 100644
index 00000000..5a75912a
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala
@@ -0,0 +1,78 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import com.google.common.io.Resources
+import io.archivesunleashed.df._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+@RunWith(classOf[JUnitRunner])
+class SimpleDfTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private val master = "local[4]"
+  private val appName = "example-df"
+  private var sc: SparkContext = _
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    sc = new SparkContext(conf)
+  }
+
+  test("count records") {
+    val df = RecordLoader.loadArchives(arcPath, sc)
+      .extractValidPagesDF()
+
+    // We need this in order to use the $-notation
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    import spark.implicits._
+
+    val results = df.select(ExtractDomain($"Url").as("Domain"))
+      .groupBy("Domain").count().orderBy(desc("count")).head(3)
+
+    // Results should be:
+    // +------------------+-----+
+    // |            Domain|count|
+    // +------------------+-----+
+    // |   www.archive.org|  132|
+    // |     deadlists.com|    2|
+    // |www.hideout.com.br|    1|
+    // +------------------+-----+
+
+    assert(results(0).get(0) == "www.archive.org")
+    assert(results(0).get(1) == 132)
+
+    assert(results(1).get(0) == "deadlists.com")
+    assert(results(1).get(1) == 2)
+
+    assert(results(2).get(0) == "www.hideout.com.br")
+    assert(results(2).get(1) == 1)
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+}
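
A minimal usage sketch of how the pieces above compose, assuming spark-shell with the aut fatjar on the classpath, where sc is the shell's SparkContext; the archive path below is a placeholder, and in the shell a SparkSession named spark may already exist.

import io.archivesunleashed._
import io.archivesunleashed.df._
import org.apache.spark.sql.functions._

// The $-notation for columns requires a SparkSession's implicits in scope.
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
import spark.implicits._

// Count pages per domain, chaining the two new UDFs on the Url column.
RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
  .extractValidPagesDF()
  .select(RemovePrefixWWW(ExtractDomain($"Url")).as("Domain"))
  .groupBy("Domain").count().orderBy(desc("count"))
  .show()

// Hyperlinks work the same way: one row per (CrawlDate, Src, Dest, Anchor).
RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
  .extractHyperlinksDF()
  .select(ExtractDomain($"Src").as("SrcDomain"), ExtractDomain($"Dest").as("DestDomain"))
  .show()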