Resolves various single file save/header issues #1104

Merged

@@ -39,7 +39,7 @@ public static FeatureRDD conduit(FeatureRDD recordRdd,
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.feature.adam";
recordRdd.save(fileName);
recordRdd.save(fileName, false);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);

@@ -54,6 +54,9 @@ class ADAM2VcfArgs extends Args4jBase with ParquetArgs {

@Args4jOption(required = false, name = "-sort_on_save", usage = "Sort the VCF output.")
var sort: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Save as a single VCF file.")
var single: Boolean = false
}

class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] with DictionaryCommand with Logging {

@@ -80,6 +83,8 @@ class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] wit
variantContexts
}

variantContextsToSave.saveAsVcf(args.outputPath, sortOnSave = args.sort)
variantContextsToSave.saveAsVcf(args.outputPath,
sortOnSave = args.sort,
asSingleFile = args.single)
}
}
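The command change above simply threads the new -single option through to saveAsVcf. As a hedged illustration, the same call from user code might look like this, assuming variantContexts is a VariantContextRDD obtained elsewhere and the output path is a placeholder:

    // Write one sorted VCF file instead of a directory of part-* shards.
    variantContexts.saveAsVcf(
      "sample.vcf",          // placeholder output path
      sortOnSave = true,     // mirrors -sort_on_save
      asSingleFile = true)   // mirrors the new -single flag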

@@ -48,7 +48,7 @@ object ADAMMain {
VcfAnnotation2ADAM,
Fasta2ADAM,
ADAM2Fasta,
Features2ADAM,
TransformFeatures,
WigFix2Bed,
Fragments2Reads,
Reads2Fragments,

@@ -55,6 +55,8 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
var onlyNegativeStrands: Boolean = false
@Args4jOption(required = false, name = "-only_positive_strands", usage = "Compute coverage for positive strands")
var onlyPositiveStrands: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {

@@ -80,7 +82,7 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
}

finalReads.toCoverage(args.collapse)
.save(args.outputPath)
.save(args.outputPath, asSingleFile = args.asSingleFile)
}
}
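For reference, a hedged sketch of the equivalent programmatic path; loadAlignments, the input path, and the .bed output name are assumptions for illustration, while toCoverage and the asSingleFile save parameter come from the diff above:

    import org.bdgenomics.adam.rdd.ADAMContext._

    // Compute read coverage and write it as one file rather than a
    // directory of part-* shards.
    sc.loadAlignments("reads.adam")
      .toCoverage(true)                        // the collapse flag seen above
      .save("coverage.bed", asSingleFile = true)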


@@ -22,32 +22,39 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }

object Features2ADAM extends BDGCommandCompanion {
val commandName = "features2adam"
val commandDescription = "Convert a file with sequence features into corresponding ADAM format"
object TransformFeatures extends BDGCommandCompanion {
val commandName = "transformFeatures"
val commandDescription = "Convert a file with sequence features into corresponding ADAM format and vice versa"
Member: If I have this correctly, on either INPUT or OUTPUT, if the file extension is not recognized as a plain-text feature file, then it is assumed to be Parquet. That way this one command can convert both ways. The usage strings below should be updated.

Member Author: Correct. I'll update the usage strings.

Member: LGTM


def apply(cmdLine: Array[String]) = {
new Features2ADAM(Args4j[Features2ADAMArgs](cmdLine))
new TransformFeatures(Args4j[TransformFeaturesArgs](cmdLine))
}
}

class Features2ADAMArgs extends Args4jBase with ParquetSaveArgs {
@Argument(required = true, metaVar = "FEATURES",
usage = "The features file to convert (e.g., .bed, .gff)", index = 0)
class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
@Argument(required = true, metaVar = "INPUT",
usage = "The features file to convert (e.g., .bed, .gff). If extension is not detected, Parquet is assumed.", index = 0)
var featuresFile: String = _
@Argument(required = true, metaVar = "ADAM",
usage = "Location to write ADAM features data", index = 1)

@Argument(required = true, metaVar = "OUTPUT",
usage = "Location to write ADAM features data. If extension is not detected, Parquet is assumed.", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-num_partitions",
usage = "Number of partitions to load an interval file using.")
var numPartitions: Int = _

@Args4jOption(required = false, name = "-single",
usage = "Save as a single file, for the text formats.")
var single: Boolean = false
}

class Features2ADAM(val args: Features2ADAMArgs)
extends BDGSparkCommand[Features2ADAMArgs] {
val companion = Features2ADAM
class TransformFeatures(val args: TransformFeaturesArgs)
extends BDGSparkCommand[TransformFeaturesArgs] {
val companion = TransformFeatures

def run(sc: SparkContext) {
sc.loadFeatures(args.featuresFile, None, Option(args.numPartitions)).saveAsParquet(args)
sc.loadFeatures(args.featuresFile, None, Option(args.numPartitions))
.save(args.outputPath, args.single)
}
}
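As the review thread above notes, the renamed transformFeatures command infers the on-disk format from the file extension and falls back to Parquet, so a single command converts in both directions. A hedged sketch of the same round trip through the API used in run(), with placeholder file names and trailing None arguments mirroring the loadFeatures call above:

    import org.bdgenomics.adam.rdd.ADAMContext._

    // BED text -> ADAM Parquet: ".adam" is not a recognized text
    // extension, so save() falls back to Parquet.
    sc.loadFeatures("genes.bed", None, None).save("genes.adam", false)

    // ADAM Parquet -> BED text, merged into one file (the -single flag).
    sc.loadFeatures("genes.adam", None, None).save("genes.bed", true)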

@@ -25,13 +25,13 @@ import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.utils.cli.Args4j
import org.bdgenomics.formats.avro.Feature

class Features2ADAMSuite extends ADAMFunSuite {
class TransformFeaturesSuite extends ADAMFunSuite {

sparkTest("can convert a simple BED file") {

val loader = Thread.currentThread().getContextClassLoader
val inputPath = loader.getResource("gencode.v7.annotation.trunc10.bed").getPath
val outputFile = File.createTempFile("adam-cli.Features2ADAMSuite", ".adam")
val outputFile = File.createTempFile("adam-cli.TransformFeaturesSuite", ".adam")
val outputPath = outputFile.getAbsolutePath

val argLine = "%s %s".format(inputPath, outputPath).split("\\s+")
@@ -40,9 +40,9 @@ class Features2ADAMSuite extends ADAMFunSuite {
// but the "createTempFile" method actually creates the file (on some systems?)
assert(outputFile.delete(), "Couldn't delete (empty) temp file")

val args: Features2ADAMArgs = Args4j.apply[Features2ADAMArgs](argLine)
val args: TransformFeaturesArgs = Args4j.apply[TransformFeaturesArgs](argLine)

val features2Adam = new Features2ADAM(args)
val features2Adam = new TransformFeatures(args)
features2Adam.run(sc)

val schema = Projection(featureId, contigName, start, strand)
@@ -56,9 +56,9 @@
sparkTest("can convert a simple wigfix file") {
val loader = Thread.currentThread().getContextClassLoader
val inputPath = loader.getResource("chr5.phyloP46way.trunc.wigFix").getPath
val bedFile = File.createTempFile("adam-cli.Features2ADAMSuite", ".bed")
val bedFile = File.createTempFile("adam-cli.TransformFeaturesSuite", ".bed")
val bedPath = bedFile.getAbsolutePath
val outputFile = File.createTempFile("adam-cli.Features2ADAMSuite", ".adam")
val outputFile = File.createTempFile("adam-cli.TransformFeaturesSuite", ".adam")
val outputPath = outputFile.getAbsolutePath

// We have to do this, since the features2adam won't work if the file already exists,
@@ -74,8 +74,8 @@

// convert to ADAM Features
val adamArgLine = "%s %s".format(bedPath, outputPath).split("\\s+")
val adamArgs: Features2ADAMArgs = Args4j.apply[Features2ADAMArgs](adamArgLine)
val features2Adam = new Features2ADAM(adamArgs)
val adamArgs: TransformFeaturesArgs = Args4j.apply[TransformFeaturesArgs](adamArgLine)
val features2Adam = new TransformFeatures(adamArgs)
features2Adam.run(sc)

val schema = Projection(featureId, contigName, start, end, score)
33 changes: 19 additions & 14 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1018,11 +1018,16 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
* @param filePath The path to the file to load.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
* when a malformed line is encountered, SILENT will ignore the malformed
* line, STRICT will throw an exception.
* @return Returns a FeatureRDD.
*/
def loadNarrowPeak(filePath: String, minPartitions: Option[Int] = None): FeatureRDD = {
def loadNarrowPeak(filePath: String,
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.flatMap(new NarrowPeakParser().parse)
.flatMap(new NarrowPeakParser().parse(_, stringency))
if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records)
}
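A brief hedged example of the new stringency parameter, assuming an ADAMContext named sc and a placeholder input path; STRICT turns malformed lines into exceptions instead of the default LENIENT warn-and-continue behavior:

    import htsjdk.samtools.ValidationStringency

    // Fail fast on malformed narrowPeak lines.
    val peaks = sc.loadNarrowPeak(
      "sample.narrowPeak",
      stringency = ValidationStringency.STRICT)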

@@ -1033,22 +1038,22 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
* @param filePath The path to the file to load.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
* when a malformed line is encountered, SILENT will ignore the malformed
* line, STRICT will throw an exception.
* @return Returns a FeatureRDD.
*/
def loadIntervalList(filePath: String, minPartitions: Option[Int] = None): FeatureRDD = {
def loadIntervalList(filePath: String,
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val parsedLines = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.map(new IntervalListParser().parse)
val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*), parsedLines.flatMap(_._2))
.map(new IntervalListParser().parse(_, stringency))
val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*),
parsedLines.flatMap(_._2))
val seqDictMap = seqDict.records.map(sr => sr.name -> sr).toMap
val recordsWithContigs = for {
record <- records
seqRecord <- seqDictMap.get(record.getContigName)
} yield Feature.newBuilder(record)
.setContigName(seqRecord.name)
.build()

if (Metrics.isRecording) recordsWithContigs.instrument() else recordsWithContigs
FeatureRDD(recordsWithContigs, seqDict)

if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, seqDict)
}

/**
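Similarly, a hedged sketch for loadIntervalList with SILENT stringency; the sequences accessor on the returned FeatureRDD is an assumption here, used only to show that the dictionary parsed from the interval_list header travels with the result:

    // Silently drop malformed lines while loading targets.
    val targets = sc.loadIntervalList(
      "targets.interval_list",
      stringency = ValidationStringency.SILENT)
    val dict = targets.sequences   // assumed accessor; dictionary from the header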
141 changes: 141 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
@@ -0,0 +1,141 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import htsjdk.samtools.util.BlockCompressedStreamConstants
import java.io.{ InputStream, OutputStream }
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.bdgenomics.utils.misc.Logging
import scala.annotation.tailrec

/**
* Helper object to merge sharded files together.
*/
private[rdd] object FileMerger extends Logging {

/**
* Merges together sharded files, while preserving partition ordering.
*
* @param fs The file system implementation to use.
* @param outputPath The location to write the merged file at.
* @param tailPath The location where the sharded files have been written.
* @param optHeaderPath Optionally, the location where a header file has
* been written.
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
* @param bufferSize The size in bytes of the buffer used for copying.
*/
def mergeFiles(fs: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
bufferSize: Int = 1024) {

// get a list of all of the files in the tail file
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
.toArray

// doing this correctly is surprisingly hard
// specifically, copy merge does not care about ordering, which is
// fine if your files are unordered, but if the blocks in the file
// _are_ ordered, then hahahahahahahahahaha. GOOD. TIMES.
//
// fortunately, the blocks in our file are ordered
// the performance of this section is hilarious
//
// specifically, the performance is hilariously bad
//
// but! it is correct.

// open our output file
val os = fs.create(outputPath)

// here is a byte array for copying
val ba = new Array[Byte](bufferSize)

@tailrec def copy(is: InputStream,
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
los.write(ba, 0, bytesRead)

copy(is, los)
}
}

// optionally copy the header
optHeaderPath.foreach(p => {
log.info("Copying header file (%s)".format(p))

// open our input file
val is = fs.open(p)

// until we are out of bytes, copy
copy(is, os)

// close our input stream
is.close()
})

// loop over allllll the files and copy them
val numFiles = tailFiles.length
var filesCopied = 1
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
p.toString,
filesCopied,
numFiles))

// open our input file
val is = fs.open(p)

// until we are out of bytes, copy
copy(is, os)

// close our input stream
is.close()

// increment file copy count
filesCopied += 1
})

// finish the file off
if (writeEmptyGzipBlock) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
}

// flush and close the output stream
os.flush()
os.close()

// delete temp files
optHeaderPath.foreach(headPath => fs.delete(headPath, true))
fs.delete(tailPath, true)
}
}
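To show how this helper is meant to fit into the single-file save path, here is a hedged sketch of a caller (real callers live in the same rdd package, since the object is private[rdd]); the _head/_tail naming, the elided header-writing step, and the FileSystem setup are assumptions for illustration rather than code from this PR:

    import org.apache.hadoop.fs.{ FileSystem, Path }
    import org.apache.spark.SparkContext

    def saveAsSingleFile(sc: SparkContext, outputPath: String) {
      val fs = FileSystem.get(sc.hadoopConfiguration)
      val headPath = new Path(outputPath + "_head")   // header written by the driver
      val tailPath = new Path(outputPath + "_tail")   // part-* files from the RDD save

      // ... write the header to headPath and save the sharded body under tailPath ...

      // Merge header plus ordered shards into one file; writeEmptyGzipBlock
      // would be true for BGZF outputs (e.g. BAM) that need a terminating
      // empty block.
      FileMerger.mergeFiles(
        fs,
        new Path(outputPath),
        tailPath,
        optHeaderPath = Some(headPath),
        writeEmptyGzipBlock = false)
    }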