Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to save to Parquet for app. #454

Merged
merged 6 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ spark-submit --class io.archivesunleashed.app.CommandLineAppRunner PATH_TO_AUT_JA

Additional flags include:

* `--output-format FORMAT` (Used only for the `DomainGraphExtractor`, and the
options are `TEXT` (default) or `GEXF`.)
* `--output-format FORMAT` (`csv` (default) or `parquet`. `DomainGraphExtractor`
also supports two additional output formats: `graphml` and `gexf`.)
* `--split` (The extractor will put results for each input file in its own
directory. Each directory name will be the name of the ARC/WARC file parsed.)
* `--partition N` (The extractor will partition RDD or DataFrame according to N
Expand Down
106 changes: 90 additions & 16 deletions src/main/scala/io/archivesunleashed/app/CommandLineApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,23 @@ class CommandLineApp(conf: CmdAppConf) {
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).audio())
}
save(AudioInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(AudioInformationExtractor(df))
} else {
saveCsv(AudioInformationExtractor(df))
}
}),
"DomainFrequencyExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webpages()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).webpages())
}
save(DomainFrequencyExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(DomainFrequencyExtractor(df))
} else {
saveCsv(DomainFrequencyExtractor(df))
}
}),
"DomainGraphExtractor" ->
((inputFiles: List[String]) => {
Expand All @@ -136,11 +144,13 @@ class CommandLineApp(conf: CmdAppConf) {
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "gexf") {
new File(saveTarget).mkdirs()
WriteGEXF(DomainGraphExtractor(df).collect(), Paths.get(saveTarget).toString + "/GEXF.gexf")
} else if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(DomainGraphExtractor(df))
} else if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "graphml") {
new File(saveTarget).mkdirs()
WriteGraphML(DomainGraphExtractor(df).collect(), Paths.get(saveTarget).toString + "/GRAPHML.graphml")
} else {
save(DomainGraphExtractor(df))
saveCsv(DomainGraphExtractor(df))
}
}),
"ImageInformationExtractor" ->
Expand All @@ -149,101 +159,146 @@ class CommandLineApp(conf: CmdAppConf) {
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).images())
}
save(ImageInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(ImageInformationExtractor(df))
} else {
saveCsv(ImageInformationExtractor(df))
}
}),
"ImageGraphExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).imagegraph()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).imagegraph())
}
save(ImageGraphExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(ImageGraphExtractor(df))
} else {
saveCsv(ImageGraphExtractor(df))
}
}),
"PDFInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).pdfs()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).pdfs())
}
save(PDFInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(PDFInformationExtractor(df))
} else {
saveCsv(PDFInformationExtractor(df))
}
}),
"PlainTextExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webpages()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).webpages())
}
save(PlainTextExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(PlainTextExtractor(df))
} else {
saveCsv(PlainTextExtractor(df))
}
}),
"PresentationProgramInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).presentationProgramFiles()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).presentationProgramFiles())
}
save(PresentationProgramInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(PresentationProgramInformationExtractor(df))
} else {
saveCsv(PresentationProgramInformationExtractor(df))
}
}),
"SpreadsheetInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).spreadsheets()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).spreadsheets())
}
save(SpreadsheetInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(SpreadsheetInformationExtractor(df))
} else {
saveCsv(SpreadsheetInformationExtractor(df))
}
}),
"TextFilesInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).textFiles()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).textFiles())
}
save(TextFilesInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(TextFilesInformationExtractor(df))
} else {
saveCsv(TextFilesInformationExtractor(df))
}
}),
"VideoInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).videos()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).videos())
}
save(VideoInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(VideoInformationExtractor(df))
} else {
saveCsv(VideoInformationExtractor(df))
}
}),
"WebGraphExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webgraph()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).webgraph())
}
save(WebGraphExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(WebGraphExtractor(df))
} else {
saveCsv(WebGraphExtractor(df))
}
}),
"WebPagesExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webpages()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).webpages())
}
save(WebPagesExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(WebPagesExtractor(df))
} else {
saveCsv(WebPagesExtractor(df))
}
}),
"WordProcessorInformationExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).wordProcessorFiles()
inputFiles.tail foreach { f =>
df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).wordProcessorFiles())
}
save(WordProcessorInformationExtractor(df))
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "parquet") {
saveParquet(WordProcessorInformationExtractor(df))
} else {
saveCsv(WordProcessorInformationExtractor(df))
}
})
)

