[ADAM-1481] Refactor ADAMContext loadXxx methods for consistency (#1487)
Refactor ADAMContext loadXxx methods for consistency

* Format long lines
* Minor doc fixes
* Add FileExtensions, confirm pathName doc strings
* Use compression codecs from Hadoop configuration instead of hardcoded extensions.
heuermh authored and fnothaft committed Apr 19, 2017
1 parent 04444aa commit 93d17b4
Showing 39 changed files with 1,108 additions and 645 deletions.
@@ -34,6 +34,11 @@ object JavaADAMContext {
implicit def toADAMContext(jac: JavaADAMContext): ADAMContext = jac.ac
}

/**
* The JavaADAMContext provides java-friendly functions on top of ADAMContext.
*
* @param ac The ADAMContext to wrap.
*/
class JavaADAMContext(val ac: ADAMContext) extends Serializable {

/**
@@ -42,74 +47,161 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable {
def getSparkContext: JavaSparkContext = new JavaSparkContext(ac.sc)

/**
* Loads in an ADAM read file. This method can load SAM, BAM, and ADAM files.
* Load alignment records into an AlignmentRecordRDD (java-friendly method).
*
* Loads path names ending in:
* * .bam/.cram/.sam as BAM/CRAM/SAM format,
* * .fa/.fasta as FASTA format,
* * .fq/.fastq as FASTQ format, and
* * .ifq as interleaved FASTQ format.
*
* If none of these match, fall back to Parquet + Avro.
*
* For FASTA, FASTQ, and interleaved FASTQ formats, compressed files are supported
* through compression codecs configured in Hadoop, which by default include .gz and .bz2,
* but can include more.
*
* @see ADAMContext#loadAlignments
*
* @param filePath Path to load the file from.
* @return Returns a read RDD.
* @param pathName The path name to load alignment records from.
* Globs/directories are supported, although file extension must be present
* for BAM/CRAM/SAM, FASTA, and FASTQ formats.
* @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records,
* sequence dictionary representing contigs the alignment records may be aligned to,
* and the record group dictionary for the alignment records if one is available.
*/
def loadAlignments(filePath: java.lang.String): AlignmentRecordRDD = {
ac.loadAlignments(filePath)
def loadAlignments(pathName: java.lang.String): AlignmentRecordRDD = {
ac.loadAlignments(pathName)
}
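As a rough usage sketch (not part of this diff; the ADAMContext `ac` and the input paths are assumed), the extension-based dispatch means one call covers several formats:

// minimal sketch; assumes an ADAMContext `ac` is already in scope, paths are hypothetical
val jac = new JavaADAMContext(ac)
val bamReads = jac.loadAlignments("/data/sample.bam")    // BAM/CRAM/SAM chosen by extension
val fqReads  = jac.loadAlignments("/data/sample.fq.gz")  // FASTQ, .gz handled by a Hadoop codec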

/**
* Loads in sequence fragments.
* Load nucleotide contig fragments into a NucleotideContigFragmentRDD (java-friendly method).
*
* Can load from FASTA or from Parquet encoded NucleotideContigFragments.
* If the path name has a .fa/.fasta extension, load as FASTA format.
* Else, fall back to Parquet + Avro.
*
* @param filePath Path to load the file from.
* @return Returns a NucleotideContigFragment RDD.
* For FASTA format, compressed files are supported through compression codecs configured
* in Hadoop, which by default include .gz and .bz2, but can include more.
*
* @see ADAMContext#loadContigFragments
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported, although file extension must be present
* for FASTA format.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadSequences(filePath: java.lang.String): NucleotideContigFragmentRDD = {
ac.loadSequences(filePath)
def loadContigFragments(pathName: java.lang.String): NucleotideContigFragmentRDD = {
ac.loadContigFragments(pathName)
}
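A hedged sketch of the renamed call (formerly loadSequences; the path is hypothetical, `jac` as in the sketch above):

// minimal sketch; gzipped FASTA is decoded through the Hadoop compression codecs noted above
val contigs = jac.loadContigFragments("/data/reference.fa.gz")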

/**
* Loads in read pairs as fragments.
* Load fragments into a FragmentRDD (java-friendly method).
*
* Loads path names ending in:
* * .bam/.cram/.sam as BAM/CRAM/SAM format and
* * .ifq as interleaved FASTQ format.
*
* If none of these match, fall back to Parquet + Avro.
*
* For interleaved FASTQ format, compressed files are supported through compression codecs
* configured in Hadoop, which by default include .gz and .bz2, but can include more.
*
* @see ADAMContext#loadFragments
*
* @param filePath The path to load the file from.
* @param pathName The path name to load fragments from.
* Globs/directories are supported, although file extension must be present
* for BAM/CRAM/SAM and FASTQ formats.
* @return Returns a FragmentRDD.
*/
def loadFragments(filePath: java.lang.String): FragmentRDD = {
ac.loadFragments(filePath)
def loadFragments(pathName: java.lang.String): FragmentRDD = {
ac.loadFragments(pathName)
}
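For illustration only (hypothetical path, same `jac` as above), interleaved FASTQ read pairs come back as fragments:

// minimal sketch; the .ifq extension selects the interleaved FASTQ reader
val fragments = jac.loadFragments("/data/readpairs.ifq")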

/**
* Loads in features.
* Load features into a FeatureRDD (java-friendly method).
*
* @param filePath The path to load the file from.
* Loads path names ending in:
* * .bed as BED6/12 format,
* * .gff3 as GFF3 format,
* * .gtf/.gff as GTF/GFF2 format,
* * .narrow[pP]eak as NarrowPeak format, and
* * .interval_list as IntervalList format.
*
* If none of these match, fall back to Parquet + Avro.
*
* For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files
* are supported through compression codecs configured in Hadoop, which by default include
* .gz and .bz2, but can include more.
*
* @see ADAMContext#loadFeatures
*
* @param pathName The path name to load features from.
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @return Returns a FeatureRDD.
*/
def loadFeatures(filePath: java.lang.String): FeatureRDD = {
ac.loadFeatures(filePath)
def loadFeatures(pathName: java.lang.String): FeatureRDD = {
ac.loadFeatures(pathName)
}
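A small sketch under the same assumptions (hypothetical paths), showing two of the extension-dispatched feature formats:

// minimal sketch; the format is chosen from the file extension
val bedFeatures = jac.loadFeatures("/data/peaks.bed")           // BED6/12
val gtfFeatures = jac.loadFeatures("/data/annotation.gtf.gz")   // GTF/GFF2, gzip via a Hadoop codec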

/**
* Loads in a coverage file. This method can load BED, NarrowPeak, GFF3, GTF/GFF2, IntervalList and ADAM files.
* Load features into a FeatureRDD and convert to a CoverageRDD (java-friendly method).
* Coverage is stored in the score field of Feature.
*
* Loads path names ending in:
* * .bed as BED6/12 format,
* * .gff3 as GFF3 format,
* * .gtf/.gff as GTF/GFF2 format,
* * .narrow[pP]eak as NarrowPeak format, and
* * .interval_list as IntervalList format.
*
* If none of these match, fall back to Parquet + Avro.
*
* @param filePath Path to load the file from.
* @return Returns a Coverage RDD.
* For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files
* are supported through compression codecs configured in Hadoop, which by default include
* .gz and .bz2, but can include more.
*
* @see ADAMContext#loadCoverage
*
* @param pathName The path name to load features from.
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @return Returns a FeatureRDD converted to a CoverageRDD.
*/
def loadCoverage(filePath: java.lang.String): CoverageRDD = {
ac.loadCoverage(filePath)
def loadCoverage(pathName: java.lang.String): CoverageRDD = {
ac.loadCoverage(pathName)
}
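A hedged sketch (hypothetical path): the features are loaded exactly as above and then converted, with coverage taken from the Feature score field:

// minimal sketch; same `jac` as in the earlier sketches
val coverage = jac.loadCoverage("/data/peaks.bed")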

/**
* Loads in genotypes.
* Load genotypes into a GenotypeRDD (java-friendly method).
*
* If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format.
* Else, fall back to Parquet + Avro.
*
* @param filePath The path to load the file from.
* @see ADAMContext#loadGenotypes
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported, although file extension must be present
* for VCF format.
* @return Returns a GenotypeRDD.
*/
def loadGenotypes(filePath: java.lang.String): GenotypeRDD = {
ac.loadGenotypes(filePath)
def loadGenotypes(pathName: java.lang.String): GenotypeRDD = {
ac.loadGenotypes(pathName)
}
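A hedged sketch of the VCF path (the file name is hypothetical):

// minimal sketch; the .vcf.gz extension selects VCF
val genotypes = jac.loadGenotypes("/data/cohort.vcf.gz")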

/**
* Loads in variants.
* Load variants into a VariantRDD (java-friendly method).
*
* If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format.
* Else, fall back to Parquet + Avro.
*
* @see ADAMContext#loadVariants
*
* @param filePath The path to load the file from.
* @param pathName The path name to load variants from.
* Globs/directories are supported, although file extension must be present for VCF format.
* @return Returns a VariantRDD.
*/
def loadVariants(filePath: java.lang.String): VariantRDD = {
ac.loadVariants(filePath)
def loadVariants(pathName: java.lang.String): VariantRDD = {
ac.loadVariants(pathName)
}
}
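And the variant-only counterpart, again with a hypothetical path:

// minimal sketch; returns variants without per-sample genotypes
val variants = jac.loadVariants("/data/cohort.vcf")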
@@ -38,6 +38,6 @@ public static NucleotideContigFragmentRDD conduit(final NucleotideContigFragment

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
return jac.loadSequences(fileName);
return jac.loadContigFragments(fileName);
}
}
@@ -39,7 +39,7 @@ class JavaADAMContextSuite extends ADAMFunSuite {

sparkTest("can read and write a small FASTA file") {
val path = copyResource("chr20.250k.fa.gz")
val aRdd = sc.loadSequences(path)
val aRdd = sc.loadContigFragments(path)
assert(aRdd.jrdd.count() === 26)

val newRdd = JavaADAMContigConduit.conduit(aRdd, sc)
@@ -50,7 +50,7 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
override def run(sc: SparkContext): Unit = {

log.info("Loading ADAM nucleotide contig fragments from disk.")
val contigFragments = sc.loadSequences(args.inputPath)
val contigFragments = sc.loadContigFragments(args.inputPath)

log.info("Merging fragments and writing FASTA to disk.")
val contigs = contigFragments.mergeFragments()
@@ -76,7 +76,7 @@ class ADAM2Fastq(val args: ADAM2FastqArgs) extends BDGSparkCommand[ADAM2FastqArg
else
None

var reads = sc.loadAlignments(args.inputPath, projection = projectionOpt)
var reads = sc.loadAlignments(args.inputPath, optProjection = projectionOpt)

if (args.repartition != -1) {
log.info("Repartitioning reads to to '%d' partitions".format(args.repartition))
@@ -49,7 +49,7 @@ class CountContigKmers(protected val args: CountContigKmersArgs) extends BDGSpar
def run(sc: SparkContext) {

// read from disk
val fragments = sc.loadSequences(args.inputPath)
val fragments = sc.loadContigFragments(args.inputPath)

// count kmers
val countedKmers = fragments.countKmers(args.kmerLength)
@@ -54,7 +54,7 @@ class CountReadKmers(protected val args: CountReadKmersArgs) extends BDGSparkCom
// read from disk
var adamRecords = sc.loadAlignments(
args.inputPath,
projection = Some(Projection(AlignmentRecordField.sequence))
optProjection = Some(Projection(AlignmentRecordField.sequence))
)

if (args.repartition != -1) {
@@ -42,7 +42,7 @@ class Fasta2ADAMArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-reads", usage = "Maps contig IDs to match contig IDs of reads.")
var reads: String = ""
@Args4jOption(required = false, name = "-fragment_length", usage = "Sets maximum fragment length. Default value is 10,000. Values greater than 1e9 should be avoided.")
var fragmentLength: Long = 10000L
var maximumFragmentLength: Long = 10000L
@Args4jOption(required = false, name = "-repartition", usage = "Sets the number of output partitions to write, if desired.")
var partitions: Int = -1
}
@@ -52,7 +52,7 @@ class Fasta2ADAM(protected val args: Fasta2ADAMArgs) extends BDGSparkCommand[Fas

def run(sc: SparkContext) {
log.info("Loading FASTA data from disk.")
val adamFasta = sc.loadFasta(args.fastaFile, fragmentLength = args.fragmentLength)
val adamFasta = sc.loadFasta(args.fastaFile, maximumFragmentLength = args.maximumFragmentLength)
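A hedged sketch of the renamed keyword argument outside the CLI (the path is hypothetical; 10000L matches the documented default):

// minimal sketch; assumes the ADAMContext implicits are available on `sc`
val fasta = sc.loadFasta("/data/reference.fa", maximumFragmentLength = 10000L)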

if (args.verbose) {
log.info("FASTA contains: %s", adamFasta.sequences.toString)
@@ -69,7 +69,7 @@ class FlagStat(protected val args: FlagStatArgs) extends BDGSparkCommand[FlagSta

val adamFile = sc.loadAlignments(
args.inputPath,
projection = Some(projection),
optProjection = Some(projection),
stringency = stringency
)
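The projection arguments follow the same opt- prefix; a hedged sketch with a hypothetical Parquet path, projecting only the read sequence as CountReadKmers does:

// minimal sketch; assumes org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection } are imported
val seqOnly = sc.loadAlignments(
  "/data/sample.alignments.adam",
  optProjection = Some(Projection(AlignmentRecordField.sequence)))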

@@ -70,7 +70,7 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
"Cannot compute coverage for both negative and positive strands separately")

// load reads
val readsRdd: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, projection = Some(proj))
val readsRdd: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, optProjection = Some(proj))

val finalReads = if (args.onlyNegativeStrands && !args.onlyPositiveStrands) {
readsRdd.transform(rdd => rdd.filter(_.getReadNegativeStrand))
17 changes: 9 additions & 8 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -188,7 +188,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
// optionally load a reference
val optReferenceFile = Option(args.reference).map(f => {
sc.loadReferenceFile(f,
fragmentLength = args.mdTagsFragmentSize)
maximumFragmentLength = args.mdTagsFragmentSize)
})

// run realignment
@@ -332,7 +332,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
if (args.mdTagsReferenceFile != null) {
log.info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}")
val referenceFile = sc.loadReferenceFile(args.mdTagsReferenceFile,
fragmentLength = args.mdTagsFragmentSize)
maximumFragmentLength = args.mdTagsFragmentSize)
rdd.computeMismatchingPositions(
referenceFile,
overwriteExistingTags = args.mdTagsOverwrite,
@@ -432,14 +432,15 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
}

sc.loadParquetAlignments(args.inputPath,
predicate = pred,
projection = proj)
optPredicate = pred,
optProjection = proj)
} else {
sc.loadAlignments(
args.inputPath,
filePath2Opt = Option(args.pairedFastqFile),
recordGroupOpt = Option(args.fastqRecordGroup),
stringency = stringency)
optPathName2 = Option(args.pairedFastqFile),
optRecordGroup = Option(args.fastqRecordGroup),
stringency = stringency
)
}
val rdd = aRdd.rdd
val sd = aRdd.sequences
@@ -459,7 +460,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
} else {
sc.loadAlignments(
concatFilename,
recordGroupOpt = Option(args.fastqRecordGroup)
optRecordGroup = Option(args.fastqRecordGroup)
)
})

@@ -67,8 +67,8 @@ class TransformFeatures(val args: TransformFeaturesArgs)
sc.loadFeatures(
args.featuresFile,
optStorageLevel = optStorageLevel,
projection = None,
minPartitions = Option(args.numPartitions)
optMinPartitions = Option(args.numPartitions),
optProjection = None
).save(args.outputPath, args.single)
}
}
@@ -94,20 +94,21 @@ private[adam] object FastaConverter {
* contig fragments.
*
* @note Input dataset is assumed to have come in from a Hadoop TextInputFormat reader. This sets
* a specific format for the RDD's Key-Value pairs.
* a specific format for the RDD's Key-Value pairs.
*
* @throws AssertionError Thrown if there appear to be multiple sequences in a single file
* that do not have descriptions.
* that do not have descriptions.
* @throws IllegalArgumentException Thrown if a sequence does not have sequence data.
*
* @param rdd RDD containing Long,String tuples, where the Long corresponds to the number
* of the file line, and the String is the line of the file.
* @param maxFragmentLength The maximum length of fragments in the contig.
* of the file line, and the String is the line of the file.
* @param maximumFragmentLength Maximum fragment length. Defaults to 10000L. Values greater
* than 1e9 should be avoided.
* @return An RDD of ADAM FASTA data.
*/
def apply(
rdd: RDD[(Long, String)],
maxFragmentLength: Long = 10000L): RDD[NucleotideContigFragment] = {
maximumFragmentLength: Long = 10000L): RDD[NucleotideContigFragment] = {
val filtered = rdd.map(kv => (kv._1, kv._2.trim()))
.filter((kv: (Long, String)) => !kv._2.startsWith(";"))

@@ -125,7 +126,7 @@ private[adam] object FastaConverter {

val groupedContigs = keyedSequences.groupByKey()

val converter = new FastaConverter(maxFragmentLength)
val converter = new FastaConverter(maximumFragmentLength)

groupedContigs.flatMap {
case (id, lines) =>
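FastaConverter is private[adam], so callers normally reach it through loadFasta; purely to illustrate the shape described in the doc comment above (inputs are hypothetical), the converter takes line-numbered FASTA text:

// minimal sketch, valid only inside the adam package; pairs each FASTA line with its line number
val fastaLines: RDD[(Long, String)] = sc.textFile("/data/reference.fa").zipWithIndex().map(_.swap)
val fragments: RDD[NucleotideContigFragment] = FastaConverter(fastaLines, maximumFragmentLength = 10000L)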