Add ADAMContext APIs to create genomic RDDs from dataframes #2000

Closed
wants to merge 4 commits
Changes from 2 commits
135 changes: 56 additions & 79 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,15 +18,10 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }

Member: Nit: no space after import.

import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
VCFCompoundHeaderLine,
VCFFormatHeaderLine,
VCFHeaderLine,
VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
Member: Code style: we format this style of import to separate lines if there are more than three items or if the line gets too long. Same goes for the other import formatting changes.

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
@@ -42,66 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext, SparkSession }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{
FeatureField,
Projection
}
import org.bdgenomics.adam.rdd.contig.{
DatasetBoundNucleotideContigFragmentRDD,
NucleotideContigFragmentRDD,
ParquetUnboundNucleotideContigFragmentRDD,
RDDBoundNucleotideContigFragmentRDD
}
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
import org.bdgenomics.adam.rdd.feature._
import org.bdgenomics.adam.rdd.fragment.{
DatasetBoundFragmentRDD,
FragmentRDD,
ParquetUnboundFragmentRDD,
RDDBoundFragmentRDD
}
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
DatasetBoundAlignmentRecordRDD,
RepairPartitions,
ParquetUnboundAlignmentRecordRDD,
RDDBoundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
import org.bdgenomics.adam.rdd.variant._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.sql.{
AlignmentRecord => AlignmentRecordProduct,
Feature => FeatureProduct,
Fragment => FragmentProduct,
Genotype => GenotypeProduct,
NucleotideContigFragment => NucleotideContigFragmentProduct,
Variant => VariantProduct,
VariantContext => VariantContextProduct
}
import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct }
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.adam.util.{
GenomeFileReader,
ReferenceContigMap,
ReferenceFile,
SequenceDictionaryReader,
TwoBitFile
}
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Contig,
Feature,
Fragment,
Genotype,
NucleotideContigFragment,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample,
Variant
}
import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata }
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
@@ -1885,15 +1836,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None): AlignmentRecordRDD = {

// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)

// convert avro to record group dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

// load processing step descriptions
val pgs = loadAvroPrograms(pathName)

(optPredicate, optProjection) match {
case (None, None) => {
ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)
@@ -2371,6 +2316,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
Member: If we're calling df.as[FooProduct] to convert to a Dataset, shouldn't the API method accept Dataset[FooProduct] instead? This principle is described here: https://github.com/google/guice/wiki/InjectOnlyDirectDependencies

Contributor Author: As a public API, I think it's more convenient to pass a DataFrame, since the initial transformation is likely to be expressed in SQL or the DataFrame API. Since users may call this method interactively from the shell, I think it's important to minimize friction.

Member: I think exposing this as a DataFrame is fine. We're not super opinionated between DataFrames and Datasets (wherever we expose a Dataset, we expose the underlying DataFrame as well), and in the Python/R APIs, we only expose DataFrames.

val sd = loadAvroSequenceDictionary(metadataPath)
val samples = loadAvroSamples(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundVariantContextRDD(df.as[VariantContextProduct], sd, samples, headerLines)
}
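
For illustration, a minimal sketch of the interactive workflow discussed above. The directory name and the filter column are hypothetical, and `sc` is assumed to pick up the ADAMContext implicits, as in the tests below:

import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.rdd.ADAMContext._

val sqlContext = SQLContext.getOrCreate(sc)
// Read a previously saved VariantContextRDD Parquet directory as a plain DataFrame.
val df = sqlContext.read.parquet("variant_contexts.adam")
// Express the initial transformation in the DataFrame API (or SQL)...
val filtered = df.filter(df("contigName") === "1") // column name assumed for illustration
// ...then rebuild a VariantContextRDD, pulling the sequence dictionary, samples,
// and VCF header lines from the same directory's Avro metadata.
val vcs = sc.loadVariantContexts(filtered, "variant_contexts.adam")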

/**
* Load a path name in Parquet + Avro format into a VariantContextRDD.
*
@@ -2380,21 +2332,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
*/
def loadParquetVariantContexts(
pathName: String): VariantContextRDD = {

// load header lines
val headers = loadHeaderLines(pathName)

// load sequence info
val sd = loadAvroSequenceDictionary(pathName)

// load avro record group dictionary and convert to samples
val samples = loadAvroSamples(pathName)

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]

new DatasetBoundVariantContextRDD(ds, sd, samples, headers)
val df = sqlContext.read.parquet(pathName)
loadVariantContexts(df, pathName)
}

/**
@@ -2986,6 +2926,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadFeatures(df: DataFrame, metadataPath: String): FeatureRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
DatasetBoundFeatureRDD(df.as[FeatureProduct], sd)
}

/**
* Load reference sequences into a broadcastable ReferenceFile.
*
@@ -3085,6 +3030,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadContigFragments(df: DataFrame, metadataPath: String): NucleotideContigFragmentRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
DatasetBoundNucleotideContigFragmentRDD(df.as[NucleotideContigFragmentProduct], sd)
}

/**
* Load genotypes into a GenotypeRDD.
*
@@ -3120,6 +3070,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadGenotypes(df: DataFrame, metadataPath: String): GenotypeRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val samples = loadAvroSamples(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundGenotypeRDD(df.as[GenotypeProduct], sd, samples, headerLines)
}

/**
* Load variants into a VariantRDD.
*
@@ -3154,6 +3111,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadVariants(df: DataFrame, metadataPath: String): VariantRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundVariantRDD(df.as[VariantProduct], sd, headerLines)
}

/**
* Load alignment records into an AlignmentRecordRDD.
*
@@ -3224,6 +3187,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val rgd = loadAvroRecordGroupDictionary(metadataPath)
val process = loadAvroPrograms(metadataPath)
Member: process → processingSteps

new DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sd, rgd, process)
}
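
A similar sketch for the alignment API, this time expressing the transformation in SQL. The directory name, the view name, and the mapq threshold are assumptions for illustration:

val sqlContext = SQLContext.getOrCreate(sc)
// Register the saved alignments as a temporary view and filter with SQL.
sqlContext.read.parquet("alignments.adam").createOrReplaceTempView("reads")
val highMapq = sqlContext.sql("SELECT * FROM reads WHERE mapq >= 30")
// Rebuild an AlignmentRecordRDD; the record groups and processing steps come
// from the Avro metadata written alongside the Parquet files.
val alignments = sc.loadAlignments(highMapq, "alignments.adam")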

/**
* Load fragments into a FragmentRDD.
*
@@ -3282,6 +3252,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadFragments(df: DataFrame, metadataPath: String): FragmentRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val rgd = loadAvroRecordGroupDictionary(metadataPath)
val processingSteps = loadAvroPrograms(metadataPath)
DatasetBoundFragmentRDD(df.as[FragmentProduct], sd, rgd, processingSteps)
}

/**
* Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
*
@@ -17,22 +17,21 @@
*/
package org.bdgenomics.adam.rdd

import htsjdk.samtools.{
SAMFormatException,
SAMProgramRecord,
ValidationStringency
}
import htsjdk.samtools.{ SAMFormatException, SAMProgramRecord, ValidationStringency }
import java.io.{ File, FileNotFoundException }

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.adam.sql.{ VariantContext => VCProduct }
Member: Code style: don't add unnecessary abbreviations, especially in class names.

Member: +1

import org.bdgenomics.formats.avro._
import org.seqdoop.hadoop_bam.CRAMInputFormat
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
@@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(secondPg.head.getCommandLine === "\"myProg 456\"")
assert(secondPg.head.getVersion === "1.0.0")
}

sparkTest("load variant contexts from dataframe") {
Contributor Author: @fnothaft This test (and its equivalent form where we reload using the loadVariantContexts(path: String) method) doesn't pass against my PR or master, which surprised me. Any ideas why?

Member: I'll take a look into the failure.

Member: Not sure why this is failing; I took the test output and pasted the dump of all of the records in each array, and they have the same textual values. You could either be getting bitten by some odd floating-point comparison bug, or perhaps the Array == comparator does something wonky.

val path = testFile("small.vcf")
val vcs = sc.loadVcf(path)
val outputDir = tmpLocation()
vcs.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadVariantContexts(df, outputDir)
assert(reloaded.sequences == vcs.sequences)
assert(reloaded.headerLines.toSet == vcs.headerLines.toSet)
assert(reloaded.samples == vcs.samples)
assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep ==
vcs.rdd.collect().map(VCProduct.fromModel).deep)
}
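
As an aside on the failure discussed above, a sketch of Scala equality semantics that can produce exactly this symptom — identical textual dumps but a failing assertion. Whether the products here actually contain Array fields or NaN-valued doubles is an assumption:

val a = Array(1.0, 2.0)
val b = Array(1.0, 2.0)
a == b                   // false: Array inherits reference equality from AnyRef
a.deep == b.deep         // true: deep recursively compares element values
a.sameElements(b)        // true: element-wise comparison

// Two pitfalls that .deep on the outer collection does not cover:
// 1. A case class with an Array field compares that field by reference in its
//    generated equals, so structurally equal records can still compare unequal.
// 2. Double.NaN != Double.NaN under primitive comparison, so a NaN-valued field
//    can break == even when the printed values match.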

sparkTest("load features from dataframe") {
val path = testFile("dvl1.200.bed")
val features = sc.loadFeatures(path)
val outputDir = tmpLocation()
features.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadFeatures(df, outputDir)
assert(reloaded.sequences == features.sequences)
assert(reloaded.rdd.collect().deep == features.rdd.collect().deep)
}

sparkTest("load contig fragments from dataframe") {
val inputPath = testFile("chr20.250k.fa.gz")
val contigFragments = sc.loadFasta(inputPath, 10000L)
val outputDir = tmpLocation()
contigFragments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadContigFragments(df, outputDir)
assert(reloaded.sequences == contigFragments.sequences)
assert(reloaded.rdd.collect().deep == contigFragments.rdd.collect().deep)
}

sparkTest("load genotypes from dataframe") {
val path = testFile("small.vcf")
val gts = sc.loadGenotypes(path)
val outputDir = tmpLocation()
gts.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadGenotypes(df, outputDir)
assert(reloaded.sequences == gts.sequences)
assert(reloaded.headerLines.toSet == gts.headerLines.toSet)
assert(reloaded.samples == gts.samples)
assert(reloaded.rdd.collect().deep == gts.rdd.collect().deep)
}

sparkTest("load variants from dataframe") {
val path = testFile("gvcf_dir/gvcf_multiallelic.g.vcf")
val variants = sc.loadVcf(path).toVariants()
val outputDir = tmpLocation()
variants.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadVariants(df, outputDir)
assert(reloaded.sequences == variants.sequences)
assert(reloaded.headerLines.toSet == variants.headerLines.toSet)
assert(reloaded.rdd.collect().deep == variants.rdd.collect().deep)
}

sparkTest("load alignments from dataframe") {
val path = testFile("bqsr1-r1.fq")
val alignments = sc.loadAlignments(path)
val outputDir = tmpLocation()
alignments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadAlignments(df, outputDir)
assert(reloaded.sequences == alignments.sequences)
assert(reloaded.recordGroups == alignments.recordGroups)
assert(reloaded.processingSteps == alignments.processingSteps)
assert(reloaded.rdd.collect().deep == alignments.rdd.collect().deep)
}

sparkTest("load fragments from dataframe") {
val path = testFile("sample1.query.sam")
val fragments = sc.loadFragments(path)
val outputDir = tmpLocation()
fragments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadFragments(df, outputDir)
assert(reloaded.sequences == fragments.sequences)
assert(reloaded.recordGroups == fragments.recordGroups)
assert(reloaded.processingSteps == fragments.processingSteps)
assert(reloaded.rdd.collect().deep == fragments.rdd.collect().deep)
}
}