/** Generic routine for saving Dataset obtained from querying DataFrames to file.
/** Routine for saving Dataset obtained from querying DataFrames to CSV.
* Files may be merged according to options specified in 'partition' setting.
*
* @param d generic dataset obtained from querying DataFrame
* @return Unit
*/

def save(d: Dataset[Row]): Unit = {
def saveCsv(d: Dataset[Row]): Unit = {
if (!configuration.partition.isEmpty) {
d.coalesce(configuration.partition()).write
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("header", "true")
.csv(saveTarget)
} else {
d.write
Expand All @@ -252,6 +307,25 @@ class CommandLineApp(conf: CmdAppConf) {
}
}

/** Routine for saving Dataset obtained from querying DataFrames to Parquet.
* Files may be merged according to options specified in 'partition' setting.
*
* @param d generic dataset obtained from querying DataFrame
* @return Unit
*/

def saveParquet(d: Dataset[Row]): Unit = {
  // Writes `d` to `saveTarget` as Parquet. When the 'partition' setting was
  // supplied, the dataset is first coalesced to that many partitions;
  // otherwise it is written with its existing partitioning.
  val toWrite =
    if (configuration.partition.isEmpty) {
      d
    } else {
      d.coalesce(configuration.partition())
    }

  // NOTE(review): "timestampFormat" is kept exactly as before; confirm whether
  // the Parquet writer actually honours this option.
  toWrite.write
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
    .parquet(saveTarget)
}

/** Verify the validity of command line arguments regarding input and output files.
*
* All input files need to exist, and output files should not exist, for this to pass.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.ArchiveRecord
import io.archivesunleashed.df.{ExtractDomainDF, RemoveHTMLDF,
import io.archivesunleashed.df.{ExtractBoilerpipeTextDF, RemoveHTMLDF,
RemoveHTTPHeaderDF}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

Expand All @@ -32,7 +32,6 @@ object PlainTextExtractor {
// scalastyle:off
import spark.implicits._
// scalastyle:on
d.select($"crawl_date", ExtractDomainDF($"url").as("domain"),
$"url", RemoveHTMLDF(RemoveHTTPHeaderDF($"content")).as("text"))
d.select(ExtractBoilerpipeTextDF(RemoveHTMLDF($"content")).as("content"))
}
}
46 changes: 43 additions & 3 deletions src/test/scala/io/archivesunleashed/app/CommandLineAppTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,75 @@ class CommandLineAppTest extends FunSuite with BeforeAndAfter {
private var sc: SparkContext = _
private val testSuccessCmds = Array(
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "DomainFrequencyExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "DomainFrequencyExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "DomainFrequencyExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "DomainFrequencyExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "GEXF"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "GRAPHML"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "gexf"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "graphml"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, domainGraphOpt, "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, plainTextOpt),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, plainTextOpt, "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, plainTextOpt, "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, plainTextOpt, "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, plainTextOpt, "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, "--partition", "1", extractOpt, plainTextOpt),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, imageGraphOpt),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, imageGraphOpt, "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, imageGraphOpt, "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, imageGraphOpt, "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, imageGraphOpt, "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, webPagesOpt),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, webPagesOpt, "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, webPagesOpt, "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, webPagesOpt, "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, webPagesOpt, "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "AudioInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "AudioInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "AudioInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "AudioInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "AudioInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "ImageInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "ImageInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "ImageInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "ImageInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "ImageInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PDFInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PDFInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PDFInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PDFInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PDFInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PresentationProgramInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PresentationProgramInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PresentationProgramInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PresentationProgramInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "PresentationProgramInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "SpreadsheetInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "SpreadsheetInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "SpreadsheetInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "SpreadsheetInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "SpreadsheetInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "TextFilesInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "TextFilesInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "TextFilesInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "TextFilesInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "TextFilesInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "VideoInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "VideoInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "VideoInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "VideoInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "VideoInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WordProcessorInformationExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WordProcessorInformationExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WordProcessorInformationExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WordProcessorInformationExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WordProcessorInformationExtractor", "--output-format", "parquet", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor", "--partition", "1")
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor", "--split"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor", "--partition", "1"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor", "--output-format", "parquet"),
Array(inputOpt, arcPath, warcPath, outputOpt, outputDir, extractOpt, "WebGraphExtractor", "--output-format", "parquet", "--partition", "1")
)

private val testFailCmds = Array(
Expand Down
Loading