diff --git a/adam-apis/src/main/scala/org/bdgenomics/adam/apis/java/JavaADAMContext.scala b/adam-apis/src/main/scala/org/bdgenomics/adam/apis/java/JavaADAMContext.scala index 5426bcdb3c..58aa0b8544 100644 --- a/adam-apis/src/main/scala/org/bdgenomics/adam/apis/java/JavaADAMContext.scala +++ b/adam-apis/src/main/scala/org/bdgenomics/adam/apis/java/JavaADAMContext.scala @@ -34,6 +34,11 @@ object JavaADAMContext { implicit def toADAMContext(jac: JavaADAMContext): ADAMContext = jac.ac } +/** + * The JavaADAMContext provides java-friendly functions on top of ADAMContext. + * + * @param ac The ADAMContext to wrap. + */ class JavaADAMContext(val ac: ADAMContext) extends Serializable { /** @@ -42,74 +47,161 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { def getSparkContext: JavaSparkContext = new JavaSparkContext(ac.sc) /** - * Loads in an ADAM read file. This method can load SAM, BAM, and ADAM files. + * Load alignment records into an AlignmentRecordRDD (java-friendly method). + * + * Loads path names ending in: + * * .bam/.cram/.sam as BAM/CRAM/SAM format, + * * .fa/.fasta as FASTA format, + * * .fq/.fastq as FASTQ format, and + * * .ifq as interleaved FASTQ format. + * + * If none of these match, fall back to Parquet + Avro. + * + * For FASTA, FASTQ, and interleaved FASTQ formats, compressed files are supported + * through compression codecs configured in Hadoop, which by default include .gz and .bz2, + * but can include more. + * + * @see ADAMContext#loadAlignments * - * @param filePath Path to load the file from. - * @return Returns a read RDD. + * @param pathName The path name to load alignment records from. + * Globs/directories are supported, although file extension must be present + * for BAM/CRAM/SAM, FASTA, and FASTQ formats. + * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ - def loadAlignments(filePath: java.lang.String): AlignmentRecordRDD = { - ac.loadAlignments(filePath) + def loadAlignments(pathName: java.lang.String): AlignmentRecordRDD = { + ac.loadAlignments(pathName) } /** - * Loads in sequence fragments. + * Load nucleotide contig fragments into a NucleotideContigFragmentRDD (java-friendly method). * - * Can load from FASTA or from Parquet encoded NucleotideContigFragments. + * If the path name has a .fa/.fasta extension, load as FASTA format. + * Else, fall back to Parquet + Avro. * - * @param filePath Path to load the file from. - * @return Returns a NucleotideContigFragment RDD. + * For FASTA format, compressed files are supported through compression codecs configured + * in Hadoop, which by default include .gz and .bz2, but can include more. + * + * @see ADAMContext#loadContigFragments + * + * @param pathName The path name to load nucleotide contig fragments from. + * Globs/directories are supported, although file extension must be present + * for FASTA format. + * @return Returns a NucleotideContigFragmentRDD. */ - def loadSequences(filePath: java.lang.String): NucleotideContigFragmentRDD = { - ac.loadSequences(filePath) + def loadContigFragments(pathName: java.lang.String): NucleotideContigFragmentRDD = { + ac.loadContigFragments(pathName) } /** - * Loads in read pairs as fragments. + * Load fragments into a FragmentRDD (java-friendly method). 
+ * + * Loads path names ending in: + * * .bam/.cram/.sam as BAM/CRAM/SAM format and + * * .ifq as interleaved FASTQ format. + * + * If none of these match, fall back to Parquet + Avro. + * + * For interleaved FASTQ format, compressed files are supported through compression codecs + * configured in Hadoop, which by default include .gz and .bz2, but can include more. + * + * @see ADAMContext#loadFragments * - * @param filePath The path to load the file from. + * @param pathName The path name to load fragments from. + * Globs/directories are supported, although file extension must be present + * for BAM/CRAM/SAM and FASTQ formats. * @return Returns a FragmentRDD. */ - def loadFragments(filePath: java.lang.String): FragmentRDD = { - ac.loadFragments(filePath) + def loadFragments(pathName: java.lang.String): FragmentRDD = { + ac.loadFragments(pathName) } /** - * Loads in features. + * Load features into a FeatureRDD (java-friendly method). * - * @param filePath The path to load the file from. + * Loads path names ending in: + * * .bed as BED6/12 format, + * * .gff3 as GFF3 format, + * * .gtf/.gff as GTF/GFF2 format, + * * .narrow[pP]eak as NarrowPeak format, and + * * .interval_list as IntervalList format. + * + * If none of these match, fall back to Parquet + Avro. + * + * For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files + * are supported through compression codecs configured in Hadoop, which by default include + * .gz and .bz2, but can include more. + * + * @see ADAMContext#loadFeatures + * + * @param pathName The path name to load features from. + * Globs/directories are supported, although file extension must be present + * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats. * @return Returns a FeatureRDD. */ - def loadFeatures(filePath: java.lang.String): FeatureRDD = { - ac.loadFeatures(filePath) + def loadFeatures(pathName: java.lang.String): FeatureRDD = { + ac.loadFeatures(pathName) } /** - * Loads in a coverage file. This method can load BED, NarrowPeak, GFF3, GTF/GFF2, IntervalList and ADAM files. + * Load features into a FeatureRDD and convert to a CoverageRDD (java-friendly method). + * Coverage is stored in the score field of Feature. + * + * Loads path names ending in: + * * .bed as BED6/12 format, + * * .gff3 as GFF3 format, + * * .gtf/.gff as GTF/GFF2 format, + * * .narrow[pP]eak as NarrowPeak format, and + * * .interval_list as IntervalList format. + * + * If none of these match, fall back to Parquet + Avro. * - * @param filePath Path to load the file from. - * @return Returns a Coverage RDD. + * For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files + * are supported through compression codecs configured in Hadoop, which by default include + * .gz and .bz2, but can include more. + * + * @see ADAMContext#loadCoverage + * + * @param pathName The path name to load features from. + * Globs/directories are supported, although file extension must be present + * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats. + * @return Returns a FeatureRDD converted to a CoverageRDD. */ - def loadCoverage(filePath: java.lang.String): CoverageRDD = { - ac.loadCoverage(filePath) + def loadCoverage(pathName: java.lang.String): CoverageRDD = { + ac.loadCoverage(pathName) } /** - * Loads in genotypes. + * Load genotypes into a GenotypeRDD (java-friendly method). + * + * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. + * Else, fall back to Parquet + Avro. 
* - * @param filePath The path to load the file from. + * @see ADAMContext#loadGenotypes + * + * @param pathName The path name to load genotypes from. + * Globs/directories are supported, although file extension must be present + * for VCF format. * @return Returns a GenotypeRDD. */ - def loadGenotypes(filePath: java.lang.String): GenotypeRDD = { - ac.loadGenotypes(filePath) + def loadGenotypes(pathName: java.lang.String): GenotypeRDD = { + ac.loadGenotypes(pathName) } /** - * Loads in variants. + * Load variants into a VariantRDD (java-friendly method). + * + * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. + * Else, fall back to Parquet + Avro. + * + * @see ADAMContext#loadVariants * - * @param filePath The path to load the file from. + * @param pathName The path name to load variants from. + * Globs/directories are supported, although file extension must be present for VCF format. * @return Returns a VariantRDD. */ - def loadVariants(filePath: java.lang.String): VariantRDD = { - ac.loadVariants(filePath) + def loadVariants(pathName: java.lang.String): VariantRDD = { + ac.loadVariants(pathName) } } diff --git a/adam-apis/src/test/java/org/bdgenomics/adam/apis/java/JavaADAMContigConduit.java b/adam-apis/src/test/java/org/bdgenomics/adam/apis/java/JavaADAMContigConduit.java index 4f133df284..30d9bbea57 100644 --- a/adam-apis/src/test/java/org/bdgenomics/adam/apis/java/JavaADAMContigConduit.java +++ b/adam-apis/src/test/java/org/bdgenomics/adam/apis/java/JavaADAMContigConduit.java @@ -38,6 +38,6 @@ public static NucleotideContigFragmentRDD conduit(final NucleotideContigFragment // create a new adam context and load the file JavaADAMContext jac = new JavaADAMContext(ac); - return jac.loadSequences(fileName); + return jac.loadContigFragments(fileName); } } diff --git a/adam-apis/src/test/scala/org/bdgenomics/adam/apis/java/JavaADAMContextSuite.scala b/adam-apis/src/test/scala/org/bdgenomics/adam/apis/java/JavaADAMContextSuite.scala index 2a499c907e..471a0b092c 100644 --- a/adam-apis/src/test/scala/org/bdgenomics/adam/apis/java/JavaADAMContextSuite.scala +++ b/adam-apis/src/test/scala/org/bdgenomics/adam/apis/java/JavaADAMContextSuite.scala @@ -39,7 +39,7 @@ class JavaADAMContextSuite extends ADAMFunSuite { sparkTest("can read and write a small FASTA file") { val path = copyResource("chr20.250k.fa.gz") - val aRdd = sc.loadSequences(path) + val aRdd = sc.loadContigFragments(path) assert(aRdd.jrdd.count() === 26) val newRdd = JavaADAMContigConduit.conduit(aRdd, sc) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala index 8b44ff3102..175c7ffa2f 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala @@ -50,7 +50,7 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg override def run(sc: SparkContext): Unit = { log.info("Loading ADAM nucleotide contig fragments from disk.") - val contigFragments = sc.loadSequences(args.inputPath) + val contigFragments = sc.loadContigFragments(args.inputPath) log.info("Merging fragments and writing FASTA to disk.") val contigs = contigFragments.mergeFragments() diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fastq.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fastq.scala index e9dcd6439b..54c571c58a 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fastq.scala +++ 
b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fastq.scala @@ -76,7 +76,7 @@ class ADAM2Fastq(val args: ADAM2FastqArgs) extends BDGSparkCommand[ADAM2FastqArg else None - var reads = sc.loadAlignments(args.inputPath, projection = projectionOpt) + var reads = sc.loadAlignments(args.inputPath, optProjection = projectionOpt) if (args.repartition != -1) { log.info("Repartitioning reads to to '%d' partitions".format(args.repartition)) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountContigKmers.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountContigKmers.scala index 50351054c4..9a758fa35f 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountContigKmers.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountContigKmers.scala @@ -49,7 +49,7 @@ class CountContigKmers(protected val args: CountContigKmersArgs) extends BDGSpar def run(sc: SparkContext) { // read from disk - val fragments = sc.loadSequences(args.inputPath) + val fragments = sc.loadContigFragments(args.inputPath) // count kmers val countedKmers = fragments.countKmers(args.kmerLength) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountReadKmers.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountReadKmers.scala index 10d55a4947..a378f8c797 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountReadKmers.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountReadKmers.scala @@ -54,7 +54,7 @@ class CountReadKmers(protected val args: CountReadKmersArgs) extends BDGSparkCom // read from disk var adamRecords = sc.loadAlignments( args.inputPath, - projection = Some(Projection(AlignmentRecordField.sequence)) + optProjection = Some(Projection(AlignmentRecordField.sequence)) ) if (args.repartition != -1) { diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala index 119bbfc93c..76761280da 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala @@ -42,7 +42,7 @@ class Fasta2ADAMArgs extends Args4jBase with ParquetSaveArgs { @Args4jOption(required = false, name = "-reads", usage = "Maps contig IDs to match contig IDs of reads.") var reads: String = "" @Args4jOption(required = false, name = "-fragment_length", usage = "Sets maximum fragment length. Default value is 10,000. 
Values greater than 1e9 should be avoided.") - var fragmentLength: Long = 10000L + var maximumFragmentLength: Long = 10000L @Args4jOption(required = false, name = "-repartition", usage = "Sets the number of output partitions to write, if desired.") var partitions: Int = -1 } @@ -52,7 +52,7 @@ class Fasta2ADAM(protected val args: Fasta2ADAMArgs) extends BDGSparkCommand[Fas def run(sc: SparkContext) { log.info("Loading FASTA data from disk.") - val adamFasta = sc.loadFasta(args.fastaFile, fragmentLength = args.fragmentLength) + val adamFasta = sc.loadFasta(args.fastaFile, maximumFragmentLength = args.maximumFragmentLength) if (args.verbose) { log.info("FASTA contains: %s", adamFasta.sequences.toString) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FlagStat.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FlagStat.scala index dc5df57968..6125e6ea55 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FlagStat.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FlagStat.scala @@ -69,7 +69,7 @@ class FlagStat(protected val args: FlagStatArgs) extends BDGSparkCommand[FlagSta val adamFile = sc.loadAlignments( args.inputPath, - projection = Some(projection), + optProjection = Some(projection), stringency = stringency ) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala index 2f30bf2f6b..07630b5762 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala @@ -70,7 +70,7 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom "Cannot compute coverage for both negative and positive strands separately") // load reads - val readsRdd: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, projection = Some(proj)) + val readsRdd: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, optProjection = Some(proj)) val finalReads = if (args.onlyNegativeStrands && !args.onlyPositiveStrands) { readsRdd.transform(rdd => rdd.filter(_.getReadNegativeStrand)) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala index 6e2f21fff1..20d8df9eb8 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala @@ -188,7 +188,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans // optionally load a reference val optReferenceFile = Option(args.reference).map(f => { sc.loadReferenceFile(f, - fragmentLength = args.mdTagsFragmentSize) + maximumFragmentLength = args.mdTagsFragmentSize) }) // run realignment @@ -332,7 +332,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans if (args.mdTagsReferenceFile != null) { log.info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}") val referenceFile = sc.loadReferenceFile(args.mdTagsReferenceFile, - fragmentLength = args.mdTagsFragmentSize) + maximumFragmentLength = args.mdTagsFragmentSize) rdd.computeMismatchingPositions( referenceFile, overwriteExistingTags = args.mdTagsOverwrite, @@ -432,14 +432,15 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans } sc.loadParquetAlignments(args.inputPath, - predicate = pred, - projection = proj) + optPredicate = pred, + optProjection = proj) } else { sc.loadAlignments( args.inputPath, - 
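The call-site changes above follow one pattern: optional keyword arguments pick up an opt prefix (projection becomes optProjection, predicate becomes optPredicate) and fragmentLength becomes maximumFragmentLength. An illustrative caller-side sketch of the renamed keywords, reusing the Projection helper already imported by CountReadKmers (from org.bdgenomics.adam.projections); the input path is hypothetical:

  val reads = sc.loadAlignments(
    "reads.adam",
    optProjection = Some(Projection(AlignmentRecordField.sequence)))
  val parquetReads = sc.loadParquetAlignments(
    "reads.adam",
    optPredicate = None,
    optProjection = Some(Projection(AlignmentRecordField.sequence)))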
filePath2Opt = Option(args.pairedFastqFile), - recordGroupOpt = Option(args.fastqRecordGroup), - stringency = stringency) + optPathName2 = Option(args.pairedFastqFile), + optRecordGroup = Option(args.fastqRecordGroup), + stringency = stringency + ) } val rdd = aRdd.rdd val sd = aRdd.sequences @@ -459,7 +460,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans } else { sc.loadAlignments( concatFilename, - recordGroupOpt = Option(args.fastqRecordGroup) + optRecordGroup = Option(args.fastqRecordGroup) ) }) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala index c28530cbe4..84a9a1465e 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala @@ -67,8 +67,8 @@ class TransformFeatures(val args: TransformFeaturesArgs) sc.loadFeatures( args.featuresFile, optStorageLevel = optStorageLevel, - projection = None, - minPartitions = Option(args.numPartitions) + optMinPartitions = Option(args.numPartitions), + optProjection = None ).save(args.outputPath, args.single) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala index aec39fc3e3..49aa1682c6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala @@ -94,20 +94,21 @@ private[adam] object FastaConverter { * contig fragments. * * @note Input dataset is assumed to have come in from a Hadoop TextInputFormat reader. This sets - * a specific format for the RDD's Key-Value pairs. + * a specific format for the RDD's Key-Value pairs. * * @throws AssertionError Thrown if there appear to be multiple sequences in a single file - * that do not have descriptions. + * that do not have descriptions. * @throws IllegalArgumentException Thrown if a sequence does not have sequence data. * * @param rdd RDD containing Long,String tuples, where the Long corresponds to the number - * of the file line, and the String is the line of the file. - * @param maxFragmentLength The maximum length of fragments in the contig. + * of the file line, and the String is the line of the file. + * @param maximumFragmentLength Maximum fragment length. Defaults to 10000L. Values greater + * than 1e9 should be avoided. * @return An RDD of ADAM FASTA data. 
*/ def apply( rdd: RDD[(Long, String)], - maxFragmentLength: Long = 10000L): RDD[NucleotideContigFragment] = { + maximumFragmentLength: Long = 10000L): RDD[NucleotideContigFragment] = { val filtered = rdd.map(kv => (kv._1, kv._2.trim())) .filter((kv: (Long, String)) => !kv._2.startsWith(";")) @@ -125,7 +126,7 @@ private[adam] object FastaConverter { val groupedContigs = keyedSequences.groupByKey() - val converter = new FastaConverter(maxFragmentLength) + val converter = new FastaConverter(maximumFragmentLength) groupedContigs.flatMap { case (id, lines) => diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala index 226820a79b..b0030e19d3 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala @@ -24,12 +24,33 @@ import org.bdgenomics.utils.instrumentation.Metrics */ object Timers extends Metrics { - // File Loading - val LoadAlignmentRecords = timer("Load Alignment Records") - val BAMLoad = timer("BAM File Load") - val ParquetLoad = timer("Parquet File Load") + // Load methods + val LoadAlignments = timer("Load Alignments") + val LoadContigFragments = timer("Load Contig Fragments") + val LoadCoverage = timer("Load Coverage") val LoadFeatures = timer("Load Features") val LoadFragments = timer("Load Fragments") + val LoadGenotypes = timer("Load Genotypes") + val LoadReferenceFile = timer("Load ReferenceFile") + val LoadVariants = timer("Load Variants") + + // Format specific load methods + val LoadBam = timer("Load BAM/CRAM/SAM format") + val LoadBed = timer("Load BED6/12 format") + val LoadFasta = timer("Load FASTA format") + val LoadFastq = timer("Load FASTQ format") + val LoadGff3 = timer("Load GFF3 format") + val LoadGtf = timer("Load GTF/GFF2 format") + val LoadIndexedBam = timer("Load indexed BAM format") + val LoadIndexedVcf = timer("Load indexed VCF format") + val LoadInterleavedFastq = timer("Load interleaved FASTQ format") + val LoadInterleavedFastqFragments = timer("Load interleaved FASTQ format as Fragments") + val LoadIntervalList = timer("Load IntervalList format") + val LoadNarrowPeak = timer("Load NarrowPeak format") + val LoadPairedFastq = timer("Load paired FASTQ format") + val LoadParquet = timer("Load Parquet + Avro format") + val LoadUnpairedFastq = timer("Load unpaired FASTQ format") + val LoadVcf = timer("Load VCF format") // Trim Reads val TrimReadsInDriver = timer("Trim Reads") diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index feab53e70f..c87533c188 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -33,6 +33,7 @@ import org.apache.avro.generic.IndexedRecord import org.apache.avro.specific.{ SpecificDatumReader, SpecificRecord, SpecificRecordBase } import org.apache.hadoop.fs.{ FileSystem, Path, PathFilter } import org.apache.hadoop.io.{ LongWritable, Text } +import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.parquet.avro.{ AvroParquetInputFormat, AvroReadSupport } import org.apache.parquet.filter2.predicate.FilterPredicate @@ -56,6 +57,7 @@ import org.bdgenomics.adam.rdd.fragment.FragmentRDD import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, 
RepairPartitions } import org.bdgenomics.adam.rdd.variant._ import org.bdgenomics.adam.rich.RichAlignmentRecord +import org.bdgenomics.adam.util.FileExtensions._ import org.bdgenomics.adam.util.{ ReferenceContigMap, ReferenceFile, TwoBitFile } import org.bdgenomics.formats.avro._ import org.bdgenomics.utils.instrumentation.Metrics @@ -133,7 +135,7 @@ private class FileFilter(private val name: String) extends PathFilter { /** * @param path Path to evaluate. - * @return Returns true if the filename of the path matches the name passed + * @return Returns true if the pathName of the path matches the name passed * to the constructor. */ def accept(path: Path): Boolean = { @@ -165,13 +167,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * @param filePath The (possibly globbed) filepath to load a VCF from. + * @param pathName The path name to load VCF format metadata from. + * Globs/directories are supported. * @return Returns a tuple of metadata from the VCF header, including the * sequence dictionary and a list of the samples contained in the VCF. */ - private[rdd] def loadVcfMetadata(filePath: String): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = { + private[rdd] def loadVcfMetadata(pathName: String): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = { // get the paths to all vcfs - val files = getFsAndFiles(new Path(filePath)) + val files = getFsAndFiles(new Path(pathName)) // load yonder the metadata files.map(p => loadSingleVcfMetadata(p.toString)).reduce((p1, p2) => { @@ -180,13 +183,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * @param filePath The (possibly globbed) filepath to load a VCF from. + * @param pathName The path name to load VCF format metadata from. + * Globs/directories are not supported. * @return Returns a tuple of metadata from the VCF header, including the * sequence dictionary and a list of the samples contained in the VCF. 
* * @see loadVcfMetadata */ - private def loadSingleVcfMetadata(filePath: String): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = { + private def loadSingleVcfMetadata(pathName: String): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = { def headerToMetadata(vcfHeader: VCFHeader): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = { val sd = SequenceDictionary.fromVCFHeader(vcfHeader) val samples = asScalaBuffer(vcfHeader.getGenotypeSamples) @@ -198,12 +202,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log (sd, samples, headerLines(vcfHeader)) } - headerToMetadata(readVcfHeader(filePath)) + headerToMetadata(readVcfHeader(pathName)) } - private def readVcfHeader(filePath: String): VCFHeader = { + private def readVcfHeader(pathName: String): VCFHeader = { VCFHeaderReader.readHeaderFrom(WrapSeekable.openPath(sc.hadoopConfiguration, - new Path(filePath))) + new Path(pathName))) } private def cleanAndMixInSupportedLines( @@ -273,67 +277,67 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log header.getOtherHeaderLines).toSeq } - private def loadHeaderLines(filePath: String): Seq[VCFHeaderLine] = { - getFsAndFilesWithFilter(filePath, new FileFilter("_header")) + private def loadHeaderLines(pathName: String): Seq[VCFHeaderLine] = { + getFsAndFilesWithFilter(pathName, new FileFilter("_header")) .map(p => headerLines(readVcfHeader(p.toString))) .flatten .distinct } /** - * @param filePath The (possibly globbed) filepath to load Avro sequence - * dictionary info from. - * @return Returns the SequenceDictionary representing said reference build. + * @param pathName The path name to load Avro sequence dictionaries from. + * Globs/directories are supported. + * @return Returns a SequenceDictionary. */ - private[rdd] def loadAvroSequences(filePath: String): SequenceDictionary = { - getFsAndFilesWithFilter(filePath, new FileFilter("_seqdict.avro")) - .map(p => loadAvroSequencesFile(p.toString)) + private[rdd] def loadAvroSequenceDictionary(pathName: String): SequenceDictionary = { + getFsAndFilesWithFilter(pathName, new FileFilter("_seqdict.avro")) + .map(p => loadSingleAvroSequenceDictionary(p.toString)) .reduce(_ ++ _) } /** - * @param filePath The filepath to load a single Avro file of sequence - * dictionary info from. - * @return Returns the SequenceDictionary representing said reference build. + * @see loadAvroSequenceDictionary * - * @see loadAvroSequences + * @param pathName The path name to load a single Avro sequence dictionary from. + * Globs/directories are not supported. + * @return Returns a SequenceDictionary. */ - private def loadAvroSequencesFile(filePath: String): SequenceDictionary = { - val avroSd = loadAvro[Contig](filePath, Contig.SCHEMA$) + private def loadSingleAvroSequenceDictionary(pathName: String): SequenceDictionary = { + val avroSd = loadAvro[Contig](pathName, Contig.SCHEMA$) SequenceDictionary.fromAvro(avroSd) } /** - * @param filePath The (possibly globbed) filepath to load Avro sample - * metadata descriptions from. - * @return Returns a Seq of Sample descriptions. + * @param pathName The path name to load Avro samples from. + * Globs/directories are supported. + * @return Returns a Seq of Samples. 
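Taken together, the renamed private helpers read the Avro sidecar files written next to a Parquet + Avro dataset. A condensed, illustrative sketch of how the Parquet loaders further down in this diff compose them:

  val sd = loadAvroSequenceDictionary(pathName)       // reads pathName/_seqdict.avro
  val samples = loadAvroSamples(pathName)              // reads pathName/_samples.avro
  val rgd = loadAvroRecordGroupDictionary(pathName)    // reads pathName/_rgdict.avro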
*/ - private[rdd] def loadAvroSampleMetadata(filePath: String): Seq[Sample] = { - getFsAndFilesWithFilter(filePath, new FileFilter("_samples.avro")) + private[rdd] def loadAvroSamples(pathName: String): Seq[Sample] = { + getFsAndFilesWithFilter(pathName, new FileFilter("_samples.avro")) .map(p => loadAvro[Sample](p.toString, Sample.SCHEMA$)) .reduce(_ ++ _) } /** - * @param filePath The (possibly globbed) filepath to load Avro read group - * metadata descriptions from. + * @param pathName The path name to load Avro record group dictionaries from. + * Globs/directories are supported. * @return Returns a RecordGroupDictionary. */ - private[rdd] def loadAvroReadGroupMetadata(filePath: String): RecordGroupDictionary = { - getFsAndFilesWithFilter(filePath, new FileFilter("_rgdict.avro")) - .map(p => loadAvroReadGroupMetadataFile(p.toString)) + private[rdd] def loadAvroRecordGroupDictionary(pathName: String): RecordGroupDictionary = { + getFsAndFilesWithFilter(pathName, new FileFilter("_rgdict.avro")) + .map(p => loadSingleAvroRecordGroupDictionary(p.toString)) .reduce(_ ++ _) } /** - * @param filePath The filepath to load a single Avro file containing read - * group metadata. - * @return Returns a RecordGroupDictionary. + * @see loadAvroRecordGroupDictionary * - * @see loadAvroReadGroupMetadata + * @param pathName The path name to load a single Avro record group dictionary from. + * Globs/directories are not supported. + * @return Returns a RecordGroupDictionary. */ - private def loadAvroReadGroupMetadataFile(filePath: String): RecordGroupDictionary = { - val avroRgd = loadAvro[RecordGroupMetadata](filePath, + private def loadSingleAvroRecordGroupDictionary(pathName: String): RecordGroupDictionary = { + val avroRgd = loadAvro[RecordGroupMetadata](pathName, RecordGroupMetadata.SCHEMA$) // convert avro to record group dictionary @@ -341,40 +345,44 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * This method will create a new RDD. - * - * @param filePath The path to the input data - * @param predicate An optional pushdown predicate to use when reading the data - * @param projection An option projection schema to use when reading the data - * @tparam T The type of records to return - * @return An RDD with records of the specified type + * Load a path name in Parquet + Avro format into an RDD. + * + * @param pathName The path name to load Parquet + Avro formatted data from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @tparam T The type of records to return. + * @return An RDD with records of the specified type. */ def loadParquet[T]( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None)(implicit ev1: T => SpecificRecord, ev2: Manifest[T]): RDD[T] = { + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None)(implicit ev1: T => SpecificRecord, ev2: Manifest[T]): RDD[T] = { + //make sure a type was specified //not using require as to make the message clearer if (manifest[T] == manifest[scala.Nothing]) throw new IllegalArgumentException("Type inference failed; when loading please specify a specific type. 
" + "e.g.:\nval reads: RDD[AlignmentRecord] = ...\nbut not\nval reads = ...\nwithout a return type") - log.info("Reading the ADAM file at %s to create RDD".format(filePath)) + log.info("Reading the ADAM file at %s to create RDD".format(pathName)) val job = HadoopUtil.newJob(sc) ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[T]]) - predicate.foreach { (pred) => + optPredicate.foreach { (pred) => log.info("Using the specified push-down predicate") ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred) } - if (projection.isDefined) { + if (optProjection.isDefined) { log.info("Using the specified projection schema") - AvroParquetInputFormat.setRequestedProjection(job, projection.get) + AvroParquetInputFormat.setRequestedProjection(job, optProjection.get) } val records = sc.newAPIHadoopFile( - filePath, + pathName, classOf[ParquetInputFormat[T]], classOf[Void], manifest[T].runtimeClass.asInstanceOf[Class[T]], @@ -384,7 +392,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val instrumented = if (Metrics.isRecording) records.instrument() else records val mapped = instrumented.map(p => p._2) - if (predicate.isDefined) { + if (optPredicate.isDefined) { // Strip the nulls that the predicate returns mapped.filter(p => p != null.asInstanceOf[T]) } else { @@ -395,12 +403,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log /** * Elaborates out a directory/glob/plain path. * + * @see getFsAndFiles + * * @param path Path to elaborate. * @param fs The underlying file system that this path is on. * @return Returns an array of Paths to load. - * - * @see getFsAndFiles - * * @throws FileNotFoundException if the path does not match any files. */ protected def getFiles(path: Path, fs: FileSystem): Array[Path] = { @@ -425,11 +432,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log /** * Elaborates out a directory/glob/plain path. * - * @param path Path to elaborate. - * @return Returns an array of Paths to load. - * * @see getFiles * + * @param path Path to elaborate. + * @return Returns an array of Paths to load. * @throws FileNotFoundException if the path does not match any files. */ protected def getFsAndFiles(path: Path): Array[Path] = { @@ -444,19 +450,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Elaborates out a directory/glob/plain path. - * - * @param filename Path to elaborate. - * @param filter Filter to discard paths. - * @return Returns an array of Paths to load. + * Elaborates out a directory/glob/plain path name. * * @see getFiles * + * @param pathName Path name to elaborate. + * @param filter Filter to discard paths. + * @return Returns an array of Paths to load. * @throws FileNotFoundException if the path does not match any files. */ - protected def getFsAndFilesWithFilter(filename: String, filter: PathFilter): Array[Path] = { + protected def getFsAndFilesWithFilter(pathName: String, filter: PathFilter): Array[Path] = { - val path = new Path(filename) + val path = new Path(pathName) // get the underlying fs for the file val fs = Option(path.getFileSystem(sc.hadoopConfiguration)).getOrElse( @@ -483,29 +488,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Checks to see if a set of SAM/BAM/CRAM files are queryname sorted. + * Checks to see if a set of BAM/CRAM/SAM files are queryname sorted. 
* - * If we are loading fragments and the SAM/BAM/CRAM files are sorted by the + * If we are loading fragments and the BAM/CRAM/SAM files are sorted by the * read names, this implies that all of the reads in a pair are consecutive in * the file. If this is the case, we can configure Hadoop-BAM to keep all of * the reads from a fragment in a single split. This allows us to eliminate * an expensive groupBy when loading a BAM file as fragments. * - * @param filePath The file path to load reads from. Globs/directories are - * supported. - * @param stringency The validation stringency to use when reading the header. - * @return Returns true if all files described by the filepath are queryname + * @param pathName The path name to load BAM/CRAM/SAM formatted alignment records from. + * Globs/directories are supported. + * @param stringency The validation stringency to use when validating the + * BAM/CRAM/SAM format header. Defaults to ValidationStringency.STRICT. + * @return Returns true if all files described by the path name are queryname * sorted. */ - private[rdd] def filesAreQuerynameSorted(filePath: String, - stringency: ValidationStringency = ValidationStringency.STRICT): Boolean = { - val path = new Path(filePath) + private[rdd] def filesAreQuerynameSorted( + pathName: String, + stringency: ValidationStringency = ValidationStringency.STRICT): Boolean = { + val path = new Path(pathName) val bamFiles = getFsAndFiles(path) val filteredFiles = bamFiles.filter(p => { val pPath = p.getName() - pPath.endsWith(".bam") || pPath.endsWith(".cram") || - pPath.endsWith(".sam") || pPath.startsWith("part-") + isBamExt(pPath) || pPath.startsWith("part-") }) filteredFiles @@ -528,28 +534,47 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads a SAM/BAM file. + * Trim the default compression extension from the specified path name, if it is + * recognized as compressed by the compression codecs in the Hadoop configuration. * - * This reads the sequence and record group dictionaries from the SAM/BAM file + * @param pathName The path name to trim. + * @return The path name with the default compression extension trimmed. + */ + private[rdd] def trimExtensionIfCompressed(pathName: String): String = { + val codecFactory = new CompressionCodecFactory(sc.hadoopConfiguration) + val path = new Path(pathName) + val codec = codecFactory.getCodec(path) + if (codec == null) { + pathName + } else { + log.info(s"Found compression codec $codec for $pathName in Hadoop configuration.") + val extension = codec.getDefaultExtension() + CompressionCodecFactory.removeSuffix(pathName, extension) + } + } + + /** + * Load alignment records from BAM/CRAM/SAM into an AlignmentRecordRDD. + * + * This reads the sequence and record group dictionaries from the BAM/CRAM/SAM file * header. SAMRecords are read from the file and converted to the * AlignmentRecord schema. * - * @param filePath Path to the file on disk. - * @return Returns an AlignmentRecordRDD which wraps the RDD of reads, - * sequence dictionary representing the contigs these reads are aligned to - * if the reads are aligned, and the record group dictionary for the reads - * if one is available. - * @see loadAlignments + * @param pathName The path name to load BAM/CRAM/SAM formatted alignment records from. + * Globs/directories are supported. 
+ * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ - def loadBam(filePath: String, - validationStringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = { - val path = new Path(filePath) + def loadBam( + pathName: String, + validationStringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadBam.time { + val path = new Path(pathName) val bamFiles = getFsAndFiles(path) val filteredFiles = bamFiles.filter(p => { val pPath = p.getName() - pPath.endsWith(".bam") || pPath.endsWith(".cram") || - pPath.endsWith(".sam") || pPath.startsWith("part-") + isBamExt(pPath) || pPath.startsWith("part-") }) require(filteredFiles.nonEmpty, @@ -589,13 +614,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // contains bams, hadoop-bam is a-ok! i believe that it is better (perf) to // just load from a single newAPIHadoopFile call instead of a union across // files, so we do that whenever possible - val records = if (filteredFiles.size != bamFiles.size) { + val records = if (filteredFiles.length != bamFiles.length) { sc.union(filteredFiles.map(p => { sc.newAPIHadoopFile(p.toString, classOf[AnySAMInputFormat], classOf[LongWritable], classOf[SAMRecordWritable], ContextUtil.getConfiguration(job)) })) } else { - sc.newAPIHadoopFile(filePath, classOf[AnySAMInputFormat], classOf[LongWritable], + sc.newAPIHadoopFile(pathName, classOf[AnySAMInputFormat], classOf[LongWritable], classOf[SAMRecordWritable], ContextUtil.getConfiguration(job)) } if (Metrics.isRecording) records.instrument() else records @@ -607,27 +632,39 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Functions like loadBam, but uses bam index files to look at fewer blocks, - * and only returns records within a specified ReferenceRegion. Bam index file required. - * - * @param filePath The path to the input data. Currently this path must correspond to - * a single Bam file. The bam index file associated needs to have the same name. - * @param viewRegion The ReferenceRegion we are filtering on + * Functions like loadBam, but uses BAM index files to look at fewer blocks, + * and only returns records within a specified ReferenceRegion. BAM index file required. + * + * @param pathName The path name to load indexed BAM formatted alignment records from. + * Globs/directories are supported. + * @param viewRegion The ReferenceRegion we are filtering on. + * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ - def loadIndexedBam(filePath: String, viewRegion: ReferenceRegion): AlignmentRecordRDD = { - loadIndexedBam(filePath, Iterable(viewRegion)) + def loadIndexedBam( + pathName: String, + viewRegion: ReferenceRegion): AlignmentRecordRDD = { + loadIndexedBam(pathName, Iterable(viewRegion)) } /** - * Functions like loadBam, but uses bam index files to look at fewer blocks, - * and only returns records within the specified ReferenceRegions. Bam index file required. - * - * @param filePath The path to the input data. Currently this path must correspond to - * a single Bam file. 
The bam index file associated needs to have the same name. - * @param viewRegions Iterable of ReferenceRegions we are filtering on + * Functions like loadBam, but uses BAM index files to look at fewer blocks, + * and only returns records within the specified ReferenceRegions. BAM index file required. + * + * @param pathName The path name to load indexed BAM formatted alignment records from. + * Globs/directories are supported. + * @param viewRegions Iterable of ReferenceRegion we are filtering on. + * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ - def loadIndexedBam(filePath: String, viewRegions: Iterable[ReferenceRegion])(implicit s: DummyImplicit): AlignmentRecordRDD = { - val path = new Path(filePath) + def loadIndexedBam( + pathName: String, + viewRegions: Iterable[ReferenceRegion])(implicit s: DummyImplicit): AlignmentRecordRDD = LoadIndexedBam.time { + + val path = new Path(pathName) + // todo: can this method handle SAM and CRAM, or just BAM? val bamFiles = getFsAndFiles(path).filter(p => p.toString.endsWith(".bam")) require(bamFiles.nonEmpty, @@ -664,7 +701,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads Avro data from a Hadoop File System. + * Load Avro data from a Hadoop File System. * * This method uses the SparkContext wrapped by this class to identify our * underlying file system. We then use the underlying FileSystem imp'l to @@ -676,16 +713,17 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * As such, we must force the user to pass in the schema. * * @tparam T The type of the specific record we are loading. - * @param filename Path to Vf file from. + * @param pathName The path name to load Avro records from. + * Globs/directories are supported. * @param schema Schema of records we are loading. - * @return Returns a Seq containing the avro records. + * @return Returns a Seq containing the Avro records. */ - private def loadAvro[T <: SpecificRecordBase](filename: String, - schema: Schema)( - implicit tTag: ClassTag[T]): Seq[T] = { + private def loadAvro[T <: SpecificRecordBase]( + pathName: String, + schema: Schema)(implicit tTag: ClassTag[T]): Seq[T] = { // get our current file system - val path = new Path(filename) + val path = new Path(pathName) val fs = path.getFileSystem(sc.hadoopConfiguration) // get an input stream @@ -740,54 +778,57 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads alignment data from a Parquet file. - * - * @param filePath The path of the file to load. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional schema designating the fields to project. - * @return Returns an AlignmentRecordRDD which wraps the RDD of reads, - * sequence dictionary representing the contigs these reads are aligned to - * if the reads are aligned, and the record group dictionary for the reads - * if one is available. - * @note The sequence dictionary is read from an avro file stored at - * filePath/_seqdict.avro and the record group dictionary is read from an - * avro file stored at filePath/_rgdict.avro. These files are pure avro, - * not Parquet. - * @see loadAlignments + * Load a path name in Parquet + Avro format into an AlignmentRecordRDD. 
+ * + * @note The sequence dictionary is read from an Avro file stored at + * pathName/_seqdict.avro and the record group dictionary is read from an + * Avro file stored at pathName/_rgdict.avro. These files are pure Avro, + * not Parquet + Avro. + * + * @param pathName The path name to load alignment records from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ def loadParquetAlignments( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): AlignmentRecordRDD = { + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): AlignmentRecordRDD = { // load from disk - val rdd = loadParquet[AlignmentRecord](filePath, predicate, projection) + val rdd = loadParquet[AlignmentRecord](pathName, optPredicate, optProjection) // convert avro to sequence dictionary - val sd = loadAvroSequences(filePath) + val sd = loadAvroSequenceDictionary(pathName) // convert avro to sequence dictionary - val rgd = loadAvroReadGroupMetadata(filePath) + val rgd = loadAvroRecordGroupDictionary(pathName) AlignmentRecordRDD(rdd, sd, rgd) } /** - * Loads reads from interleaved FASTQ. + * Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD. * * In interleaved FASTQ, the two reads from a paired sequencing protocol are * interleaved in a single file. This is a zipped representation of the * typical paired FASTQ. * - * @param filePath Path to load. - * @return Returns the file as an unaligned AlignmentRecordRDD. + * @param pathName The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @return Returns an unaligned AlignmentRecordRDD. */ def loadInterleavedFastq( - filePath: String): AlignmentRecordRDD = { + pathName: String): AlignmentRecordRDD = LoadInterleavedFastq.time { val job = HadoopUtil.newJob(sc) val records = sc.newAPIHadoopFile( - filePath, + pathName, classOf[InterleavedFastqInputFormat], classOf[Void], classOf[Text], @@ -801,60 +842,70 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads (possibly paired) FASTQ data. + * Load unaligned alignment records from (possibly paired) FASTQ into an AlignmentRecordRDD. * * @see loadPairedFastq * @see loadUnpairedFastq * - * @param filePath1 The path where the first set of reads are. - * @param filePath2Opt The path where the second set of reads are, if provided. - * @param recordGroupOpt The optional record group name to associate to the - * reads. - * @param stringency The validation stringency to use when validating the reads. - * @return Returns the reads as an unaligned AlignmentRecordRDD. + * @param pathName1 The path name to load the first set of unaligned alignment records from. + * Globs/directories are supported. + * @param optPathName2 The path name to load the second set of unaligned alignment records from, + * if provided. Globs/directories are supported. + * @param optRecordGroup The optional record group name to associate to the unaligned alignment + * records. Defaults to None. 
+ * @param stringency The validation stringency to use when validating (possibly paired) FASTQ format. + * Defaults to ValidationStringency.STRICT. + * @return Returns an unaligned AlignmentRecordRDD. */ def loadFastq( - filePath1: String, - filePath2Opt: Option[String], - recordGroupOpt: Option[String] = None, - stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = { - filePath2Opt.fold({ - loadUnpairedFastq(filePath1, - recordGroupOpt, + pathName1: String, + optPathName2: Option[String], + optRecordGroup: Option[String] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadFastq.time { + + optPathName2.fold({ + loadUnpairedFastq(pathName1, + optRecordGroup = optRecordGroup, stringency = stringency) })(filePath2 => { - loadPairedFastq(filePath1, + loadPairedFastq(pathName1, filePath2, - recordGroupOpt, - stringency) + optRecordGroup = optRecordGroup, + stringency = stringency) }) } /** - * Loads paired FASTQ data from two files. - * - * @see loadFastq - * - * @param filePath1 The path where the first set of reads are. - * @param filePath2 The path where the second set of reads are. - * @param recordGroupOpt The optional record group name to associate to the - * reads. - * @param stringency The validation stringency to use when validating the reads. - * @return Returns the reads as an unaligned AlignmentRecordRDD. + * Load unaligned alignment records from paired FASTQ into an AlignmentRecordRDD. + * + * @param pathName1 The path name to load the first set of unaligned alignment records from. + * Globs/directories are supported. + * @param pathName2 The path name to load the second set of unaligned alignment records from. + * Globs/directories are supported. + * @param optRecordGroup The optional record group name to associate to the unaligned alignment + * records. Defaults to None. + * @param stringency The validation stringency to use when validating paired FASTQ format. + * Defaults to ValidationStringency.STRICT. + * @return Returns an unaligned AlignmentRecordRDD. 
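An illustrative sketch of the renamed loadFastq entry point (file names are hypothetical); passing None for optPathName2 falls through to loadUnpairedFastq, while Some(...) dispatches to loadPairedFastq:

  val pairedReads = sc.loadFastq(
    "sample_1.fq.gz",
    Some("sample_2.fq.gz"),
    optRecordGroup = Some("sample"))
  val unpairedReads = sc.loadFastq("sample.fq", None)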
*/ def loadPairedFastq( - filePath1: String, - filePath2: String, - recordGroupOpt: Option[String], - stringency: ValidationStringency): AlignmentRecordRDD = { - val reads1 = loadUnpairedFastq(filePath1, - recordGroupOpt, + pathName1: String, + pathName2: String, + optRecordGroup: Option[String] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadPairedFastq.time { + + val reads1 = loadUnpairedFastq( + pathName1, setFirstOfPair = true, - stringency = stringency) - val reads2 = loadUnpairedFastq(filePath2, - recordGroupOpt, + optRecordGroup = optRecordGroup, + stringency = stringency + ) + val reads2 = loadUnpairedFastq( + pathName2, setSecondOfPair = true, - stringency = stringency) + optRecordGroup = optRecordGroup, + stringency = stringency + ) stringency match { case ValidationStringency.STRICT | ValidationStringency.LENIENT => @@ -862,7 +913,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val count2 = reads2.rdd.cache.count if (count1 != count2) { - val msg = s"Fastq 1 ($filePath1) has $count1 reads, fastq 2 ($filePath2) has $count2 reads" + val msg = s"Fastq 1 ($pathName1) has $count1 reads, fastq 2 ($pathName2) has $count2 reads" if (stringency == ValidationStringency.STRICT) throw new IllegalArgumentException(msg) else { @@ -877,28 +928,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads unpaired FASTQ data from two files. - * - * @see loadFastq - * - * @param filePath The path where the first set of reads are. - * @param recordGroupOpt The optional record group name to associate to the - * reads. - * @param setFirstOfPair If true, sets the read as first from the fragment. - * @param setSecondOfPair If true, sets the read as second from the fragment. - * @param stringency The validation stringency to use when validating the reads. - * @return Returns the reads as an unaligned AlignmentRecordRDD. + * Load unaligned alignment records from unpaired FASTQ into an AlignmentRecordRDD. + * + * @param pathName The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @param setFirstOfPair If true, sets the unaligned alignment record as first from the fragment. + * Defaults to false. + * @param setSecondOfPair If true, sets the unaligned alignment record as second from the fragment. + * Defaults to false. + * @param optRecordGroup The optional record group name to associate to the unaligned alignment + * records. Defaults to None. + * @param stringency The validation stringency to use when validating unpaired FASTQ format. + * Defaults to ValidationStringency.STRICT. + * @return Returns an unaligned AlignmentRecordRDD. 
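Note that loadUnpairedFastq below also reorders its parameters: the pairing flags now precede optRecordGroup, so positional callers of the old recordGroupOpt argument need updating. Named arguments sidestep the reorder; an illustrative sketch with a hypothetical path:

  val reads = sc.loadUnpairedFastq(
    "sample.fq",
    setFirstOfPair = true,
    optRecordGroup = Some("sample"))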
*/ def loadUnpairedFastq( - filePath: String, - recordGroupOpt: Option[String] = None, + pathName: String, setFirstOfPair: Boolean = false, setSecondOfPair: Boolean = false, - stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = { + optRecordGroup: Option[String] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadUnpairedFastq.time { val job = HadoopUtil.newJob(sc) val records = sc.newAPIHadoopFile( - filePath, + pathName, classOf[SingleFastqInputFormat], classOf[Void], classOf[Text], @@ -911,9 +964,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log AlignmentRecordRDD.unaligned(records.map( fastqRecordConverter.convertRead( _, - recordGroupOpt.map(recordGroup => + optRecordGroup.map(recordGroup => if (recordGroup.isEmpty) - filePath.substring(filePath.lastIndexOf("/") + 1) + pathName.substring(pathName.lastIndexOf("/") + 1) else recordGroup), setFirstOfPair, @@ -924,12 +977,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * @param filePath File to read VCF records from. - * @param viewRegions Optional intervals to push down into file using index. + * @param pathName The path name to load VCF variant context records from. + * Globs/directories are supported. + * @param optViewRegions Optional intervals to push down into file using index. * @return Returns a raw RDD of (LongWritable, VariantContextWritable)s. */ - private def readVcfRecords(filePath: String, - viewRegions: Option[Iterable[ReferenceRegion]]): RDD[(LongWritable, VariantContextWritable)] = { + private def readVcfRecords( + pathName: String, + optViewRegions: Option[Iterable[ReferenceRegion]]): RDD[(LongWritable, VariantContextWritable)] = { + // load vcf data val job = HadoopUtil.newJob(sc) job.getConfiguration().setStrings("io.compression.codecs", @@ -937,39 +993,39 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log classOf[BGZFEnhancedGzipCodec].getCanonicalName()) val conf = ContextUtil.getConfiguration(job) - viewRegions.foreach(vr => { + optViewRegions.foreach(vr => { val intervals = vr.toList.map(r => LocatableReferenceRegion(r)) VCFInputFormat.setIntervals(conf, intervals) }) sc.newAPIHadoopFile( - filePath, + pathName, classOf[VCFInputFormat], classOf[LongWritable], classOf[VariantContextWritable], conf ) } /** - * Loads a VCF file into an RDD. + * Load variant context records from VCF into a VariantContextRDD. * - * @param filePath The file to load. - * @param stringency The validation stringency to use when validating the VCF. + * @param pathName The path name to load VCF variant context records from. + * Globs/directories are supported. + * @param stringency The validation stringency to use when validating VCF format. + * Defaults to ValidationStringency.STRICT. * @return Returns a VariantContextRDD. 
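An illustrative sketch of the VCF loaders under the new parameter names (path and region are hypothetical; loadIndexedVcf additionally expects a tabix .tbi index alongside the file):

  val variantContexts = sc.loadVcf("variants.vcf.gz")
  val windowed = sc.loadIndexedVcf(
    "variants.vcf.gz",
    ReferenceRegion("1", 100000L, 200000L))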
- * - * @see loadVcfAnnotations */ def loadVcf( - filePath: String, - stringency: ValidationStringency = ValidationStringency.STRICT): VariantContextRDD = { + pathName: String, + stringency: ValidationStringency = ValidationStringency.STRICT): VariantContextRDD = LoadVcf.time { // load records from VCF - val records = readVcfRecords(filePath, None) + val records = readVcfRecords(pathName, None) // attach instrumentation if (Metrics.isRecording) records.instrument() else records // load vcf metadata - val (sd, samples, headers) = loadVcfMetadata(filePath) + val (sd, samples, headers) = loadVcfMetadata(pathName) val vcc = new VariantContextConverter(headers, stringency) VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)), @@ -979,38 +1035,43 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads a VCF file indexed by a tabix (tbi) file into an RDD. + * Load variant context records from VCF indexed by tabix (tbi) into a VariantContextRDD. * - * @param filePath The file to load. - * @param viewRegion ReferenceRegions we are filtering on. + * @param pathName The path name to load VCF variant context records from. + * Globs/directories are supported. + * @param viewRegion ReferenceRegion we are filtering on. * @return Returns a VariantContextRDD. */ + // todo: add stringency with default if possible def loadIndexedVcf( - filePath: String, - viewRegion: ReferenceRegion): VariantContextRDD = - loadIndexedVcf(filePath, Iterable(viewRegion)) + pathName: String, + viewRegion: ReferenceRegion): VariantContextRDD = { + loadIndexedVcf(pathName, Iterable(viewRegion)) + } /** - * Loads a VCF file indexed by a tabix (tbi) file into an RDD. + * Load variant context records from VCF indexed by tabix (tbi) into a VariantContextRDD. * - * @param filePath The file to load. + * @param pathName The path name to load VCF variant context records from. + * Globs/directories are supported. * @param viewRegions Iterator of ReferenceRegions we are filtering on. - * @param stringency The validation stringency to use when validating the VCF. + * @param stringency The validation stringency to use when validating VCF format. + * Defaults to ValidationStringency.STRICT. * @return Returns a VariantContextRDD. */ def loadIndexedVcf( - filePath: String, + pathName: String, viewRegions: Iterable[ReferenceRegion], - stringency: ValidationStringency = ValidationStringency.STRICT)(implicit s: DummyImplicit): VariantContextRDD = { + stringency: ValidationStringency = ValidationStringency.STRICT)(implicit s: DummyImplicit): VariantContextRDD = LoadIndexedVcf.time { // load records from VCF - val records = readVcfRecords(filePath, Some(viewRegions)) + val records = readVcfRecords(pathName, Some(viewRegions)) // attach instrumentation if (Metrics.isRecording) records.instrument() else records // load vcf metadata - val (sd, samples, headers) = loadVcfMetadata(filePath) + val (sd, samples, headers) = loadVcfMetadata(pathName) val vcc = new VariantContextConverter(headers, stringency) VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)), @@ -1020,65 +1081,75 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads Genotypes stored in Parquet with accompanying metadata. - * - * @param filePath The path to load files from. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional projection to use for reading. + * Load a path name in Parquet + Avro format into a GenotypeRDD. 
+ * + * @param pathName The path name to load genotypes from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. * @return Returns a GenotypeRDD. */ def loadParquetGenotypes( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): GenotypeRDD = { - val rdd = loadParquet[Genotype](filePath, predicate, projection) + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): GenotypeRDD = { + + val rdd = loadParquet[Genotype](pathName, optPredicate, optProjection) // load header lines - val headers = loadHeaderLines(filePath) + val headers = loadHeaderLines(pathName) // load sequence info - val sd = loadAvroSequences(filePath) + val sd = loadAvroSequenceDictionary(pathName) // load avro record group dictionary and convert to samples - val samples = loadAvroSampleMetadata(filePath) + val samples = loadAvroSamples(pathName) GenotypeRDD(rdd, sd, samples, headers) } /** - * Loads Variants stored in Parquet with accompanying metadata. - * - * @param filePath The path to load files from. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional projection to use for reading. + * Load a path name in Parquet + Avro format into a VariantRDD. + * + * @param pathName The path name to load variants from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. * @return Returns a VariantRDD. */ def loadParquetVariants( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): VariantRDD = { - val rdd = loadParquet[Variant](filePath, predicate, projection) - val sd = loadAvroSequences(filePath) + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): VariantRDD = { + + val rdd = loadParquet[Variant](pathName, optPredicate, optProjection) + val sd = loadAvroSequenceDictionary(pathName) // load header lines - val headers = loadHeaderLines(filePath) + val headers = loadHeaderLines(pathName) VariantRDD(rdd, sd, headers) } /** - * Loads a FASTA file. + * Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD. * - * @param filePath The path to load from. - * @param fragmentLength The length to split contigs into. This sets the - * parallelism achievable. - * @return Returns a NucleotideContigFragmentRDD containing the contigs. + * @param pathName The path name to load nucleotide contig fragments from. + * Globs/directories are supported. + * @param maximumFragmentLength Maximum fragment length. Defaults to 10000L. Values greater + * than 1e9 should be avoided. + * @return Returns a NucleotideContigFragmentRDD. 
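// Illustrative sketch: reading genotypes saved as Parquet + Avro with a column
// projection to cut I/O. The GenotypeField names below are assumed for the sake
// of the example; the path is hypothetical.
import org.bdgenomics.adam.projections.{ GenotypeField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._

val genotypeProjection = Projection(
  GenotypeField.contigName,
  GenotypeField.start,
  GenotypeField.end,
  GenotypeField.sampleId)
val genotypes = sc.loadParquetGenotypes("hdfs:///data/genotypes.adam",
  optProjection = Some(genotypeProjection))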
*/ def loadFasta( - filePath: String, - fragmentLength: Long): NucleotideContigFragmentRDD = { + pathName: String, + maximumFragmentLength: Long = 10000L): NucleotideContigFragmentRDD = LoadFasta.time { + val fastaData: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile( - filePath, + pathName, classOf[TextInputFormat], classOf[LongWritable], classOf[Text] @@ -1088,28 +1159,34 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val remapData = fastaData.map(kv => (kv._1.get, kv._2.toString)) // convert rdd and cache - val fragmentRdd = FastaConverter(remapData, fragmentLength) + val fragmentRdd = FastaConverter(remapData, maximumFragmentLength) .cache() NucleotideContigFragmentRDD(fragmentRdd) } /** - * Loads interleaved FASTQ data as Fragments. + * Load paired unaligned alignment records grouped by sequencing fragment + * from interleaved FASTQ into an FragmentRDD. + * + * In interleaved FASTQ, the two reads from a paired sequencing protocol are + * interleaved in a single file. This is a zipped representation of the + * typical paired FASTQ. * * Fragments represent all of the reads from a single sequenced fragment as * a single object, which is a useful representation for some tasks. * - * @param filePath The path to load. + * @param pathName The path name to load unaligned alignment records from. + * Globs/directories are supported. * @return Returns a FragmentRDD containing the paired reads grouped by * sequencing fragment. */ def loadInterleavedFastqAsFragments( - filePath: String): FragmentRDD = { + pathName: String): FragmentRDD = LoadInterleavedFastqFragments.time { val job = HadoopUtil.newJob(sc) val records = sc.newAPIHadoopFile( - filePath, + pathName, classOf[InterleavedFastqInputFormat], classOf[Void], classOf[Text], @@ -1123,136 +1200,196 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads file of Features to a CoverageRDD. - * Coverage is stored in the score attribute of Feature. + * Load features into a FeatureRDD and convert to a CoverageRDD. + * Coverage is stored in the score field of Feature. + * + * Loads path names ending in: + * * .bed as BED6/12 format, + * * .gff3 as GFF3 format, + * * .gtf/.gff as GTF/GFF2 format, + * * .narrow[pP]eak as NarrowPeak format, and + * * .interval_list as IntervalList format. + * + * If none of these match, fall back to Parquet + Avro. + * + * For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files + * are supported through compression codecs configured in Hadoop, which by default include + * .gz and .bz2, but can include more. * - * @param filePath File path to load coverage from. - * @return CoverageRDD containing an RDD of Coverage + * @see loadBed + * @see loadGtf + * @see loadGff3 + * @see loadNarrowPeak + * @see loadIntervalList + * @see loadParquetFeatures + * + * @param pathName The path name to load features from. + * Globs/directories are supported, although file extension must be present + * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats. + * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. + * Defaults to StorageLevel.MEMORY_ONLY. + * @param optMinPartitions An optional minimum number of partitions to use. For + * textual formats, if this is None, fall back to the Spark default + * parallelism. Defaults to None. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. 
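// Illustrative sketch: splitting a reference FASTA into ~1 kbp fragments, which
// sets the achievable downstream parallelism, and loading interleaved FASTQ
// directly as sequencing fragments. Paths are hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val contigs = sc.loadFasta("hdfs:///refs/genome.fa", maximumFragmentLength = 1000L)

val interleavedFragments =
  sc.loadInterleavedFastqAsFragments("hdfs:///reads/sample.ifq")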
+ * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @param stringency The validation stringency to use when validating BED6/12, GFF3, + * GTF/GFF2, NarrowPeak, or IntervalList formats. Defaults to ValidationStringency.STRICT. + * @return Returns a FeatureRDD converted to a CoverageRDD. */ - def loadCoverage(filePath: String): CoverageRDD = loadFeatures(filePath).toCoverage + def loadCoverage( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): CoverageRDD = LoadCoverage.time { + + loadFeatures(pathName, + optStorageLevel = optStorageLevel, + optMinPartitions = optMinPartitions, + optPredicate = optPredicate, + optProjection = optProjection, + stringency = stringency).toCoverage + } /** - * Loads Parquet file of Features to a CoverageRDD. - * Coverage is stored in the score attribute of Feature. - * - * @param filePath File path to load coverage from. - * @param predicate An optional predicate to push down into the file. - * @return CoverageRDD containing an RDD of Coverage + * Load a path name in Parquet + Avro format into a FeatureRDD and convert to a CoverageRDD. + * Coverage is stored in the score field of Feature. + * + * @param pathName The path name to load features from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @return Returns a FeatureRDD converted to a CoverageRDD. */ - def loadParquetCoverage(filePath: String, - predicate: Option[FilterPredicate] = None): CoverageRDD = { - val proj = Projection(FeatureField.contigName, FeatureField.start, FeatureField.end, FeatureField.score) - loadParquetFeatures(filePath, predicate = predicate, projection = Some(proj)).toCoverage + def loadParquetCoverage( + pathName: String, + optPredicate: Option[FilterPredicate] = None): CoverageRDD = { + + val coverageFields = Projection(FeatureField.contigName, FeatureField.start, FeatureField.end, FeatureField.score) + loadParquetFeatures(pathName, optPredicate = optPredicate, optProjection = Some(coverageFields)).toCoverage } /** - * Loads features stored in GFF3 format. + * Load a path name in GFF3 format into a FeatureRDD. * - * @param filePath The path to the file to load. + * @param pathName The path name to load features in GFF3 format from. + * Globs/directories are supported. * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. * Defaults to StorageLevel.MEMORY_ONLY. - * @param minPartitions An optional minimum number of partitions to load. If - * not set, falls back to the configured Spark default parallelism. - * @param stringency Optional stringency to pass. LENIENT stringency will warn - * when a malformed line is encountered, SILENT will ignore the malformed - * line, STRICT will throw an exception. + * @param optMinPartitions An optional minimum number of partitions to load. If + * not set, falls back to the configured Spark default parallelism. Defaults to None. + * @param stringency The validation stringency to use when validating GFF3 format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FeatureRDD. 
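// Illustrative sketch: coverage derived from a BED file versus from features
// already stored as Parquet + Avro. Both calls return a CoverageRDD whose values
// come from the Feature score field. Paths are hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val bedCoverage = sc.loadCoverage("hdfs:///annotations/peaks.bed")
val parquetCoverage = sc.loadParquetCoverage("hdfs:///annotations/features.adam")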
*/ - def loadGff3(filePath: String, - optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), - minPartitions: Option[Int] = None, - stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = { - val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)) + def loadGff3( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGff3.time { + + val records = sc.textFile(pathName, optMinPartitions.getOrElse(sc.defaultParallelism)) .flatMap(new GFF3Parser().parse(_, stringency)) if (Metrics.isRecording) records.instrument() else records FeatureRDD.inferSequenceDictionary(records, optStorageLevel = optStorageLevel) } /** - * Loads features stored in GFF2/GTF format. + * Load a path name in GTF/GFF2 format into a FeatureRDD. * - * @param filePath The path to the file to load. + * @param pathName The path name to load features in GTF/GFF2 format from. + * Globs/directories are supported. * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. * Defaults to StorageLevel.MEMORY_ONLY. - * @param minPartitions An optional minimum number of partitions to load. If - * not set, falls back to the configured Spark default parallelism. - * @param stringency Optional stringency to pass. LENIENT stringency will warn - * when a malformed line is encountered, SILENT will ignore the malformed - * line, STRICT will throw an exception. + * @param optMinPartitions An optional minimum number of partitions to load. If + * not set, falls back to the configured Spark default parallelism. Defaults to None. + * @param stringency The validation stringency to use when validating GTF/GFF2 format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FeatureRDD. */ - def loadGtf(filePath: String, - optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), - minPartitions: Option[Int] = None, - stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = { - val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)) + def loadGtf( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGtf.time { + + val records = sc.textFile(pathName, optMinPartitions.getOrElse(sc.defaultParallelism)) .flatMap(new GTFParser().parse(_, stringency)) if (Metrics.isRecording) records.instrument() else records FeatureRDD.inferSequenceDictionary(records, optStorageLevel = optStorageLevel) } /** - * Loads features stored in BED6/12 format. + * Load a path name in BED6/12 format into a FeatureRDD. * - * @param filePath The path to the file to load. + * @param pathName The path name to load features in BED6/12 format from. + * Globs/directories are supported. * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. * Defaults to StorageLevel.MEMORY_ONLY. - * @param minPartitions An optional minimum number of partitions to load. If - * not set, falls back to the configured Spark default parallelism. - * @param stringency Optional stringency to pass. LENIENT stringency will warn - * when a malformed line is encountered, SILENT will ignore the malformed - * line, STRICT will throw an exception. 
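// Illustrative sketch: a GTF load that raises input parallelism, serializes the
// cached records, and relaxes validation so malformed lines warn instead of
// failing the job. Path is hypothetical.
import htsjdk.samtools.ValidationStringency
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.rdd.ADAMContext._

val genes = sc.loadGtf("hdfs:///annotations/genes.gtf.gz",
  optStorageLevel = Some(StorageLevel.MEMORY_ONLY_SER),
  optMinPartitions = Some(128),
  stringency = ValidationStringency.LENIENT)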
+ * @param optMinPartitions An optional minimum number of partitions to load. If + * not set, falls back to the configured Spark default parallelism. Defaults to None. + * @param stringency The validation stringency to use when validating BED6/12 format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FeatureRDD. */ - def loadBed(filePath: String, - optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), - minPartitions: Option[Int] = None, - stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = { - val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)) + def loadBed( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadBed.time { + + val records = sc.textFile(pathName, optMinPartitions.getOrElse(sc.defaultParallelism)) .flatMap(new BEDParser().parse(_, stringency)) if (Metrics.isRecording) records.instrument() else records FeatureRDD.inferSequenceDictionary(records, optStorageLevel = optStorageLevel) } /** - * Loads features stored in NarrowPeak format. + * Load a path name in NarrowPeak format into a FeatureRDD. * - * @param filePath The path to the file to load. + * @param pathName The path name to load features in NarrowPeak format from. + * Globs/directories are supported. * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. * Defaults to StorageLevel.MEMORY_ONLY. - * @param minPartitions An optional minimum number of partitions to load. If - * not set, falls back to the configured Spark default parallelism. - * @param stringency Optional stringency to pass. LENIENT stringency will warn - * when a malformed line is encountered, SILENT will ignore the malformed - * line, STRICT will throw an exception. + * @param optMinPartitions An optional minimum number of partitions to load. If + * not set, falls back to the configured Spark default parallelism. Defaults to None. + * @param stringency The validation stringency to use when validating NarrowPeak format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FeatureRDD. */ - def loadNarrowPeak(filePath: String, - optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), - minPartitions: Option[Int] = None, - stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = { - val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)) + def loadNarrowPeak( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadNarrowPeak.time { + + val records = sc.textFile(pathName, optMinPartitions.getOrElse(sc.defaultParallelism)) .flatMap(new NarrowPeakParser().parse(_, stringency)) if (Metrics.isRecording) records.instrument() else records FeatureRDD.inferSequenceDictionary(records, optStorageLevel = optStorageLevel) } /** - * Loads features stored in IntervalList format. - * - * @param filePath The path to the file to load. - * @param minPartitions An optional minimum number of partitions to load. If - * not set, falls back to the configured Spark default parallelism. - * @param stringency Optional stringency to pass. 
LENIENT stringency will warn - * when a malformed line is encountered, SILENT will ignore the malformed - * line, STRICT will throw an exception. + * Load a path name in IntervalList format into a FeatureRDD. + * + * @param pathName The path name to load features in IntervalList format from. + * Globs/directories are supported. + * @param optMinPartitions An optional minimum number of partitions to load. If + * not set, falls back to the configured Spark default parallelism. Defaults to None. + * @param stringency The validation stringency to use when validating IntervalList format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FeatureRDD. */ - def loadIntervalList(filePath: String, - minPartitions: Option[Int] = None, - stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = { + def loadIntervalList( + pathName: String, + optMinPartitions: Option[Int] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadIntervalList.time { - val parsedLines = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)) + val parsedLines = sc.textFile(pathName, optMinPartitions.getOrElse(sc.defaultParallelism)) .map(new IntervalListParser().parseWithHeader(_, stringency)) val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*), parsedLines.flatMap(_._2)) @@ -1262,79 +1399,90 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Loads Features stored in Parquet, with accompanying metadata. - * - * @param filePath The path to load files from. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional projection to use for reading. + * Load a path name in Parquet + Avro format into a FeatureRDD. + * + * @param pathName The path name to load features from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. * @return Returns a FeatureRDD. */ def loadParquetFeatures( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): FeatureRDD = { - val sd = loadAvroSequences(filePath) - val rdd = loadParquet[Feature](filePath, predicate, projection) + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): FeatureRDD = { + + val sd = loadAvroSequenceDictionary(pathName) + val rdd = loadParquet[Feature](pathName, optPredicate, optProjection) FeatureRDD(rdd, sd) } /** - * Loads NucleotideContigFragments stored in Parquet, with metadata. - * - * @param filePath The path to load files from. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional projection to use for reading. + * Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD. + * + * @param pathName The path name to load nucleotide contig fragments from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. * @return Returns a NucleotideContigFragmentRDD. 
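// Illustrative sketch: Picard-style interval lists carry a sequence dictionary in
// their header, so no caching pass over the records is needed to infer one, while
// Parquet features reload the Avro-backed dictionary saved alongside them. Paths
// are hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val targets = sc.loadIntervalList("hdfs:///annotations/targets.interval_list")
val savedFeatures = sc.loadParquetFeatures("hdfs:///annotations/features.adam")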
*/ def loadParquetContigFragments( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): NucleotideContigFragmentRDD = { - val sd = loadAvroSequences(filePath) - val rdd = loadParquet[NucleotideContigFragment](filePath, predicate, projection) + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): NucleotideContigFragmentRDD = { + + val sd = loadAvroSequenceDictionary(pathName) + val rdd = loadParquet[NucleotideContigFragment](pathName, optPredicate, optProjection) NucleotideContigFragmentRDD(rdd, sd) } /** - * Loads Fragments stored in Parquet, with accompanying metadata. - * - * @param filePath The path to load files from. - * @param predicate An optional predicate to push down into the file. - * @param projection An optional projection to use for reading. + * Load a path name in Parquet + Avro format into a FragmentRDD. + * + * @param pathName The path name to load fragments from. + * Globs/directories are supported. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. * @return Returns a FragmentRDD. */ def loadParquetFragments( - filePath: String, - predicate: Option[FilterPredicate] = None, - projection: Option[Schema] = None): FragmentRDD = { + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): FragmentRDD = { // convert avro to sequence dictionary - val sd = loadAvroSequences(filePath) + val sd = loadAvroSequenceDictionary(pathName) // convert avro to sequence dictionary - val rgd = loadAvroReadGroupMetadata(filePath) + val rgd = loadAvroRecordGroupDictionary(pathName) // load fragment data from parquet - val rdd = loadParquet[Fragment](filePath, predicate, projection) + val rdd = loadParquet[Fragment](pathName, optPredicate, optProjection) FragmentRDD(rdd, sd, rgd) } /** - * Loads Features from a file, autodetecting the file type. + * Load features into a FeatureRDD. * - * Loads files ending in .bed as BED6/12, .gff3 as GFF3, .gtf/.gff as - * GTF/GFF2, .narrow[pP]eak as NarrowPeak, and .interval_list as - * IntervalList. If none of these match, we fall back to Parquet. + * Loads path names ending in: + * * .bed as BED6/12 format, + * * .gff3 as GFF3 format, + * * .gtf/.gff as GTF/GFF2 format, + * * .narrow[pP]eak as NarrowPeak format, and + * * .interval_list as IntervalList format. * - * @param filePath The path to the file to load. - * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. - * Defaults to StorageLevel.MEMORY_ONLY. - * @param projection An optional projection to push down. - * @param minPartitions An optional minimum number of partitions to use. For - * textual formats, if this is None, we fall back to the Spark default - * parallelism. - * @return Returns a FeatureRDD. + * If none of these match, fall back to Parquet + Avro. + * + * For BED6/12, GFF3, GTF/GFF2, NarrowPeak, and IntervalList formats, compressed files + * are supported through compression codecs configured in Hadoop, which by default include + * .gz and .bz2, but can include more. 
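// Illustrative sketch: reloading fragments saved as Parquet + Avro. As in the
// method above, the sequence dictionary and record group dictionary are read back
// from the Avro metadata stored next to the Parquet data. Path is hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val savedFragments = sc.loadParquetFragments("hdfs:///reads/fragments.adam")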
* * @see loadBed * @see loadGtf @@ -1342,240 +1490,319 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * @see loadNarrowPeak * @see loadIntervalList * @see loadParquetFeatures + * + * @param pathName The path name to load features from. + * Globs/directories are supported, although file extension must be present + * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats. + * @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary. + * Defaults to StorageLevel.MEMORY_ONLY. + * @param optMinPartitions An optional minimum number of partitions to use. For + * textual formats, if this is None, fall back to the Spark default + * parallelism. Defaults to None. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @param stringency The validation stringency to use when validating BED6/12, GFF3, + * GTF/GFF2, NarrowPeak, or IntervalList formats. Defaults to ValidationStringency.STRICT. + * @return Returns a FeatureRDD. */ - def loadFeatures(filePath: String, - optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), - projection: Option[Schema] = None, - minPartitions: Option[Int] = None): FeatureRDD = LoadFeatures.time { - - if (filePath.endsWith(".bed")) { - log.info(s"Loading $filePath as BED and converting to features. Projection is ignored.") - loadBed(filePath, optStorageLevel = optStorageLevel, minPartitions = minPartitions) - } else if (filePath.endsWith(".gff3")) { - log.info(s"Loading $filePath as GFF3 and converting to features. Projection is ignored.") - loadGff3(filePath, optStorageLevel = optStorageLevel, minPartitions = minPartitions) - } else if (filePath.endsWith(".gtf") || - filePath.endsWith(".gff")) { - log.info(s"Loading $filePath as GTF/GFF2 and converting to features. Projection is ignored.") - loadGtf(filePath, optStorageLevel = optStorageLevel, minPartitions = minPartitions) - } else if (filePath.endsWith(".narrowPeak") || - filePath.endsWith(".narrowpeak")) { - log.info(s"Loading $filePath as NarrowPeak and converting to features. Projection is ignored.") - loadNarrowPeak(filePath, optStorageLevel = optStorageLevel, minPartitions = minPartitions) - } else if (filePath.endsWith(".interval_list")) { - log.info(s"Loading $filePath as IntervalList and converting to features. 
Projection is ignored.") - loadIntervalList(filePath, minPartitions = minPartitions) + def loadFeatures( + pathName: String, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), + optMinPartitions: Option[Int] = None, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadFeatures.time { + + val trimmedPathName = trimExtensionIfCompressed(pathName) + if (isBedExt(trimmedPathName)) { + log.info(s"Loading $pathName as BED and converting to Features.") + loadBed(pathName, + optStorageLevel = optStorageLevel, + optMinPartitions = optMinPartitions, + stringency = stringency) + } else if (isGff3Ext(trimmedPathName)) { + log.info(s"Loading $pathName as GFF3 and converting to Features.") + loadGff3(pathName, + optStorageLevel = optStorageLevel, + optMinPartitions = optMinPartitions, + stringency = stringency) + } else if (isGtfExt(trimmedPathName)) { + log.info(s"Loading $pathName as GTF/GFF2 and converting to Features.") + loadGtf(pathName, + optStorageLevel = optStorageLevel, + optMinPartitions = optMinPartitions, + stringency = stringency) + } else if (isNarrowPeakExt(trimmedPathName)) { + log.info(s"Loading $pathName as NarrowPeak and converting to Features.") + loadNarrowPeak(pathName, + optStorageLevel = optStorageLevel, + optMinPartitions = optMinPartitions, + stringency = stringency) + } else if (isIntervalListExt(trimmedPathName)) { + log.info(s"Loading $pathName as IntervalList and converting to Features.") + loadIntervalList(pathName, + optMinPartitions = optMinPartitions, + stringency = stringency) } else { - log.info(s"Loading $filePath as Parquet containing Features.") - loadParquetFeatures(filePath, predicate = None, projection = projection) + log.info(s"Loading $pathName as Parquet containing Features.") + loadParquetFeatures(pathName, + optPredicate = optPredicate, + optProjection = optProjection) } } /** - * Auto-detects the file type and loads a broadcastable ReferenceFile. + * Load reference sequences into a broadcastable ReferenceFile. * - * If the file type is 2bit, loads a 2bit file. Else, uses loadSequences + * If the path name has a .2bit extension, loads a 2bit file. Else, uses loadContigFragments * to load the reference as an RDD, which is then collected to the driver. * - * @param filePath The path to load. - * @param fragmentLength The length of fragment to use for splitting. - * @return Returns a broadcastable ReferenceFile. + * @see loadContigFragments * - * @see loadSequences + * @param pathName The path name to load reference sequences from. + * Globs/directories for 2bit format are not supported. + * @param maximumFragmentLength Maximum fragment length. Defaults to 10000L. Values greater + * than 1e9 should be avoided. + * @return Returns a broadcastable ReferenceFile. 
*/ - def loadReferenceFile(filePath: String, fragmentLength: Long): ReferenceFile = { - if (filePath.endsWith(".2bit")) { - //TODO(ryan): S3ByteAccess - new TwoBitFile(new LocalFileByteAccess(new File(filePath))) + def loadReferenceFile( + pathName: String, + maximumFragmentLength: Long): ReferenceFile = LoadReferenceFile.time { + + if (is2BitExt(pathName)) { + new TwoBitFile(new LocalFileByteAccess(new File(pathName))) } else { - ReferenceContigMap(loadSequences(filePath, fragmentLength = fragmentLength).rdd) + ReferenceContigMap(loadContigFragments(pathName, maximumFragmentLength = maximumFragmentLength).rdd) } } /** - * Auto-detects the file type and loads contigs as a NucleotideContigFragmentRDD. + * Load nucleotide contig fragments into a NucleotideContigFragmentRDD. * - * Loads files ending in .fa/.fasta/.fa.gz/.fasta.gz as FASTA, else, falls - * back to Parquet. + * If the path name has a .fa/.fasta extension, load as FASTA format. + * Else, fall back to Parquet + Avro. * - * @param filePath The path to load. - * @param projection An optional subset of fields to load. - * @param fragmentLength The length of fragment to use for splitting. - * @return Returns a NucleotideContigFragmentRDD. + * For FASTA format, compressed files are supported through compression codecs configured + * in Hadoop, which by default include .gz and .bz2, but can include more. * * @see loadFasta * @see loadParquetContigFragments - * @see loadReferenceFile + * + * @param pathName The path name to load nucleotide contig fragments from. + * Globs/directories are supported, although file extension must be present + * for FASTA format. + * @param maximumFragmentLength Maximum fragment length. Defaults to 10000L. Values greater + * than 1e9 should be avoided. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @return Returns a NucleotideContigFragmentRDD. */ - def loadSequences( - filePath: String, - projection: Option[Schema] = None, - fragmentLength: Long = 10000): NucleotideContigFragmentRDD = { - if (filePath.endsWith(".fa") || - filePath.endsWith(".fasta") || - filePath.endsWith(".fa.gz") || - filePath.endsWith(".fasta.gz")) { - log.info(s"Loading $filePath as FASTA and converting to NucleotideContigFragment. Projection is ignored.") + def loadContigFragments( + pathName: String, + maximumFragmentLength: Long = 10000L, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): NucleotideContigFragmentRDD = LoadContigFragments.time { + + val trimmedPathName = trimExtensionIfCompressed(pathName) + if (isFastaExt(trimmedPathName)) { + log.info(s"Loading $pathName as FASTA and converting to NucleotideContigFragment.") loadFasta( - filePath, - fragmentLength + pathName, + maximumFragmentLength ) } else { - log.info(s"Loading $filePath as Parquet containing NucleotideContigFragments.") - loadParquetContigFragments(filePath, None, projection) + log.info(s"Loading $pathName as Parquet containing NucleotideContigFragments.") + loadParquetContigFragments(pathName, optPredicate = optPredicate, optProjection = optProjection) } } - private def isVcfExt(filePath: String): Boolean = { - filePath.endsWith(".vcf") || - filePath.endsWith(".vcf.gz") || - filePath.endsWith(".vcf.bgzf") || - filePath.endsWith(".vcf.bgz") - } - /** - * Auto-detects the file type and loads a GenotypeRDD. 
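// Illustrative sketch: loadContigFragments picks FASTA by extension, including
// compressed inputs such as .fa.gz because the compression suffix is trimmed
// before matching, and otherwise falls back to Parquet + Avro. Paths are
// hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val fromFasta = sc.loadContigFragments("hdfs:///refs/hg19.fa.gz")
val fromParquet = sc.loadContigFragments("hdfs:///refs/hg19.contigs.adam")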
- * - * If the file has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, loads as VCF. Else, falls back to - * Parquet. + * Load genotypes into a GenotypeRDD. * - * @param filePath The path to load. - * @param projection An optional subset of fields to load. - * @param stringency The validation stringency to use when validating the VCF. - * @return Returns a GenotypeRDD. + * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. + * Else, fall back to Parquet + Avro. * * @see loadVcf * @see loadParquetGenotypes + * + * @param pathName The path name to load genotypes from. + * Globs/directories are supported, although file extension must be present + * for VCF format. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @param stringency The validation stringency to use when validating VCF format. + * Defaults to ValidationStringency.STRICT. + * @return Returns a GenotypeRDD. */ def loadGenotypes( - filePath: String, - projection: Option[Schema] = None, - stringency: ValidationStringency = ValidationStringency.STRICT): GenotypeRDD = { - if (isVcfExt(filePath)) { - log.info(s"Loading $filePath as VCF, and converting to Genotypes. Projection is ignored.") - loadVcf(filePath, stringency).toGenotypeRDD + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): GenotypeRDD = LoadGenotypes.time { + + if (isVcfExt(pathName)) { + log.info(s"Loading $pathName as VCF and converting to Genotypes.") + loadVcf(pathName, stringency).toGenotypeRDD } else { - log.info(s"Loading $filePath as Parquet containing Genotypes. Sequence dictionary for translation is ignored.") - loadParquetGenotypes(filePath, None, projection) + log.info(s"Loading $pathName as Parquet containing Genotypes. Sequence dictionary for translation is ignored.") + loadParquetGenotypes(pathName, optPredicate = optPredicate, optProjection = optProjection) } } /** - * Auto-detects the file type and loads a VariantRDD. - * - * If the file has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, loads as VCF. Else, falls back to - * Parquet. + * Load variants into a VariantRDD. * - * @param filePath The path to load. - * @param projection An optional subset of fields to load. - * @param stringency The validation stringency to use when validating the VCF. - * @return Returns a VariantRDD. + * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. + * Else, fall back to Parquet + Avro. * * @see loadVcf * @see loadParquetVariants + * + * @param pathName The path name to load variants from. + * Globs/directories are supported, although file extension must be present for VCF format. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @param stringency The validation stringency to use when validating VCF format. + * Defaults to ValidationStringency.STRICT. + * @return Returns a VariantRDD. */ def loadVariants( - filePath: String, - projection: Option[Schema] = None, - stringency: ValidationStringency = ValidationStringency.STRICT): VariantRDD = { - if (isVcfExt(filePath)) { - log.info(s"Loading $filePath as VCF, and converting to Variants. 
Projection is ignored.") - loadVcf(filePath, stringency).toVariantRDD + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): VariantRDD = LoadVariants.time { + + if (isVcfExt(pathName)) { + log.info(s"Loading $pathName as VCF and converting to Variants.") + loadVcf(pathName, stringency).toVariantRDD } else { - log.info(s"Loading $filePath as Parquet containing Variants. Sequence dictionary for translation is ignored.") - loadParquetVariants(filePath, None, projection) + log.info(s"Loading $pathName as Parquet containing Variants. Sequence dictionary for translation is ignored.") + loadParquetVariants(pathName, optPredicate = optPredicate, optProjection = optProjection) } } /** - * Loads alignments from a given path, and infers the input type. - * - * This method can load: - * - * * AlignmentRecords via Parquet (default) - * * SAM/BAM/CRAM (.sam, .bam, .cram) - * * FASTQ (interleaved, single end, paired end) (.ifq, .fq/.fastq) - * * FASTA (.fa, .fasta) - * - * As hinted above, the input type is inferred from the file path extension. - * - * @param filePath Path to load data from. - * @param projection The fields to project; ignored if not Parquet. - * @param filePath2Opt The path to load a second end of FASTQ data from. - * Ignored if not FASTQ. - * @param recordGroupOpt Optional record group name to set if loading FASTQ. - * @param stringency Validation stringency used on FASTQ import/merging. - * @return Returns an AlignmentRecordRDD which wraps the RDD of reads, - * the sequence dictionary representing the contigs these reads are aligned to - * if the reads are aligned, and the record group dictionary for the reads - * if one is available. + * Load alignment records into an AlignmentRecordRDD. + * + * Loads path names ending in: + * * .bam/.cram/.sam as BAM/CRAM/SAM format, + * * .fa/.fasta as FASTA format, + * * .fq/.fastq as FASTQ format, and + * * .ifq as interleaved FASTQ format. + * + * If none of these match, fall back to Parquet + Avro. + * + * For FASTA, FASTQ, and interleaved FASTQ formats, compressed files are supported + * through compression codecs configured in Hadoop, which by default include .gz and .bz2, + * but can include more. + * * @see loadBam - * @see loadParquetAlignments - * @see loadInterleavedFastq * @see loadFastq * @see loadFasta + * @see loadInterleavedFastq + * @see loadParquetAlignments + * + * @param pathName The path name to load alignment records from. + * Globs/directories are supported, although file extension must be present + * for BAM/CRAM/SAM, FASTA, and FASTQ formats. + * @param optPathName2 The optional path name to load the second set of alignment + * records from, if loading paired FASTQ format. Globs/directories are supported, + * although file extension must be present. Defaults to None. + * @param optRecordGroup The optional record group name to associate to the alignment + * records. Defaults to None. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @param stringency The validation stringency to use when validating BAM/CRAM/SAM or FASTQ formats. + * Defaults to ValidationStringency.STRICT. 
+ * @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records, + * sequence dictionary representing contigs the alignment records may be aligned to, + * and the record group dictionary for the alignment records if one is available. */ def loadAlignments( - filePath: String, - projection: Option[Schema] = None, - filePath2Opt: Option[String] = None, - recordGroupOpt: Option[String] = None, - stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadAlignmentRecords.time { - - if (filePath.endsWith(".sam") || - filePath.endsWith(".bam") || - filePath.endsWith(".cram")) { - log.info(s"Loading $filePath as SAM/BAM/CRAM and converting to AlignmentRecords. Projection is ignored.") - loadBam(filePath, stringency) - } else if (filePath.endsWith(".ifq")) { - log.info(s"Loading $filePath as interleaved FASTQ and converting to AlignmentRecords. Projection is ignored.") - loadInterleavedFastq(filePath) - } else if (filePath.endsWith(".fq") || - filePath.endsWith(".fastq")) { - log.info(s"Loading $filePath as unpaired FASTQ and converting to AlignmentRecords. Projection is ignored.") - loadFastq(filePath, filePath2Opt, recordGroupOpt, stringency) - } else if (filePath.endsWith(".fa") || - filePath.endsWith(".fasta")) { - log.info(s"Loading $filePath as FASTA and converting to AlignmentRecords. Projection is ignored.") - AlignmentRecordRDD.unaligned(loadFasta(filePath, fragmentLength = 10000).toReads) + pathName: String, + optPathName2: Option[String] = None, + optRecordGroup: Option[String] = None, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None, + stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadAlignments.time { + + val trimmedPathName = trimExtensionIfCompressed(pathName) + if (isBamExt(trimmedPathName)) { + log.info(s"Loading $pathName as BAM/CRAM/SAM and converting to AlignmentRecords.") + loadBam(pathName, stringency) + } else if (isInterleavedFastqExt(trimmedPathName)) { + log.info(s"Loading $pathName as interleaved FASTQ and converting to AlignmentRecords.") + loadInterleavedFastq(pathName) + } else if (isFastqExt(trimmedPathName)) { + log.info(s"Loading $pathName as unpaired FASTQ and converting to AlignmentRecords.") + loadFastq(pathName, optPathName2, optRecordGroup, stringency) + } else if (isFastaExt(trimmedPathName)) { + log.info(s"Loading $pathName as FASTA and converting to AlignmentRecords.") + AlignmentRecordRDD.unaligned(loadFasta(pathName, maximumFragmentLength = 10000L).toReads) } else { - log.info(s"Loading $filePath as Parquet containing AlignmentRecords.") - loadParquetAlignments(filePath, None, projection) + log.info(s"Loading $pathName as Parquet of AlignmentRecords.") + loadParquetAlignments(pathName, optPredicate = optPredicate, optProjection = optProjection) } } /** - * Auto-detects the file type and loads a FragmentRDD. + * Load fragments into a FragmentRDD. + * + * Loads path names ending in: + * * .bam/.cram/.sam as BAM/CRAM/SAM format and + * * .ifq as interleaved FASTQ format. * - * This method can load: + * If none of these match, fall back to Parquet + Avro. * - * * Fragments via Parquet (default) - * * SAM/BAM/CRAM (.sam, .bam, .cram) - * * FASTQ (interleaved only, .ifq) + * For interleaved FASTQ format, compressed files are supported through compression codecs + * configured in Hadoop, which by default include .gz and .bz2, but can include more. * - * @param filePath Path to load data from. 
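// Illustrative sketch: loadAlignments dispatch. A BAM path takes the
// BAM/CRAM/SAM branch; paired FASTQ supplies the second mate file through
// optPathName2 and can attach a record group name via optRecordGroup. Paths and
// the record group name are hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val bamReads = sc.loadAlignments("hdfs:///reads/sample.bam")
val pairedFastqReads = sc.loadAlignments("hdfs:///reads/sample_1.fq.gz",
  optPathName2 = Some("hdfs:///reads/sample_2.fq.gz"),
  optRecordGroup = Some("sampleA"))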
- * @return Returns the loaded data as a FragmentRDD. + * @see loadBam + * @see loadAlignments + * @see loadInterleavedFastqAsFragments + * @see loadParquetFragments + * + * @param pathName The path name to load fragments from. + * Globs/directories are supported, although file extension must be present + * for BAM/CRAM/SAM and FASTQ formats. + * @param optPredicate An optional pushdown predicate to use when reading Parquet + Avro. + * Defaults to None. + * @param optProjection An option projection schema to use when reading Parquet + Avro. + * Defaults to None. + * @return Returns a FragmentRDD. */ - def loadFragments(filePath: String): FragmentRDD = LoadFragments.time { - if (filePath.endsWith(".sam") || - filePath.endsWith(".bam") || - filePath.endsWith(".cram")) { + def loadFragments( + pathName: String, + optPredicate: Option[FilterPredicate] = None, + optProjection: Option[Schema] = None): FragmentRDD = LoadFragments.time { + val trimmedPathName = trimExtensionIfCompressed(pathName) + if (isBamExt(trimmedPathName)) { // check to see if the input files are all queryname sorted - if (filesAreQuerynameSorted(filePath)) { - log.info(s"Loading $filePath as queryname sorted SAM/BAM and converting to Fragments.") - loadBam(filePath).transform(RepairPartitions(_)) + if (filesAreQuerynameSorted(pathName)) { + log.info(s"Loading $pathName as queryname sorted BAM/CRAM/SAM and converting to Fragments.") + loadBam(pathName).transform(RepairPartitions(_)) .querynameSortedToFragments } else { - log.info(s"Loading $filePath as SAM/BAM and converting to Fragments.") - loadBam(filePath).toFragments + log.info(s"Loading $pathName as BAM/CRAM/SAM and converting to Fragments.") + loadBam(pathName).toFragments } - } else if (filePath.endsWith(".ifq")) { - log.info(s"Loading $filePath as interleaved FASTQ and converting to Fragments.") - loadInterleavedFastqAsFragments(filePath) + } else if (isInterleavedFastqExt(trimmedPathName)) { + log.info(s"Loading $pathName as interleaved FASTQ and converting to Fragments.") + loadInterleavedFastqAsFragments(pathName) } else { - log.info(s"Loading $filePath as Parquet containing Fragments.") - loadParquetFragments(filePath) + log.info(s"Loading $pathName as Parquet containing Fragments.") + loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection) } } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala index 81f5783950..1ee214b465 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala @@ -82,18 +82,18 @@ private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] exten * As such, we must force the user to pass in the schema. * * @tparam U The type of the specific record we are saving. - * @param filename Path to save records to. + * @param pathName Path to save records to. * @param sc SparkContext used for identifying underlying file system. * @param schema Schema of records we are saving. * @param avro Seq of records we are saving. 
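// Illustrative sketch: loadFragments on BAM/CRAM/SAM. If the inputs are detected
// as queryname sorted, they are converted via RepairPartitions and
// querynameSortedToFragments; otherwise the general toFragments conversion is
// used. Path is hypothetical.
import org.bdgenomics.adam.rdd.ADAMContext._

val fragmentsFromBam = sc.loadFragments("hdfs:///reads/sample.qname_sorted.bam")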
*/ - protected def saveAvro[U <: SpecificRecordBase](filename: String, + protected def saveAvro[U <: SpecificRecordBase](pathName: String, sc: SparkContext, schema: Schema, avro: Seq[U])(implicit tUag: ClassTag[U]) { // get our current file system - val path = new Path(filename) + val path = new Path(pathName) val fs = path.getFileSystem(sc.hadoopConfiguration) // get an output stream @@ -126,21 +126,21 @@ private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] exten /** * Saves an RDD of Avro data to Parquet. * - * @param filePath The path to save the file to. - * @param blockSize The size in bytes of blocks to write. - * @param pageSize The size in bytes of pages to write. - * @param compressCodec The compression codec to apply to pages. + * @param pathName The path to save the file to. + * @param blockSize The size in bytes of blocks to write. Defaults to 128 * 1024 * 1024. + * @param pageSize The size in bytes of pages to write. Defaults to 1 * 1024 * 1024. + * @param compressCodec The compression codec to apply to pages. Defaults to CompressionCodecName.GZIP. * @param disableDictionaryEncoding If false, dictionary encoding is used. If - * true, delta encoding is used. - * @param schema The schema to set. + * true, delta encoding is used. Defaults to false. + * @param optSchema The optional schema to set. Defaults to None. */ protected def saveRddAsParquet( - filePath: String, + pathName: String, blockSize: Int = 128 * 1024 * 1024, pageSize: Int = 1 * 1024 * 1024, compressCodec: CompressionCodecName = CompressionCodecName.GZIP, disableDictionaryEncoding: Boolean = false, - schema: Option[Schema] = None): Unit = SaveAsADAM.time { + optSchema: Option[Schema] = None): Unit = SaveAsADAM.time { log.info("Saving data in ADAM format") val job = HadoopUtil.newJob(rdd.context) @@ -150,14 +150,14 @@ private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] exten ParquetOutputFormat.setPageSize(job, pageSize) AvroParquetOutputFormat.setSchema( job, - schema.getOrElse(manifest[T].runtimeClass.asInstanceOf[Class[T]].newInstance().getSchema) + optSchema.getOrElse(manifest[T].runtimeClass.asInstanceOf[Class[T]].newInstance().getSchema) ) // Add the Void Key val recordToSave = rdd.map(p => (null, p)) // Save the values to the ADAM/Parquet file recordToSave.saveAsNewAPIHadoopFile( - filePath, + pathName, classOf[java.lang.Void], manifest[T].runtimeClass.asInstanceOf[Class[T]], classOf[InstrumentedADAMAvroParquetOutputFormat], ContextUtil.getConfiguration(job) ) @@ -180,7 +180,7 @@ private[rdd] class ConcreteADAMRDDFunctions[T <% IndexedRecord: Manifest](val rd /** * Saves an RDD of Avro data to Parquet. * - * @param filePath The path to save the file to. + * @param pathName The path to save the file to. * @param blockSize The size in bytes of blocks to write. * @param pageSize The size in bytes of pages to write. * @param compressCodec The compression codec to apply to pages. @@ -189,13 +189,13 @@ private[rdd] class ConcreteADAMRDDFunctions[T <% IndexedRecord: Manifest](val rd * @param schema The schema to set. 
*/ def saveAsParquet( - filePath: String, + pathName: String, blockSize: Int = 128 * 1024 * 1024, pageSize: Int = 1 * 1024 * 1024, compressCodec: CompressionCodecName = CompressionCodecName.GZIP, disableDictionaryEncoding: Boolean = false, schema: Option[Schema] = None): Unit = { - saveRddAsParquet(filePath, blockSize, pageSize, compressCodec, disableDictionaryEncoding, schema) + saveRddAsParquet(pathName, blockSize, pageSize, compressCodec, disableDictionaryEncoding, schema) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/util/FileExtensions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/util/FileExtensions.scala new file mode 100644 index 0000000000..2a443e6527 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/util/FileExtensions.scala @@ -0,0 +1,121 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.util + +/** + * Match path names to file extensions. + */ +object FileExtensions { + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a 2bit format file extension. + */ + private[adam] def is2BitExt(pathName: String): Boolean = { + pathName.endsWith(".2bit") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a BAM/CRAM/SAM format file extension. + */ + private[adam] def isBamExt(pathName: String): Boolean = { + pathName.endsWith(".bam") || + pathName.endsWith(".cram") || + pathName.endsWith(".sam") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a BED6/12 format file extension. + */ + private[adam] def isBedExt(pathName: String): Boolean = { + pathName.endsWith(".bed") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a FASTA format file extension. + */ + private[adam] def isFastaExt(pathName: String): Boolean = { + pathName.endsWith(".fa") || + pathName.endsWith(".fasta") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a FASTQ format file extension. + */ + private[adam] def isFastqExt(pathName: String): Boolean = { + pathName.endsWith(".fq") || + pathName.endsWith(".fastq") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a GFF3 format file extension. + */ + private[adam] def isGff3Ext(pathName: String): Boolean = { + pathName.endsWith(".gff3") + } + + /** + * @param pathName The path name to match. + * @return Returns true if the path name matches a GFF2/GTF format file extension. + */ + private[adam] def isGtfExt(pathName: String): Boolean = { + pathName.endsWith(".gff") || + pathName.endsWith(".gtf") + } + + /** + * @param pathName The path name to match. 
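// Illustrative sketch of the extension matching above. FileExtensions is
// private[adam], so these are shown as expected results rather than a public API:
//   isBamExt("reads.cram")        => true
//   isFastaExt("genome.fasta")    => true
//   isGff3Ext("genes.gff3")       => true
//   isGtfExt("genes.gff3")        => false (handled by isGff3Ext instead)
//   isVcfExt("calls.vcf.bgz")     => true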
+   * @return Returns true if the path name matches an interleaved FASTQ format file extension.
+   */
+  private[adam] def isInterleavedFastqExt(pathName: String): Boolean = {
+    pathName.endsWith(".ifq")
+  }
+
+  /**
+   * @param pathName The path name to match.
+   * @return Returns true if the path name matches an IntervalList format file extension.
+   */
+  private[adam] def isIntervalListExt(pathName: String): Boolean = {
+    pathName.endsWith(".interval_list")
+  }
+
+  /**
+   * @param pathName The path name to match.
+   * @return Returns true if the path name matches a NarrowPeak format file extension.
+   */
+  private[adam] def isNarrowPeakExt(pathName: String): Boolean = {
+    pathName.endsWith(".narrowpeak") ||
+      pathName.endsWith(".narrowPeak")
+  }
+
+  /**
+   * @param pathName The path name to match.
+   * @return Returns true if the path name matches a VCF format file extension.
+   */
+  private[adam] def isVcfExt(pathName: String): Boolean = {
+    pathName.endsWith(".vcf") ||
+      pathName.endsWith(".vcf.gz") ||
+      pathName.endsWith(".vcf.bgzf") ||
+      pathName.endsWith(".vcf.bgz")
+  }
+}
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/converters/FastaConverterSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/converters/FastaConverterSuite.scala
index b83bb59471..7f13e25fd9 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/converters/FastaConverterSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/converters/FastaConverterSuite.scala
@@ -188,7 +188,7 @@ class FastaConverterSuite extends ADAMFunSuite {
     val fasta = fasta1 ::: fasta2
     val rdd = sc.parallelize(fasta.toSeq)
 
-    val adamFasta = FastaConverter(rdd, maxFragmentLength = 35)
+    val adamFasta = FastaConverter(rdd, maximumFragmentLength = 35)
     assert(adamFasta.count === 64)
 
     val fastaElement1 = adamFasta.filter(_.getContig.getContigName == "chr1").collect()
@@ -209,7 +209,7 @@ class FastaConverterSuite extends ADAMFunSuite {
 
   sparkTest("convert reference fasta file") {
     //Loading "human_g1k_v37_chr1_59kb.fasta"
-    val referenceSequences = sc.loadSequences(chr1File, fragmentLength = 10).rdd.collect()
+    val referenceSequences = sc.loadContigFragments(chr1File, maximumFragmentLength = 10).rdd.collect()
     assert(referenceSequences.forall(_.getContig.getContigName.toString == "1"))
     assert(referenceSequences.slice(0, referenceSequences.length - 2).forall(_.getFragmentSequence.length == 10))
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/AlignmentRecordFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/AlignmentRecordFieldSuite.scala
index 58ea4a281c..7f3b86197b 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/AlignmentRecordFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/AlignmentRecordFieldSuite.scala
@@ -70,7 +70,7 @@ class AlignmentRecordFieldSuite extends ADAMFunSuite {
       inferredInsertSize
     )
 
-    val alignmentRecords: RDD[AlignmentRecord] = sc.loadParquet(path, projection = Some(projection))
+    val alignmentRecords: RDD[AlignmentRecord] = sc.loadParquet(path, optProjection = Some(projection))
     assert(alignmentRecords.count() === 1)
     assert(alignmentRecords.first.getContigName === "6")
     assert(alignmentRecords.first.getStart === 29941260L)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/ContigFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/ContigFieldSuite.scala
index 0c8e6e25d4..9c29845b3d 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/ContigFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/ContigFieldSuite.scala
@@ -49,7 +49,7 @@ class ContigFieldSuite extends ADAMFunSuite {
       referenceIndex
     )
 
-    val contigs: RDD[Contig] = sc.loadParquet(path, projection = Some(projection))
+    val contigs: RDD[Contig] = sc.loadParquet(path, optProjection = Some(projection))
     assert(contigs.count() === 1)
     assert(contigs.first.getContigName === "6")
     assert(contigs.first.getContigLength === 170805979)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/DbxrefFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/DbxrefFieldSuite.scala
index 3256f27b3f..fb5728fa22 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/DbxrefFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/DbxrefFieldSuite.scala
@@ -39,7 +39,7 @@ class DbxrefFieldSuite extends ADAMFunSuite {
       accession
     )
 
-    val dbxrefs: RDD[Dbxref] = sc.loadParquet(path, projection = Some(projection))
+    val dbxrefs: RDD[Dbxref] = sc.loadParquet(path, optProjection = Some(projection))
     assert(dbxrefs.count() === 1)
     assert(dbxrefs.first.getDb === "EMBL")
     assert(dbxrefs.first.getAccession === "AA816246")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FeatureFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FeatureFieldSuite.scala
index 7c33f19f74..bfe307a439 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FeatureFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FeatureFieldSuite.scala
@@ -94,7 +94,7 @@ class FeatureFieldSuite extends ADAMFunSuite {
       attributes
     )
 
-    val features: RDD[Feature] = sc.loadParquet(path, projection = Some(projection))
+    val features: RDD[Feature] = sc.loadParquet(path, optProjection = Some(projection))
     assert(features.count() === 1)
     assert(features.first.getFeatureId === "ENSG00000206503")
     assert(features.first.getName === "HLA-A")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FieldEnumerationSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FieldEnumerationSuite.scala
index 85e01144a3..a2d72df46e 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FieldEnumerationSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FieldEnumerationSuite.scala
@@ -42,7 +42,7 @@ class FieldEnumerationSuite extends ADAMFunSuite {
     rRdd.saveAsParquet(TestSaveArgs(readsParquetFilepath))
 
     val p1 = Projection(AlignmentRecordField.readName)
-    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p1)).rdd
+    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, optProjection = Some(p1)).rdd
     assert(reads1.count() === 200)
 
     val first1 = reads1.first()
@@ -50,7 +50,7 @@ class FieldEnumerationSuite extends ADAMFunSuite {
     assert(first1.getReadMapped === false)
 
     val p2 = Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)
-    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p2)).rdd
+    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, optProjection = Some(p2)).rdd
     assert(reads2.count() === 200)
 
     val first2 = reads2.first()
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FragmentFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FragmentFieldSuite.scala
index 2d074dd717..9ff2308443 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/FragmentFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/FragmentFieldSuite.scala
@@ -49,7 +49,7 @@ class FragmentFieldSuite extends ADAMFunSuite {
      alignments
    )
 
-    val fragments: RDD[Fragment] = sc.loadParquet(path, projection = Some(projection))
+    val fragments: RDD[Fragment] = sc.loadParquet(path, optProjection = Some(projection))
     assert(fragments.count() === 1)
     assert(fragments.first.getReadName === "read_name")
     assert(fragments.first.getInstrument === "instrument")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/GenotypeFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/GenotypeFieldSuite.scala
index 676d575444..3de97b2973 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/GenotypeFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/GenotypeFieldSuite.scala
@@ -59,7 +59,7 @@ class GenotypeFieldSuite extends ADAMFunSuite {
      phaseQuality
    )
 
-    val genotypes: RDD[Genotype] = sc.loadParquet(path, projection = Some(projection))
+    val genotypes: RDD[Genotype] = sc.loadParquet(path, optProjection = Some(projection))
     assert(genotypes.count() === 1)
     assert(genotypes.first.getContigName === "6")
     assert(genotypes.first.getStart === 29941260L)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/NucleotideContigFragmentFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/NucleotideContigFragmentFieldSuite.scala
index 4011433bea..3e736dfc27 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/NucleotideContigFragmentFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/NucleotideContigFragmentFieldSuite.scala
@@ -53,7 +53,7 @@ class NucleotideContigFragmentFieldSuite extends ADAMFunSuite {
      numberOfFragmentsInContig
    )
 
-    val nucleotideContigFragments: RDD[NucleotideContigFragment] = sc.loadParquet(path, projection = Some(projection))
+    val nucleotideContigFragments: RDD[NucleotideContigFragment] = sc.loadParquet(path, optProjection = Some(projection))
     assert(nucleotideContigFragments.count() === 1)
     assert(nucleotideContigFragments.first.getContig.getContigName === "6")
     assert(nucleotideContigFragments.first.getDescription === "Chromosome 6")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/OntologyTermFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/OntologyTermFieldSuite.scala
index 109dbbc543..2f77093736 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/OntologyTermFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/OntologyTermFieldSuite.scala
@@ -39,7 +39,7 @@ class OntologyTermFieldSuite extends ADAMFunSuite {
      accession
    )
 
-    val ontologyTerms: RDD[OntologyTerm] = sc.loadParquet(path, projection = Some(projection))
+    val ontologyTerms: RDD[OntologyTerm] = sc.loadParquet(path, optProjection = Some(projection))
     assert(ontologyTerms.count() === 1)
     assert(ontologyTerms.first.getDb === "GO")
     assert(ontologyTerms.first.getAccession === "0046703")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/ReadFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/ReadFieldSuite.scala
index af32399cc4..8aca74fad8 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/ReadFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/ReadFieldSuite.scala
@@ -53,7 +53,7 @@ class ReadFieldSuite extends ADAMFunSuite {
      qualityScoreVariant
    )
 
-    val reads: RDD[Read] = sc.loadParquet(path, projection = Some(projection))
+    val reads: RDD[Read] = sc.loadParquet(path, optProjection = Some(projection))
     assert(reads.count() === 1)
     assert(reads.first.getName === "read 1")
     assert(reads.first.getDescription === "read 1")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/RecordGroupMetadataFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/RecordGroupMetadataFieldSuite.scala
index 2515102101..cb3270c4c9 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/RecordGroupMetadataFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/RecordGroupMetadataFieldSuite.scala
@@ -56,7 +56,7 @@ class RecordGroupMetadataFieldSuite extends ADAMFunSuite {
      platformUnit
    )
 
-    val recordGroupMetadata: RDD[RecordGroupMetadata] = sc.loadParquet(path, projection = Some(projection))
+    val recordGroupMetadata: RDD[RecordGroupMetadata] = sc.loadParquet(path, optProjection = Some(projection))
     assert(recordGroupMetadata.count() === 1)
     assert(recordGroupMetadata.first.getName === "name")
     assert(recordGroupMetadata.first.getSequencingCenter === "sequencing_center")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SampleFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SampleFieldSuite.scala
index c361f9168c..addccaf4e4 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SampleFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SampleFieldSuite.scala
@@ -40,7 +40,7 @@ class SampleFieldSuite extends ADAMFunSuite {
      attributes
    )
 
-    val samples: RDD[Sample] = sc.loadParquet(path, projection = Some(projection))
+    val samples: RDD[Sample] = sc.loadParquet(path, optProjection = Some(projection))
     assert(samples.count() === 1)
     assert(samples.first.getSampleId === "sample_id")
     assert(samples.first.getName === "name")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SequenceFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SequenceFieldSuite.scala
index 85c058b2fe..a5bf5eb8e9 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SequenceFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SequenceFieldSuite.scala
@@ -45,7 +45,7 @@ class SequenceFieldSuite extends ADAMFunSuite {
      length
    )
 
-    val sequences: RDD[Sequence] = sc.loadParquet(path, projection = Some(projection));
+    val sequences: RDD[Sequence] = sc.loadParquet(path, optProjection = Some(projection));
     assert(sequences.count() === 1)
     assert(sequences.first.getName === "6")
     assert(sequences.first.getDescription === "Chromosome 6")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SliceFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SliceFieldSuite.scala
index 9c7951f277..47cbf160a7 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/SliceFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/SliceFieldSuite.scala
@@ -55,7 +55,7 @@ class SliceFieldSuite extends ADAMFunSuite {
      length
    )
 
-    val slices: RDD[Slice] = sc.loadParquet(path, projection = Some(projection));
+    val slices: RDD[Slice] = sc.loadParquet(path, optProjection = Some(projection));
     assert(slices.count() === 1)
     assert(slices.first.getName === "6")
     assert(slices.first.getDescription === "Chromosome 6")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/TranscriptEffectFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/TranscriptEffectFieldSuite.scala
index 58c7265152..db4d23d585 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/TranscriptEffectFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/TranscriptEffectFieldSuite.scala
@@ -76,7 +76,7 @@ class TranscriptEffectFieldSuite extends ADAMFunSuite {
      messages
    )
 
-    val transcriptEffects: RDD[TranscriptEffect] = sc.loadParquet(path, projection = Some(projection))
+    val transcriptEffects: RDD[TranscriptEffect] = sc.loadParquet(path, optProjection = Some(projection))
     assert(transcriptEffects.count() === 1)
     assert(transcriptEffects.first.getAlternateAllele === "A")
     assert(transcriptEffects.first.getEffects.get(0) === "SO:0002012")
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantAnnotationFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantAnnotationFieldSuite.scala
index 162a937fb8..6028162499 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantAnnotationFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantAnnotationFieldSuite.scala
@@ -75,7 +75,7 @@ class VariantAnnotationFieldSuite extends ADAMFunSuite {
      attributes
    )
 
-    val variantAnnotations: RDD[VariantAnnotation] = sc.loadParquet(path, projection = Some(projection))
+    val variantAnnotations: RDD[VariantAnnotation] = sc.loadParquet(path, optProjection = Some(projection))
     assert(variantAnnotations.count() === 1)
     assert(variantAnnotations.first.getAncestralAllele === "T")
     assert(variantAnnotations.first.getAlleleCount === 42)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantCallingAnnotationsFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantCallingAnnotationsFieldSuite.scala
index 7781fbda81..9490ffa0eb 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantCallingAnnotationsFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantCallingAnnotationsFieldSuite.scala
@@ -49,7 +49,7 @@ class VariantCallingAnnotationsFieldSuite extends ADAMFunSuite {
      attributes
    )
 
-    val variantCallingAnnotations: RDD[VariantCallingAnnotations] = sc.loadParquet(path, projection = Some(projection))
+    val variantCallingAnnotations: RDD[VariantCallingAnnotations] = sc.loadParquet(path, optProjection = Some(projection))
     assert(variantCallingAnnotations.count() === 1)
     assert(variantCallingAnnotations.first.getDownsampled === true)
     assert(variantCallingAnnotations.first.getMapq0Reads === 42)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantFieldSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantFieldSuite.scala
index 208cb0e8d6..20f67e27fd 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantFieldSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/projections/VariantFieldSuite.scala
@@ -54,7 +54,7 @@ class VariantFieldSuite extends ADAMFunSuite {
      filtersFailed
    )
 
-    val variants: RDD[Variant] = sc.loadParquet(path, projection = Some(projection))
+    val variants: RDD[Variant] = sc.loadParquet(path, optProjection = Some(projection))
     assert(variants.count() === 1)
     assert(variants.first.getContigName === "6")
     assert(variants.first.getStart === 29941260L)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
index 5e538bc159..058e7bb2e4 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
@@ -288,7 +288,7 @@ class ADAMContextSuite extends ADAMFunSuite {
     val pred: FilterPredicate = (LongColumn("start") === 16097631L)
 
     // the following only reads one row group
-    val adamVariants = sc.loadParquetVariants(loc, predicate = Some(pred))
+    val adamVariants = sc.loadParquetVariants(loc, optPredicate = Some(pred))
     assert(adamVariants.rdd.count === 1)
   }
 
@@ -506,7 +506,7 @@ class ADAMContextSuite extends ADAMFunSuite {
     val readsFilepath2 = testFile("bqsr1-r2.fq")
     val fastqReads1: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath1).rdd
     val fastqReads2: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath2).rdd
-    val pairedReads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath1, filePath2Opt = Option(readsFilepath2)).rdd
+    val pairedReads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath1, optPathName2 = Option(readsFilepath2)).rdd
     assert(fastqReads1.rdd.count === 488)
     assert(fastqReads2.rdd.count === 488)
     assert(pairedReads.rdd.count === 976)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/GenomicPositionPartitionerSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/GenomicPositionPartitionerSuite.scala
index 7458943827..3d44236aea 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/GenomicPositionPartitionerSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/GenomicPositionPartitionerSuite.scala
@@ -112,7 +112,7 @@ class GenomicPositionPartitionerSuite extends ADAMFunSuite {
       import org.bdgenomics.adam.projections.AlignmentRecordField._
       Projection(contigName, start, readName, readMapped)
     }
-    val gRdd = sc.loadAlignments(filename, projection = Some(p))
+    val gRdd = sc.loadAlignments(filename, optProjection = Some(p))
     val rdd = gRdd.rdd
 
     val parter = GenomicPositionPartitioner(parts, gRdd.sequences)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/FlagStatSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/FlagStatSuite.scala
index 610ff260dc..5d5b4fb9ca 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/FlagStatSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/FlagStatSuite.scala
@@ -43,7 +43,7 @@ class FlagStatSuite extends ADAMFunSuite {
       AlignmentRecordField.mapq,
       AlignmentRecordField.failedVendorQualityChecks)
 
-    val adamFile: RDD[AlignmentRecord] = sc.loadAlignments(inputPath, projection = Some(projection)).rdd
+    val adamFile: RDD[AlignmentRecord] = sc.loadAlignments(inputPath, optProjection = Some(projection)).rdd
 
     val (failedVendorQuality, passedVendorQuality) = FlagStat(adamFile)
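For callers, the renames above are mechanical: loadSequences becomes loadContigFragments, and the optional arguments become optProjection, optPredicate, optPathName2, and maximumFragmentLength. The following is a minimal usage sketch under stated assumptions: it assumes an active SparkContext named sc with the ADAMContext implicits imported (the usual import org.bdgenomics.adam.rdd.ADAMContext._), the Parquet filter DSL import mirrors the one used by ADAMContextSuite, and the input paths ("sample.alignments.adam" and so on) are hypothetical placeholders rather than files in this repository.

    import org.apache.parquet.filter2.dsl.Dsl._
    import org.apache.parquet.filter2.predicate.FilterPredicate
    import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
    import org.bdgenomics.adam.rdd.ADAMContext._

    // Parquet-backed alignments, materializing only the projected fields.
    val reads = sc.loadAlignments(
      "sample.alignments.adam",
      optProjection = Some(Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)))

    // Paired FASTQ: the second-of-pair path is now passed as optPathName2.
    val pairedReads = sc.loadAlignments("sample_1.fq", optPathName2 = Some("sample_2.fq"))

    // Push a row-group filter down to Parquet-backed variants via optPredicate.
    val pred: FilterPredicate = (LongColumn("start") === 16097631L)
    val variants = sc.loadParquetVariants("sample.variants.adam", optPredicate = Some(pred))

    // FASTA contigs, chunked into fragments; loadSequences is now loadContigFragments.
    val contigs = sc.loadContigFragments("sample.fa", maximumFragmentLength = 10000L)

The named arguments above are the ones exercised by the updated test suites in this patch; positional callers that relied on the old parameter names will need the equivalent one-line changes shown in the hunks.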