Run MD+BQSR+HC pipeline on full genome #3106

Merged: 4 commits merged into master from tw_hc_genome_fixes on Aug 22, 2017

Conversation

tomwhite
Contributor

Here are the changes needed to get the full pipeline running on WGS BAMs.

This contains commits from other PRs.

@codecov-io

codecov-io commented Jun 13, 2017

Codecov Report

Merging #3106 into master will decrease coverage by 0.008%.
The diff coverage is 88.235%.

@@               Coverage Diff               @@
##              master     #3106       +/-   ##
===============================================
- Coverage     80.521%   80.513%   -0.008%     
- Complexity     17607     17927      +320     
===============================================
  Files           1176      1181        +5     
  Lines          63601     64741     +1140     
  Branches        9900     10205      +305     
===============================================
+ Hits           51212     52125      +913     
- Misses          8434      8645      +211     
- Partials        3955      3971       +16
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...bender/engine/spark/AddContextDataToReadSpark.java | 72.727% <0%> (-3.03%) | 4 <0> (ø) |
| ...walkers/haplotypecaller/HaplotypeCallerEngine.java | 72.519% <100%> (+0.426%) | 54 <1> (+2) ⬆️ |
| ...r/engine/spark/BroadcastJoinReadsWithVariants.java | 62.5% <66.667%> (-4.167%) | 4 <3> (+2) |
| ...hellbender/tools/DownsampleableSparkReadShard.java | 90.909% <90.909%> (ø) | 4 <4> (?) |
| ...stitute/hellbender/tools/HaplotypeCallerSpark.java | 79.048% <92.593%> (-4.626%) | 24 <7> (-3) |
| ...s/spark/sv/discovery/AnnotatedVariantProducer.java | 91.667% <0%> (-1.316%) | 18% <0%> (+2%) |
| ...ls/spark/sv/discovery/GappedAlignmentSplitter.java | 94.783% <0%> (-0.62%) | 33% <0%> (ø) |
| ...oadinstitute/hellbender/utils/gcs/BucketUtils.java | 75.926% <0%> (-0.39%) | 41% <0%> (+3%) |
| ...ute/hellbender/tools/spark/utils/IntHistogram.java | 89.706% <0%> (-0.125%) | 19% <0%> (ø) |
| ... and 31 more | | |

@tomwhite tomwhite force-pushed the tw_hc_genome_fixes branch from f11fe47 to ccb1c2f on June 20, 2017 08:15
@droazen droazen self-requested a review June 20, 2017 12:49
@droazen droazen self-assigned this Jun 20, 2017
@droazen
Contributor

droazen commented Jun 20, 2017

@tomwhite Could you rebase this branch onto the latest master? Should this be merged for beta?

@tomwhite tomwhite force-pushed the tw_hc_genome_fixes branch from ccb1c2f to 1d1ed2a on June 20, 2017 17:52
@tomwhite
Contributor Author

@droazen I've rebased it now. This is not necessary for beta (although no problem if it does get in), but these doc changes should go in: #3132.

@droazen
Contributor

droazen commented Jun 20, 2017

@tomwhite Mind rebasing this one again now that #3132 is merged?

@tomwhite tomwhite force-pushed the tw_hc_genome_fixes branch from 1d1ed2a to 9df50e7 on June 21, 2017 08:32
@tomwhite
Contributor Author

@droazen done. Thanks for committing the other changes.

@tomwhite tomwhite force-pushed the tw_hc_genome_fixes branch 2 times, most recently from de71f3a to aadfacb on July 24, 2017 14:47
@droazen droazen requested review from lbergelson and removed request for droazen July 24, 2017 16:35
@droazen droazen assigned lbergelson and unassigned droazen Jul 24, 2017
Change JoinStrategy.BROADCAST to use KnownSitesCache so it can work with known sites variants larger than 2GB.

Sort reads by coordinate before running HaplotypeCaller.
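
For readers following along, here is a minimal sketch of the per-JVM caching idea behind the first commit message above. The class and method names below are placeholders and the loader is stubbed; this is not the actual KnownSitesCache implementation. The idea is that each executor loads the known-sites variants from their paths once and reuses them across tasks, which avoids Spark's roughly 2GB cap on a single broadcast value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch only: a per-JVM cache of known-sites variants, keyed by the list
// of variant file paths, so each executor parses the VCFs once instead of
// receiving them as a broadcast (which is limited to ~2GB per value).
public final class KnownSitesCacheSketch {
    private static final Map<List<String>, List<Object>> CACHE = new HashMap<>();

    private KnownSitesCacheSketch() {}

    public static synchronized List<Object> getVariants(final List<String> variantPaths) {
        return CACHE.computeIfAbsent(variantPaths, KnownSitesCacheSketch::loadVariants);
    }

    private static List<Object> loadVariants(final List<String> paths) {
        // Placeholder: the real code would read the VCFs at these paths into
        // variant objects on the executor.
        return Collections.emptyList();
    }
}
```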

@lbergelson lbergelson left a comment


@tomwhite I have a few minor comments about structure and naming, as well as a few questions about things I didn't quite understand. Looks good overall to me though.

. utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
Member

I don't understand why we need ${GCS_CLUSTER}-m in some of the paths but not all of them.

Contributor Author

There's a bug somewhere whereby the port is not being picked up. I've opened #3468 to address it.

*
* @param downsampler downsampler to use (may be null, which signifies that no downsampling is to be performed)
*/
public void setDownsampler(final ReadsDownsampler downsampler) {
Member

I would get rid of this method and just put it in the constructor. I can't see a reason you can't, but I might be missing it.

Contributor Author

Done
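
For illustration, a rough sketch of the suggested change, shown as a constructor fragment rather than a compilable class; the field and parameter names are placeholders and may not match the merged DownsampleableSparkReadShard. Passing the downsampler at construction time keeps the shard immutable, and null still means no downsampling, as the javadoc above states.

```java
// Hypothetical sketch of constructor injection replacing the setter; names are
// placeholders, not the merged code.
public DownsampleableSparkReadShard(final ShardBoundary boundaries,
                                    final Iterable<GATKRead> reads,
                                    final ReadsDownsampler downsampler) {
    this.boundaries = boundaries;
    this.reads = reads;
    this.downsampler = downsampler; // may be null: no downsampling is performed
}
```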

final JavaRDD<VariantContext> variants = callVariantsWithHaplotypeCaller(getAuthHolder(), ctx, getReads(), getHeaderForReads(), getReference(), intervals, hcArgs, shardingArgs);
// Reads must be coordinate sorted to use the overlaps partitioner
final SAMFileHeader readsHeader = getHeaderForReads().clone();
readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
Member

Why set it and then sort it? It seems like maybe we should check if it's coordinate sorted, and then sort it if it isn't. I'm probably missing some detail about this though.

Contributor Author

Yes, this makes absolutely sure it's sorted - there may be an issue with sharded input not actually being sorted, even though the header says it is. There's a follow-up issue to check and document sorting requirements: #3308.

Member

I think we're going to want an option to override this at some point for performance reasons, but we can do that as a later upgrade.
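
For reference, one possible shape of that later upgrade, reusing only names that appear in the code excerpts in this thread; this is a fragment sketch, not the merged code, which sorts unconditionally as a safety net. The idea is to skip the shuffle when the header already claims coordinate order.

```java
// Hedged sketch: re-sort only when the input does not already claim coordinate
// order. The merged code sorts unconditionally (see #3308 for the follow-up).
final SAMFileHeader readsHeader = getHeaderForReads().clone();
final JavaRDD<GATKRead> coordinateSortedReads;
if (readsHeader.getSortOrder() == SAMFileHeader.SortOrder.coordinate) {
    coordinateSortedReads = getReads();
} else {
    readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
    coordinateSortedReads = SparkUtils.coordinateSortReads(getReads(), readsHeader, numReducers);
}
```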

// Reads must be coordinate sorted to use the overlaps partitioner
final SAMFileHeader readsHeader = getHeaderForReads().clone();
readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
JavaRDD<GATKRead> coordinateSortedReads = SparkUtils.coordinateSortReads(getReads(), readsHeader, numReducers);
Member

nitpick: final

Contributor Author

Done


final List<ShardBoundary> shardBoundaries = getShardBoundaries(header, intervals, shardingArgs.readShardSize, shardingArgs.readShardPadding);

final int maxLocatableSize = reads.map(r -> r.getEnd() - r.getStart() + 1).reduce(Math::max);
Member

This seems like an expensive operation. Is it a safety test for crazy reads that usually don't happen or is it always necessary?

Also, let's name this something more specific, like maxReadLength

Contributor Author

Yes, it's a safety test, as I found that there were sometimes reads that were longer than expected. It's not particularly expensive in the sense that it's a small fraction of the total runtime, even though it's a scan over the full data set.

Member

We may want a mechanism to skip this for performance purposes at some point, but we can wait on that I think.

@@ -148,6 +148,16 @@ public HaplotypeCallerEngine( final HaplotypeCallerArgumentCollection hcArgs, b
initialize(createBamOutIndex, createBamOutMD5);
}

public HaplotypeCallerEngine( final HaplotypeCallerArgumentCollection hcArgs, boolean createBamOutIndex, boolean createBamOutMD5, final SAMFileHeader readsHeader, ReferenceSequenceFile referenceReader,
Member

I think the old constructor should just call through to this one now.

Contributor Author

Done

});
final JavaPairRDD<ShardBoundary, Iterable<GATKRead>> shardsWithReads = paired.groupByKey();
return shardsWithReads.map(shard -> new SparkReadShard(shard._1(), shard._2()));
private static JavaRDD<Shard<GATKRead>> createReadShards(final JavaSparkContext ctx, final List<ShardBoundary> shardBoundaries, final JavaRDD<GATKRead> reads, final SAMFileHeader header, int maxLocatableSize) {
Member

we could just inline this now

Contributor Author

Done

@@ -267,13 +293,24 @@ private void writeVariants(JavaRDD<VariantContext> variants) {
final Broadcast<ReferenceMultiSource> reference,
final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast,
final ShardingArgumentCollection assemblyArgs,
final SAMFileHeader header) {
final SAMFileHeader header,
final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast) {
Member

I can't remember, is the reason we don't pass the engine in as a broadcast here because you can't serialize / broadcast the reference source?

Contributor Author

I tried broadcasting the engine, but it's not thread-safe, and I hit a lot of issues because of this. The reason to broadcast the VariantAnnotatorEngine is that it scans the classpath, which makes it expensive to create anew on each executor - so broadcasting makes sense.

Member

Thanks, that's basically what I figured, but I just wanted to hear the explanation to be sure.
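
To make the division of labour concrete, a minimal fragment sketch mostly reusing names from the excerpts in this review (annotatorEngine here is a stand-in for the engine built on the driver, and the surrounding method plumbing is omitted): the thread-safe but expensive-to-build annotator engine is broadcast once, while each task constructs its own HaplotypeCallerEngine from the broadcast pieces.

```java
// Driver side: build the VariantAnnotatorEngine once (it scans the classpath)
// and broadcast it to the executors.
final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast = ctx.broadcast(annotatorEngine);

// Executor side, inside each task: HaplotypeCallerEngine is not thread-safe,
// so create a fresh instance per task from the broadcast values.
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(
        hcArgsBroadcast.value(), false, false, header, referenceSource,
        annotatorEngineBroadcast.getValue());
```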

return iteratorToStream(shards).flatMap(shardToRegion(assemblyArgs, header, referenceSource, hcEngine)).iterator();
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header, referenceSource, annotatorEngineBroadcast.getValue());

ReadsDownsampler readsDownsampler = assemblyArgs.maxReadsPerAlignmentStart > 0 ?
Member

final

Contributor Author

Done

@@ -21,13 +22,13 @@
public final class BroadcastJoinReadsWithVariants {
private BroadcastJoinReadsWithVariants(){}

public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants ) {
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants, final List<String> variantsPaths ) {
Member

It feels like we should have two different methods here, one that takes JavaRDD<GATKVariant> and one that takes List<String>, instead of having one method that takes one or more of them.

Also, this method needs javadoc (not your fault that it's missing, but would you mind adding it?), particularly if it can take two mutually exclusive inputs.

Contributor Author

Done
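
For illustration, the two-overload shape suggested above might look roughly like this inside a class like BroadcastJoinReadsWithVariants; this is a hedged sketch with the bodies omitted, not necessarily the merged code.

```java
/**
 * Joins each read with the known-sites variants that overlap it, using a
 * broadcast of the variants RDD.
 */
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(
        final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants) {
    throw new UnsupportedOperationException("body omitted in this sketch");
}

/**
 * As above, but the variants are loaded on each executor from the given file
 * paths (e.g. via the known-sites cache) rather than broadcast as an RDD.
 */
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(
        final JavaRDD<GATKRead> reads, final List<String> variantsPaths) {
    throw new UnsupportedOperationException("body omitted in this sketch");
}
```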

@lbergelson
Member

@tomwhite Back to you.

@tomwhite
Contributor Author

Thanks for the review @lbergelson. I've addressed the changes you asked for. Back to you.

@lbergelson
Member

@tomwhite Looks good to me! 👍

@lbergelson lbergelson merged commit 685913b into master Aug 22, 2017
@lbergelson lbergelson deleted the tw_hc_genome_fixes branch August 22, 2017 22:19
@tomwhite
Contributor Author

@lbergelson thanks!
