Add ADAMContext APIs to create genomic RDDs from dataframes #2000
Changes from all commits
@@ -18,15 +18,10 @@
 package org.bdgenomics.adam.rdd

 import java.io.{ File, FileNotFoundException, InputStream }

 import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
 import htsjdk.samtools.util.Locatable
-import htsjdk.variant.vcf.{
-  VCFHeader,
-  VCFCompoundHeaderLine,
-  VCFFormatHeaderLine,
-  VCFHeaderLine,
-  VCFInfoHeaderLine
-}
+import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
Review comment: Code style: we format this style of import to separate lines if there are more than three items or if the line gets too long. Same goes for the other import formatting changes.
 import org.apache.avro.Schema
 import org.apache.avro.file.DataFileStream
 import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
@@ -42,66 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.MetricsContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
+import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext, SparkSession }
 import org.bdgenomics.adam.converters._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io._
 import org.bdgenomics.adam.models._
-import org.bdgenomics.adam.projections.{
-  FeatureField,
-  Projection
-}
-import org.bdgenomics.adam.rdd.contig.{
-  DatasetBoundNucleotideContigFragmentRDD,
-  NucleotideContigFragmentRDD,
-  ParquetUnboundNucleotideContigFragmentRDD,
-  RDDBoundNucleotideContigFragmentRDD
-}
+import org.bdgenomics.adam.projections.{ FeatureField, Projection }
+import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
 import org.bdgenomics.adam.rdd.feature._
-import org.bdgenomics.adam.rdd.fragment.{
-  DatasetBoundFragmentRDD,
-  FragmentRDD,
-  ParquetUnboundFragmentRDD,
-  RDDBoundFragmentRDD
-}
-import org.bdgenomics.adam.rdd.read.{
-  AlignmentRecordRDD,
-  DatasetBoundAlignmentRecordRDD,
-  RepairPartitions,
-  ParquetUnboundAlignmentRecordRDD,
-  RDDBoundAlignmentRecordRDD
-}
+import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
+import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
 import org.bdgenomics.adam.rdd.variant._
 import org.bdgenomics.adam.rich.RichAlignmentRecord
-import org.bdgenomics.adam.sql.{
-  AlignmentRecord => AlignmentRecordProduct,
-  Feature => FeatureProduct,
-  Fragment => FragmentProduct,
-  Genotype => GenotypeProduct,
-  NucleotideContigFragment => NucleotideContigFragmentProduct,
-  Variant => VariantProduct,
-  VariantContext => VariantContextProduct
-}
+import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct }
 import org.bdgenomics.adam.util.FileExtensions._
-import org.bdgenomics.adam.util.{
-  GenomeFileReader,
-  ReferenceContigMap,
-  ReferenceFile,
-  SequenceDictionaryReader,
-  TwoBitFile
-}
-import org.bdgenomics.formats.avro.{
-  AlignmentRecord,
-  Contig,
-  Feature,
-  Fragment,
-  Genotype,
-  NucleotideContigFragment,
-  ProcessingStep,
-  RecordGroup => RecordGroupMetadata,
-  Sample,
-  Variant
-}
+import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
+import org.bdgenomics.formats.avro.{ RecordGroup => RecordGroupMetadata, _ }
 import org.bdgenomics.utils.instrumentation.Metrics
 import org.bdgenomics.utils.io.LocalFileByteAccess
 import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
@@ -1121,6 +1072,8 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter
  * @param sc The SparkContext to wrap.
  */
 class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
+  @transient val spark = SQLContext.getOrCreate(sc).sparkSession
Review comment: This doesn't appear to be used anywhere, except for the import below. Why bring that up here instead of keeping it as …

Reply: Well, I thought it was a reasonable thing to have available, and it saved me from importing the implicits in each of the new functions I added. I'm fine with moving it if you prefer, though.

Review comment: If it eliminates the need to repeatedly import the implicits, then I'd favor keeping it.
+  import spark.implicits._

  /**
   * @param samHeader The header to extract a sequence dictionary from.
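For readers following the thread: hoisting the session to a class-level val means the encoder implicits are imported once for the whole class instead of once per method. A minimal standalone sketch of the pattern, with illustrative names (not ADAM code):

import org.apache.spark.SparkContext
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }

class ImplicitsExample(@transient val sc: SparkContext) extends Serializable {
  // One session val at class scope...
  @transient val spark = SQLContext.getOrCreate(sc).sparkSession

  // ...imports the Dataset encoders once for every method below.
  import spark.implicits._

  // Converts a single-string-column DataFrame to a Dataset without a
  // local `import spark.implicits._` in the method body.
  def lengths(df: DataFrame): Dataset[Int] = df.as[String].map(_.length)
}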
@@ -1885,15 +1838,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     optPredicate: Option[FilterPredicate] = None,
     optProjection: Option[Schema] = None): AlignmentRecordRDD = {

-    // convert avro to sequence dictionary
     val sd = loadAvroSequenceDictionary(pathName)
-
-    // convert avro to sequence dictionary
     val rgd = loadAvroRecordGroupDictionary(pathName)
-
-    // load processing step descriptions
     val pgs = loadAvroPrograms(pathName)

     (optPredicate, optProjection) match {
       case (None, None) => {
         ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)
@@ -2371,6 +2318,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
Review comment: If we're calling …

Reply: As a public API, I think it's more convenient to pass a dataframe, since the initial transformation is likely to be expressed in SQL or the dataframe API. Since users may call this method interactively from the shell, I think it's important to minimize friction.

Review comment: I think exposing this as a DataFrame is fine. We're not super opinionated between DataFrames and Datasets (wherever we expose a Dataset, we expose the underlying DataFrame as well), and in the Python/R APIs, we only expose DataFrames.
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val samples = loadAvroSamples(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundVariantContextRDD(df.as[VariantContextProduct], sd, samples, headerLines)
+  }
+
  /**
   * Load a path name in Parquet + Avro format into a VariantContextRDD.
   *
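To make the friction argument concrete, here is a hedged sketch of the interactive flow this signature targets, assuming an adam-shell session where sc is in scope; the path and the nested variant.start column are illustrative:

import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.rdd.ADAMContext._

// Read a directory previously written by saveAsParquet as a plain DataFrame.
val path = "variant_contexts.adam"  // illustrative path
val df = SQLContext.getOrCreate(sc).read.parquet(path)

// Express the initial transformation in SQL...
df.createOrReplaceTempView("vcs")
val filtered = df.sparkSession.sql("SELECT * FROM vcs WHERE variant.start >= 100000")

// ...then rebind the metadata saved alongside the Parquet data.
val vcRdd = sc.loadVariantContexts(filtered, path)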
@@ -2380,21 +2334,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   */
  def loadParquetVariantContexts(
    pathName: String): VariantContextRDD = {

-    // load header lines
-    val headers = loadHeaderLines(pathName)
-
-    // load sequence info
-    val sd = loadAvroSequenceDictionary(pathName)
-
-    // load avro record group dictionary and convert to samples
-    val samples = loadAvroSamples(pathName)
-
     val sqlContext = SQLContext.getOrCreate(sc)
-    import sqlContext.implicits._
-    val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]
-
-    new DatasetBoundVariantContextRDD(ds, sd, samples, headers)
+    val df = sqlContext.read.parquet(pathName)
+    loadVariantContexts(df, pathName)
  }

  /**
@@ -2986,6 +2928,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadFeatures(df: DataFrame, metadataPath: String): FeatureRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    DatasetBoundFeatureRDD(df.as[FeatureProduct], sd)
+  }
+
  /**
   * Load reference sequences into a broadcastable ReferenceFile.
   *
@@ -3085,6 +3032,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadContigFragments(df: DataFrame, metadataPath: String): NucleotideContigFragmentRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    DatasetBoundNucleotideContigFragmentRDD(df.as[NucleotideContigFragmentProduct], sd)
+  }
+
  /**
   * Load genotypes into a GenotypeRDD.
   *
@@ -3120,6 +3072,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadGenotypes(df: DataFrame, metadataPath: String): GenotypeRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val samples = loadAvroSamples(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundGenotypeRDD(df.as[GenotypeProduct], sd, samples, headerLines)
+  }
+
  /**
   * Load variants into a VariantRDD.
   *
@@ -3154,6 +3113,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadVariants(df: DataFrame, metadataPath: String): VariantRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundVariantRDD(df.as[VariantProduct], sd, headerLines)
+  }
+
  /**
   * Load alignment records into an AlignmentRecordRDD.
   *
@@ -3224,6 +3189,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val rgd = loadAvroRecordGroupDictionary(metadataPath)
+    val process = loadAvroPrograms(metadataPath)
Review comment: process → processingSteps
+
+    new DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sd, rgd, process)
+  }
+
  /**
   * Load fragments into a FragmentRDD.
   *
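The same round trip works with the DataFrame API; a sketch under the same adam-shell assumptions (the path and the mapq column name are illustrative):

import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.rdd.ADAMContext._

val path = "reads.alignments.adam"  // illustrative path
val df = SQLContext.getOrCreate(sc).read.parquet(path)

// Filter with the DataFrame API, then rebind the sequence dictionary,
// record groups, and processing steps stored in the same directory.
val highQuality = df.filter(df("mapq") > 30)
val reads = sc.loadAlignments(highQuality, path)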
@@ -3282,6 +3254,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

+  def loadFragments(df: DataFrame, metadataPath: String): FragmentRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val rgd = loadAvroRecordGroupDictionary(metadataPath)
+    val processingSteps = loadAvroPrograms(metadataPath)
+    DatasetBoundFragmentRDD(df.as[FragmentProduct], sd, rgd, processingSteps)
+  }
+
  /**
   * Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
   *
@@ -17,22 +17,21 @@
  */
 package org.bdgenomics.adam.rdd

-import htsjdk.samtools.{
-  SAMFormatException,
-  SAMProgramRecord,
-  ValidationStringency
-}
+import htsjdk.samtools.{ SAMFormatException, SAMProgramRecord, ValidationStringency }
 import java.io.{ File, FileNotFoundException }

 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.filter2.dsl.Dsl._
 import org.apache.parquet.filter2.predicate.FilterPredicate
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.bdgenomics.adam.models._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.util.PhredUtils._
 import org.bdgenomics.adam.util.ADAMFunSuite
+import org.bdgenomics.adam.sql.{ VariantContext => VCProduct }
Review comment: Code style: don't add unnecessary abbreviations, especially in class names.

Reply: +1
 import org.bdgenomics.formats.avro._
 import org.seqdoop.hadoop_bam.CRAMInputFormat
 import org.seqdoop.hadoop_bam.util.SAMHeaderReader
@@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite {
     assert(secondPg.head.getCommandLine === "\"myProg 456\"")
     assert(secondPg.head.getVersion === "1.0.0")
   }
+
+  sparkTest("load variant contexts from dataframe") {
Review comment: @fnothaft This test (and its equivalent form where we reload using the …

Reply: I'll take a look into the failure.

Reply: Not sure why this is failing; I took the test output and pasted the dump of all of the records in each array, and they have the same textual values. You could either be getting bitten by some odd floating-point comparison bug, or perhaps the Array == comparator does something wonky.
+    val path = testFile("small.vcf")
+    val vcs = sc.loadVcf(path)
+    val outputDir = tmpLocation()
+    vcs.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadVariantContexts(df, outputDir)
+    assert(reloaded.sequences == vcs.sequences)
+    assert(reloaded.headerLines.toSet == vcs.headerLines.toSet)
+    assert(reloaded.samples == vcs.samples)
+    assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep ==
+      vcs.rdd.collect().map(VCProduct.fromModel).deep)
+  }
+
+  sparkTest("load features from dataframe") {
+    val path = testFile("dvl1.200.bed")
+    val features = sc.loadFeatures(path)
+    val outputDir = tmpLocation()
+    features.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadFeatures(df, outputDir)
+    assert(reloaded.sequences == features.sequences)
+    assert(reloaded.rdd.collect().deep == features.rdd.collect().deep)
+  }
+
+  sparkTest("load contig fragments from dataframe") {
+    val inputPath = testFile("chr20.250k.fa.gz")
+    val contigFragments = sc.loadFasta(inputPath, 10000L)
+    val outputDir = tmpLocation()
+    contigFragments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadContigFragments(df, outputDir)
+    assert(reloaded.sequences == contigFragments.sequences)
+    assert(reloaded.rdd.collect().deep == contigFragments.rdd.collect().deep)
+  }
+
+  sparkTest("load genotypes from dataframe") {
+    val path = testFile("small.vcf")
+    val gts = sc.loadGenotypes(path)
+    val outputDir = tmpLocation()
+    gts.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadGenotypes(df, outputDir)
+    assert(reloaded.sequences == gts.sequences)
+    assert(reloaded.headerLines.toSet == gts.headerLines.toSet)
+    assert(reloaded.samples == gts.samples)
+    assert(reloaded.rdd.collect().deep == gts.rdd.collect().deep)
+  }
+
+  sparkTest("load variants from dataframe") {
+    val path = testFile("gvcf_dir/gvcf_multiallelic.g.vcf")
+    val variants = sc.loadVcf(path).toVariants()
+    val outputDir = tmpLocation()
+    variants.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadVariants(df, outputDir)
+    assert(reloaded.sequences == variants.sequences)
+    assert(reloaded.headerLines.toSet == variants.headerLines.toSet)
+    assert(reloaded.rdd.collect().deep == variants.rdd.collect().deep)
+  }
+
+  sparkTest("load alignments from dataframe") {
+    val path = testFile("bqsr1-r1.fq")
+    val alignments = sc.loadAlignments(path)
+    val outputDir = tmpLocation()
+    alignments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadAlignments(df, outputDir)
+    assert(reloaded.sequences == alignments.sequences)
+    assert(reloaded.recordGroups == alignments.recordGroups)
+    assert(reloaded.processingSteps == alignments.processingSteps)
+    assert(reloaded.rdd.collect().deep == alignments.rdd.collect().deep)
+  }
+
+  sparkTest("load fragments from dataframe") {
+    val path = testFile("sample1.query.sam")
+    val fragments = sc.loadFragments(path)
+    val outputDir = tmpLocation()
+    fragments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadFragments(df, outputDir)
+    assert(reloaded.sequences == fragments.sequences)
+    assert(reloaded.recordGroups == fragments.recordGroups)
+    assert(reloaded.processingSteps == fragments.processingSteps)
+    assert(reloaded.rdd.collect().deep == fragments.rdd.collect().deep)
+  }
 }
Review comment: Nit: no space after import.
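On the failing deep-equality assertion discussed above: a debugging sketch, not a fix, reusing reloaded, vcs, and VCProduct from the first new test. It renders both sides to sorted strings and diffs element-wise, which sidesteps Array equality semantics and any ordering differences, so a surviving mismatch would point at a field-level (for example, floating-point) representation difference:

// Render each collected record to a string, sort, and compare pairwise.
val left = reloaded.rdd.collect().map(VCProduct.fromModel).map(_.toString).sorted
val right = vcs.rdd.collect().map(VCProduct.fromModel).map(_.toString).sorted
assert(left.length == right.length)
left.zip(right).zipWithIndex.foreach {
  case ((l, r), i) => if (l != r) println(s"record $i differs:\n  $l\n  $r")
}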