Skip to content

Commit

Permalink
Initial DataFrames merge (#210); Partially addresses #190
Browse files Browse the repository at this point in the history
* Initial stab at df.
* Initial stab of what link extraction would look like with DFs.
* Added test case.
* Docs.
  • Loading branch information
lintool authored and ruebot committed Apr 27, 2018
1 parent 09a1ef0 commit 3de076c
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions src/main/scala/io/archivesunleashed/df/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.archivesunleashed

import org.apache.spark.sql.functions.udf

/** Column-level UDFs for use with Spark DataFrames. */
package object df {
  // NOTE: Two construction strategies are illustrated below — delegating to an
  // existing matchbox UDF versus reimplementing the logic inline. More UDFs
  // will be added here over time; this is just a starting point.

  // Extracts the domain from a URL column by delegating to the matchbox UDF.
  val ExtractDomain = udf((url: String) => io.archivesunleashed.matchbox.ExtractDomain.apply(url, ""))

  // Strips a leading "www." prefix (and any leading whitespace) from a hostname column.
  val RemovePrefixWWW = udf((host: String) => host.replaceAll("^\\s*www\\.", ""))
}
36 changes: 35 additions & 1 deletion src/main/scala/io/archivesunleashed/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ package io

import io.archivesunleashed.data.{ArchiveRecordWritable, ArchiveRecordInputFormat}
import ArchiveRecordWritable.ArchiveFormat
import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractDomain, RemoveHTML}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import io.archivesunleashed.matchbox.ExtractDate.DateComponent._

import org.apache.spark.sql._
import org.apache.spark.sql.types._

import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -86,6 +90,36 @@ package object archivesunleashed {
&& !r.getUrl.endsWith("robots.txt"))
}

/** Converts the valid pages of this RDD into a DataFrame.
  *
  * Columns (all nullable strings): CrawlDate, Url, MimeType, Content.
  *
  * @return a DataFrame with one row per valid page.
  */
def extractValidPagesDF(): DataFrame = {
  val records = rdd.keepValidPages()
    .map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType, r.getContentString))

  val schema = new StructType()
    .add(StructField("CrawlDate", StringType, true))
    .add(StructField("Url", StringType, true))
    .add(StructField("MimeType", StringType, true))
    .add(StructField("Content", StringType, true))

  // Reuse the active SparkSession (or create one). The previous code bound the
  // Builder to a local misleadingly named `sqlContext`.
  val spark = SparkSession.builder().getOrCreate()
  spark.createDataFrame(records, schema)
}

/** Extracts hyperlinks from the valid pages of this RDD as a DataFrame.
  *
  * Columns (all nullable strings): CrawlDate, Src, Dest, Anchor.
  *
  * @return a DataFrame with one row per extracted link.
  */
def extractHyperlinksDF(): DataFrame = {
  // Fix: keepValidPages() was previously invoked twice back-to-back,
  // re-filtering the already-filtered RDD for no effect; one call suffices.
  val records = rdd.keepValidPages()
    .flatMap(r => ExtractLinks(r.getUrl, r.getContentString).map(t => (r.getCrawlDate, t._1, t._2, t._3)))
    .map(t => Row(t._1, t._2, t._3, t._4))

  val schema = new StructType()
    .add(StructField("CrawlDate", StringType, true))
    .add(StructField("Src", StringType, true))
    .add(StructField("Dest", StringType, true))
    .add(StructField("Anchor", StringType, true))

  // Reuse the active SparkSession (or create one); name the local for what it is.
  val spark = SparkSession.builder().getOrCreate()
  spark.createDataFrame(records, schema)
}

/** Removes all data except images. */
def keepImages() = {
rdd.filter(r =>
Expand Down
78 changes: 78 additions & 0 deletions src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source platform for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.archivesunleashed

import com.google.common.io.Resources
import io.archivesunleashed.df._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class SimpleDfTest extends FunSuite with BeforeAndAfter {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private val master = "local[4]"
  private val appName = "example-df"
  private var sc: SparkContext = _

  before {
    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  }

  test("count records") {
    val df = RecordLoader.loadArchives(arcPath, sc).extractValidPagesDF()

    // A SparkSession is required in scope for the $-notation below.
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // Top three domains by page count, in descending order.
    val results = df.select(ExtractDomain($"Url").as("Domain"))
      .groupBy("Domain").count().orderBy(desc("count")).head(3)

    // Expected (domain, count) pairs in rank order for the example archive.
    val expected = Seq(
      ("www.archive.org", 132),
      ("deadlists.com", 2),
      ("www.hideout.com.br", 1)
    )
    expected.zipWithIndex.foreach { case ((domain, count), rank) =>
      assert(results(rank).get(0) == domain)
      assert(results(rank).get(1) == count)
    }
  }

  after {
    if (sc != null) {
      sc.stop()
    }
  }
}

0 comments on commit 3de076c

Please sign in to comment.