Extract popular images - Data Frame implementation #382

Merged · 13 commits · Nov 21, 2019
@@ -0,0 +1,55 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed.ArchiveRecord
import org.apache.spark.{RangePartitioner, SparkContext}
import org.apache.spark.sql.functions.{desc, first}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/** Extract most popular images from a DataFrame. */
object ExtractPopularImagesDF {
val MIN_WIDTH: Int = 30
val MIN_HEIGHT: Int = 30

/** Extracts the <i>n</i> most popular images from a DataFrame within a given size range.
*
* @param d DataFrame obtained from RecordLoader
* @param limit number of most popular images in the output
* @param minWidth minimum width of images to include
* @param minHeight minimum height of images to include
* @return Dataset[Row], where the schema is (url, count)
*/
def apply(d: DataFrame, limit: Int, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): Dataset[Row] = {

val spark = SparkSession.builder().master("local").getOrCreate()
// scalastyle:off
import spark.implicits._
// scalastyle:on

val df = d.select($"url",$"md5")
.filter(($"width") >= minWidth && ($"height") >= minHeight)

val count = df.groupBy("md5").count()

df.join(count,"md5")
.groupBy("md5")
.agg(first("url").as("url"), first("count").as("count"))
.select("url","count")
.orderBy(desc("count"))
.limit(limit)
}
}
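For reference, a minimal usage sketch of the new DataFrame implementation (not part of the diff). It assumes a spark-shell session with the toolkit on the classpath; the archive path and the limit below are illustrative.

```scala
import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImagesDF

// Derive the images DataFrame from a loaded archive, as the test later in this diff does.
val images = RecordLoader.loadArchives("/path/to/example.arc.gz", sc).images()

// Ten most frequently occurring images (deduplicated by MD5), returned as (url, count) rows.
val popular = ExtractPopularImagesDF(images, 10)
popular.show()
```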
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkContext}

/** Extract most popular images from an RDD. */
object ExtractPopularImages {
object ExtractPopularImagesRDD {
val LIMIT_MAXIMUM: Int = 500
val LIMIT_DENOMINATOR: Int = 250
val MIN_WIDTH: Int = 30
@@ -0,0 +1,56 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import com.google.common.io.Resources
import io.archivesunleashed.RecordLoader
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class ExtractPopularImagesDFTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private var sc: SparkContext = _
private val master = "local[4]"
private val appName = "example-spark"
private val testVertexFile = "temporaryTestVertexDir"
private val testEdgesFile = "temporaryTestEdgesDir"

before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
conf.set("spark.driver.allowMultipleContexts", "true");
sc = new SparkContext(conf)
}

test("extracts popular images") {
val highTest = 507
val exampledf = RecordLoader.loadArchives(arcPath, sc).images()
val imagesLowLimit = ExtractPopularImagesDF(exampledf, 3)
val imagesHighLimit = ExtractPopularImagesDF(exampledf, highTest)
val response = "1"
assert (imagesLowLimit.take(1)(0)(1).toString == response)
assert (imagesHighLimit.take(1)(0)(1).toString == response)
}
after {
if (sc != null) {
sc.stop()
}
}
}
@@ -23,7 +23,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter {
class ExtractPopularImagesRDDTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private var sc: SparkContext = _
private val master = "local[4]"
@@ -42,8 +42,8 @@ class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter {
test("extracts popular images") {
val highTest = 507
val examplerdd = RecordLoader.loadArchives(arcPath, sc)
val imagesLowLimit = ExtractPopularImages(examplerdd, 3, sc)
val imagesHighLimit = ExtractPopularImages(examplerdd, highTest, sc)
val imagesLowLimit = ExtractPopularImagesRDD(examplerdd, 3, sc)
val imagesHighLimit = ExtractPopularImagesRDD(examplerdd, highTest, sc)
val response = Array("1\thttp://www.archive.org/images/books-small.jpg",
"1\thttp://i.creativecommons.org/l/by-sa/3.0/88x31.png",
"1\thttp://www.archive.org/images/blendbar.jpg")
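Likewise, a sketch of calling the renamed RDD implementation, mirroring the updated test. The archive path is illustrative, and the result is assumed to be an RDD of tab-separated "count\turl" strings, as suggested by the test's expected output.

```scala
import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImagesRDD

// RDD-based variant; takes the loaded archive RDD and the SparkContext.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
val popularImages = ExtractPopularImagesRDD(records, 10, sc)

// Print the top results, e.g. "1\thttp://www.archive.org/images/books-small.jpg".
popularImages.take(10).foreach(println)
```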