Add ADAMContext APIs to create genomic RDDs from dataframes #2000
Conversation
Can one of the admins verify this patch?
@@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite {
    assert(secondPg.head.getCommandLine === "\"myProg 456\"")
    assert(secondPg.head.getVersion === "1.0.0")
  }

  sparkTest("load variant contexts from dataframe") {
@fnothaft This test (and its equivalent form where we reload using the loadVariantContexts(path: String) method) doesn't pass against my PR or master, which surprised me. Any ideas why?
I'll take a look into the failure.
Not sure why this is failing; I took the test output and pasted the dump of all of the records in each array, and they have the same textual values. You could either be getting bitten by some odd floating point comparison bug, or perhaps the Array == comparator does something wonky.
Thanks for the contribution, @henrydavidge!
Jenkins, test this please
Test FAILed. Build result: FAILURE. [Jenkins log truncated: fetched and checked out revision 82e03c9455cd8a8cdbe1599c119a7401e4856ade (origin/pr/2000/merge); first-time build, changelog skipped.] All four ADAM-prb matrix builds (2.7.3/2.11, 2.7.3/2.10, 2.6.2/2.10, 2.6.2/2.11 — each with Spark 2.2.1 on centos) completed with result FAILURE.
Some code style suggestions and a few questions.
  VCFHeaderLine,
  VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
Code style: we format this style of import to separate lines if there are more than three items or if the line gets too long. Same goes for the other import formatting changes.
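For illustration, the five-item import from the diff above, split across lines in that style, would look roughly like this (a formatting convention only, not a behavior change):

```scala
import htsjdk.variant.vcf.{
  VCFCompoundHeaderLine,
  VCFFormatHeaderLine,
  VCFHeader,
  VCFHeaderLine,
  VCFInfoHeaderLine
}
```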
@@ -2371,6 +2318,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    }
  }

  def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
If we're calling df.as[FooProduct] to convert to a Dataset, shouldn't the API method accept Dataset[FooProduct] instead? This principle is described here: https://github.com/google/guice/wiki/InjectOnlyDirectDependencies
As a public API, I think it's more convenient to pass a dataframe since the initial transformation is likely to be expressed in SQL or the dataframe API. Since users may call this method interactively from the shell, I think it's important to minimize friction.
I think exposing this as a DataFrame is fine. We're not super opinionated between DataFrames and Datasets (wherever we expose a Dataset, we expose the underlying DataFrame as well), and in the Python/R APIs, we only expose DataFrames.
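A minimal sketch of the DataFrame-accepting shape being discussed, assuming the df.as[...] conversion stays an internal detail; the body is elided, and the names follow the diff rather than the final implementation:

```scala
def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
  import spark.implicits._
  // The typed conversion is kept inside the method, so interactive users can
  // pass the result of a SQL query or DataFrame transformation directly.
  val dataset = df.as[org.bdgenomics.adam.sql.VariantContext]
  // ...then attach metadata (sequence dictionary, samples) loaded from metadataPath
  ???
}
```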
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.adam.sql.{ VariantContext => VCProduct }
Code style: don't add unnecessary abbreviations, especially in class names
+1
def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
  val sd = loadAvroSequenceDictionary(metadataPath)
  val rgd = loadAvroRecordGroupDictionary(metadataPath)
  val process = loadAvroPrograms(metadataPath)
process → processingSteps
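Applying the suggested rename to the quoted lines (method body elided with a placeholder):

```scala
def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
  val sd = loadAvroSequenceDictionary(metadataPath)
  val rgd = loadAvroRecordGroupDictionary(metadataPath)
  val processingSteps = loadAvroPrograms(metadataPath) // renamed from `process`
  ???
}
```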
@@ -1121,6 +1072,8 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter
 * @param sc The SparkContext to wrap.
 */
class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
  @transient val spark = SQLContext.getOrCreate(sc).sparkSession
This doesn't appear to be used anywhere, except for the import below. Why bring that up here instead of keeping it as import sqlContext.implicits._ within a method, as before?
Well, I thought it was a reasonable thing to have available, and it saved me from importing the implicits in each of the new functions I added. I'm fine with moving it if you prefer, though.
If it eliminates the need to repeatedly import the implicits, then I'd favor keeping it.
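A sketch of the placement being discussed: defining the session once at class level lets one import of the implicits cover every method, instead of repeating import sqlContext.implicits._ in each one. Member names follow the diff; the method body is elided:

```scala
class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
  @transient val spark = SQLContext.getOrCreate(sc).sparkSession
  // One class-level import of the encoders/implicits covers every method
  // below, avoiding a per-method `import sqlContext.implicits._`.
  import spark.implicits._

  def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = ???
}
```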
Generally LGTM, a few small comments. Would you be able to add this to the Python and R APIs as well?
@@ -18,15 +18,10 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }
Nit: no space after import.
Closing in favor of #2158
In some cases, it's convenient to use vanilla Spark APIs to load genomic data and apply some basic transformations before creating an ADAM object. This PR adds methods to ADAMContext to load each type of genomic RDD from a Spark SQL DataFrame and a metadata path. We look for metadata such as the sequence and record group dictionaries in the metadata path.
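A hypothetical end-to-end use of the proposed API; the path, column name, and filter are illustrative only:

```scala
// Read Parquet-backed genomic data with vanilla Spark, apply a basic
// DataFrame transformation, then hand the result to the new ADAMContext
// method together with the path holding the Avro metadata.
val df = spark.read.parquet("sample.alignments.adam")
  .filter("contigName = 'chr1'")
val alignments = sc.loadAlignments(df, "sample.alignments.adam")
```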