Run MD+BQSR+HC pipeline on full genome #3106
Conversation
Codecov Report

@@             Coverage Diff              @@
##             master     #3106    +/-   ##
===========================================
- Coverage     80.521%    80.513%  -0.008%
- Complexity   17607      17927    +320
===========================================
  Files        1176       1181     +5
  Lines        63601      64741    +1140
  Branches     9900       10205    +305
===========================================
+ Hits         51212      52125    +913
- Misses       8434       8645     +211
- Partials     3955       3971     +16
Force-pushed from f11fe47 to ccb1c2f.
@tomwhite Could you rebase this branch onto the latest master? Should this be merged for beta?
Force-pushed from ccb1c2f to 1d1ed2a.
Force-pushed from 1d1ed2a to 9df50e7.
@droazen done. Thanks for committing the other changes.
Force-pushed from de71f3a to aadfacb.
Change JoinStrategy.BROADCAST to use KnownSitesCache so it can work with known sites variants larger than 2GB. Sort reads by coordinate before running HaplotypeCaller.
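To illustrate the coordinate-sorting part of this change, here is a minimal sketch of sorting reads by (contig, start) in Spark before handing them to a coordinate-order-dependent step. It is not the GATK code itself: SimpleRead is a stand-in for GATKRead, and the packed sort key is just one way to get a single comparable value.

```java
import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;

// SimpleRead is a stand-in for GATKRead; only the fields needed for ordering are kept.
final class SimpleRead implements Serializable {
    final int contigIndex;
    final int start;

    SimpleRead(final int contigIndex, final int start) {
        this.contigIndex = contigIndex;
        this.start = start;
    }
}

public final class CoordinateSortSketch {
    /**
     * Sorts reads by (contig, start) so that downstream steps which assume
     * coordinate order (such as an overlaps partitioner) see sorted input.
     */
    public static JavaRDD<SimpleRead> coordinateSort(final JavaRDD<SimpleRead> reads, final int numReducers) {
        // Pack contig index and start position into one comparable long key.
        return reads.sortBy(r -> ((long) r.contigIndex << 32) | (r.start & 0xFFFFFFFFL), true, numReducers);
    }
}
```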
Force-pushed from 8975e57 to 68a6566.
@tomwhite I have a few minor comments about structure and naming, as well as a few questions about things I didn't quite understand. Looks good overall to me though.
utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
I don't understand why we need ${GCS_CLUSTER}-m in some of the paths but not all of them.
There's a bug somewhere, whereby the port is not being picked up. I've opened #3468 to address it.
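For context on why the authority sometimes needs to be spelled out: a host-less hdfs:/// path is resolved against fs.defaultFS, and spelling out the namenode host and port works around a resolution bug. The sketch below only illustrates that distinction; the cluster name and paths are made up, and it simply prints the two forms.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public final class DefaultFsSketch {
    public static void main(final String[] args) {
        final Configuration conf = new Configuration();
        // The namenode host and port for host-less hdfs:/// paths come from
        // fs.defaultFS; on a Dataproc-style cluster this typically points at the
        // master node, e.g. hdfs://<cluster>-m:8020 (assumed value).
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));

        // Host-less form: relies on fs.defaultFS for the namenode host and port.
        final Path unqualified = new Path("hdfs:///user/alice/known_sites.vcf");
        // Fully qualified form: spells out the authority explicitly, which is a
        // workaround when the default port is not being picked up correctly.
        final Path qualified = new Path("hdfs://my-cluster-m:8020/user/alice/known_sites.vcf");

        System.out.println(unqualified + " vs " + qualified);
    }
}
```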
 *
 * @param downsampler downsampler to use (may be null, which signifies that no downsampling is to be performed)
 */
public void setDownsampler(final ReadsDownsampler downsampler) {
I would get rid of this method and just put it in the constructor. I can't see a reason you can't, but I might be missing it.
Done
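The suggestion amounts to constructor injection of an optional collaborator instead of a setter. A minimal sketch of the pattern, with a stand-in ReadsDownsampler interface and class names that are not from GATK:

```java
// ReadsDownsampler here stands in for the real interface; ShardProcessor is a made-up owner class.
interface ReadsDownsampler { }

final class ShardProcessor {
    private final ReadsDownsampler downsampler; // null means "no downsampling"

    ShardProcessor(final ReadsDownsampler downsampler) {
        this.downsampler = downsampler;
    }

    boolean downsamplingEnabled() {
        return downsampler != null;
    }
}
```

Taking the downsampler in the constructor keeps the field final and removes the window where the object exists in a half-configured state.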
final JavaRDD<VariantContext> variants = callVariantsWithHaplotypeCaller(getAuthHolder(), ctx, getReads(), getHeaderForReads(), getReference(), intervals, hcArgs, shardingArgs);
// Reads must be coordinate sorted to use the overlaps partitioner
final SAMFileHeader readsHeader = getHeaderForReads().clone();
readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
Why set it and then sort it? It seems like maybe we should check if it's coordinate sorted, and then sort it if it isn't. I'm probably missing some detail about this though.
Yes, this makes absolutely sure it's sorted - there may be an issue with sharded input not actually being sorted, even though the header says it is. There's a follow-up issue to check and document sorting requirements: #3308.
I think we're going to want an option to override this at some point for performance reasons, but we can do that as a later upgrade.
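The check-before-sorting variant the reviewer describes could look roughly like the sketch below. This is not the GATK implementation (which deliberately always re-sorts, as explained above); the sort itself is passed in as a stand-in function.

```java
import java.util.function.Function;

import htsjdk.samtools.SAMFileHeader;
import org.apache.spark.api.java.JavaRDD;

public final class EnsureSortedSketch {
    /**
     * Re-sort only when the header does not already claim coordinate order.
     * sortByCoordinate is a stand-in for the real sorting utility.
     */
    public static <T> JavaRDD<T> ensureCoordinateSorted(final JavaRDD<T> reads,
                                                        final SAMFileHeader header,
                                                        final Function<JavaRDD<T>, JavaRDD<T>> sortByCoordinate) {
        if (header.getSortOrder() == SAMFileHeader.SortOrder.coordinate) {
            return reads; // trust the header and skip the shuffle
        }
        header.setSortOrder(SAMFileHeader.SortOrder.coordinate);
        return sortByCoordinate.apply(reads);
    }
}
```

The trade-off discussed in this thread is that trusting the header saves a shuffle but breaks downstream partitioning if sharded input lies about its order, which is why an explicit override flag is left for later.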
// Reads must be coordinate sorted to use the overlaps partitioner
final SAMFileHeader readsHeader = getHeaderForReads().clone();
readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
JavaRDD<GATKRead> coordinateSortedReads = SparkUtils.coordinateSortReads(getReads(), readsHeader, numReducers);
nitpick: final
Done
final List<ShardBoundary> shardBoundaries = getShardBoundaries(header, intervals, shardingArgs.readShardSize, shardingArgs.readShardPadding);

final int maxLocatableSize = reads.map(r -> r.getEnd() - r.getStart() + 1).reduce(Math::max);
This seems like an expensive operation. Is it a safety test for crazy reads that usually don't happen or is it always necessary?
Also, let's name this something more specific, like maxReadLength.
Yes, it's a safety test, as I found that there were sometimes reads that were longer than expected. It's not particularly expensive in the sense that it's a small fraction of the total runtime, even though it's a scan over the full data set.
We may want a mechanism to skip this for performance purposes at some point, but we can wait on that I think.
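A sketch of what that skip mechanism could look like, combining the full-data safety scan with an opt-out for callers who already know their maximum read length. The signature and the int[] spans are simplifications, not the GATK API; the real code reads start and end from GATKRead.

```java
import org.apache.spark.api.java.JavaRDD;

public final class MaxReadLengthSketch {
    /**
     * Safety scan: find the longest read span so shard padding can be sized to
     * guarantee every read is fully contained in at least one padded shard.
     * assumedMaxReadLength > 0 skips the scan for callers that know their data.
     */
    public static int maxReadLength(final JavaRDD<int[]> readSpans, final int assumedMaxReadLength) {
        if (assumedMaxReadLength > 0) {
            return assumedMaxReadLength; // opt-out for performance, as discussed above
        }
        // readSpans holds {start, end} pairs; length is end - start + 1.
        return readSpans.map(span -> span[1] - span[0] + 1).reduce(Math::max);
    }
}
```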
@@ -148,6 +148,16 @@ public HaplotypeCallerEngine( final HaplotypeCallerArgumentCollection hcArgs, b
    initialize(createBamOutIndex, createBamOutMD5);
}

public HaplotypeCallerEngine( final HaplotypeCallerArgumentCollection hcArgs, boolean createBamOutIndex, boolean createBamOutMD5, final SAMFileHeader readsHeader, ReferenceSequenceFile referenceReader,
I think the old constructor should just call through to this one now.
Done
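The suggestion here is plain constructor chaining: the older, shorter constructor delegates to the fuller one. A minimal sketch with made-up class and field names, not the HaplotypeCallerEngine code:

```java
// Engine is a made-up class; the second constructor argument plays the role of the newly added parameter.
final class Engine {
    private final String config;
    private final String annotator; // stand-in for the extra argument added in this PR

    Engine(final String config) {
        this(config, null); // old signature delegates to the new one
    }

    Engine(final String config, final String annotator) {
        this.config = config;
        this.annotator = annotator;
        // shared initialization lives in one place
    }
}
```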
});
final JavaPairRDD<ShardBoundary, Iterable<GATKRead>> shardsWithReads = paired.groupByKey();
return shardsWithReads.map(shard -> new SparkReadShard(shard._1(), shard._2()));
private static JavaRDD<Shard<GATKRead>> createReadShards(final JavaSparkContext ctx, final List<ShardBoundary> shardBoundaries, final JavaRDD<GATKRead> reads, final SAMFileHeader header, int maxLocatableSize) {
we could just inline this now
Done
@@ -267,13 +293,24 @@ private void writeVariants(JavaRDD<VariantContext> variants) {
    final Broadcast<ReferenceMultiSource> reference,
    final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast,
    final ShardingArgumentCollection assemblyArgs,
    final SAMFileHeader header) {
    final SAMFileHeader header,
    final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast) {
I can't remember, is the reason we don't pass the engine in as a broadcast here because you can't serialize / broadcast the reference source?
I tried broadcasting the engine, but it's not threadsafe, and I hit a lot of issues because of this. The reason to broadcast the VariantAnnotatorEngine is because it scans the classpath, which makes it expensive to create anew on each executor - so broadcasting makes sense.
Thanks, that's basically what I figured, but I just wanted to hear the explanation to be sure.
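The pattern discussed in this thread - broadcast the expensive-but-shareable object, build the cheap-but-not-threadsafe engine fresh per partition - can be sketched as below. Annotator and Engine are stand-ins, not the GATK classes, and the "calling" logic is a placeholder.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public final class BroadcastSketch {
    // Annotator is expensive to build (e.g. a classpath scan) but safe to share;
    // Engine is cheap to build but not threadsafe, so it is created per partition.
    static final class Annotator implements Serializable { }
    static final class Engine {
        Engine(final Annotator annotator) { }
        String call(final String read) { return "variant:" + read; }
    }

    public static JavaRDD<String> callVariants(final JavaSparkContext ctx, final JavaRDD<String> reads) {
        final Broadcast<Annotator> annotatorBroadcast = ctx.broadcast(new Annotator());
        return reads.mapPartitions(iter -> {
            // Fresh, non-shared engine per partition; the shared annotator comes from the broadcast.
            final Engine engine = new Engine(annotatorBroadcast.value());
            final List<String> out = new ArrayList<>();
            while (iter.hasNext()) {
                out.add(engine.call(iter.next()));
            }
            return out.iterator();
        });
    }
}
```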
return iteratorToStream(shards).flatMap(shardToRegion(assemblyArgs, header, referenceSource, hcEngine)).iterator();
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header, referenceSource, annotatorEngineBroadcast.getValue());

ReadsDownsampler readsDownsampler = assemblyArgs.maxReadsPerAlignmentStart > 0 ?
final
Done
@@ -21,13 +22,13 @@
public final class BroadcastJoinReadsWithVariants {
    private BroadcastJoinReadsWithVariants(){}

public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants ) {
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants, final List<String> variantsPaths ) {
It feels like we should have two different methods here, one that takes JavaRDD<GATKVariant> and one that takes List<String>, instead of having one method that takes one or more of them.
Also, this method needs javadoc (not your fault that it's missing, but would you mind adding it?), particularly if it can take two mutually exclusive inputs.
Done
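A minimal sketch of the two-overload shape the reviewer asked for, each with its own javadoc. The types are simplified stand-ins (String instead of GATKRead/GATKVariant), and loadVariants and joinImpl are hypothetical helpers with placeholder bodies, not GATK methods.

```java
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

final class BroadcastJoinSketch {
    private BroadcastJoinSketch() { }

    /** Joins each read with its associated variants, starting from an already-loaded RDD of variants. */
    static JavaPairRDD<String, Iterable<String>> join(final JavaRDD<String> reads, final JavaRDD<String> variants) {
        return joinImpl(reads, variants.collect());
    }

    /** Joins each read with its associated variants, loading the variants from the given file paths. */
    static JavaPairRDD<String, Iterable<String>> join(final JavaRDD<String> reads, final List<String> variantsPaths) {
        return joinImpl(reads, loadVariants(variantsPaths));
    }

    private static List<String> loadVariants(final List<String> paths) {
        // Placeholder: a real implementation would parse each file into variant records.
        return paths;
    }

    private static JavaPairRDD<String, Iterable<String>> joinImpl(final JavaRDD<String> reads, final List<String> variants) {
        // Placeholder join: pairs every read with the full variant list; a real
        // broadcast join would ship the variants to executors and keep only overlaps.
        return reads.mapToPair((String read) -> new Tuple2<String, Iterable<String>>(read, variants));
    }
}
```

Splitting the overloads keeps each entry point with a single, unambiguous input instead of one method with mutually exclusive parameters.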
@tomwhite Back to you.
Thanks for the review @lbergelson. I've addressed the changes you asked for. Back to you.
@tomwhite Looks good to me! 👍
@lbergelson thanks!
Here are the changes needed to get the full pipeline running on WGS BAMs.
This contains commits from other PRs.