Run MD+BQSR+HC pipeline on full genome #3106
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

# Download all required data for the small BAM and store it in HDFS.

TARGET_DIR=${1:-small_spark_eval}

hadoop fs -stat $TARGET_DIR > /dev/null 2>&1
if [ $? -eq 0 ]; then
  echo "$TARGET_DIR already exists. Delete it and try again."
  exit 1
fi

set -e
set -x

# Create data directory in HDFS
hadoop fs -mkdir -p $TARGET_DIR

# Download WGS BAM (NA12878, chromosomes 20 and 21 only) and its index
gsutil cp gs://hellbender/test/resources/large/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam - | hadoop fs -put - $TARGET_DIR/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam
gsutil cp gs://hellbender/test/resources/large/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam.bai - | hadoop fs -put - $TARGET_DIR/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam.bai

# Download reference
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.2bit - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.2bit
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.dict - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.dict
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.fasta.fai - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.fasta.fai
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.fasta - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.fasta

# Download known sites VCF
gsutil cp gs://hellbender/test/resources/large/dbsnp_138.b37.20.21.vcf - | hadoop fs -put - $TARGET_DIR/dbsnp_138.b37.20.21.vcf

# List data
hadoop fs -ls -h $TARGET_DIR
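
A typical invocation (the script's file name is not shown in this diff, so the name below is hypothetical) runs on a machine with gsutil and the Hadoop CLI configured, optionally overriding the default target directory:

# Hypothetical script name; the optional argument overrides the small_spark_eval default.
./download_small_data_to_hdfs.sh my_spark_eval
hadoop fs -ls -h my_spark_eval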
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

# Run the pipeline (Mark Duplicates, BQSR, Haplotype Caller) on small data on a Spark cluster.

. utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs:///user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit -O hdfs:///user/$USER/small_spark_eval/out/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.vcf -pairHMM AVX_LOGLESS_CACHING" 8 1 4g 4g
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

# Run the pipeline (Mark Duplicates, BQSR, Haplotype Caller) on small data on a GCS Dataproc cluster. Data is in HDFS.

. utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/out/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.vcf -pairHMM AVX_LOGLESS_CACHING" 8 1 4g 4g
@@ -11,6 +11,7 @@
 import scala.Tuple2;

+import java.util.Collections;
 import java.util.List;

 /**
  * Joins an RDD of GATKReads to variant data using a broadcast strategy.
@@ -21,13 +22,13 @@
 public final class BroadcastJoinReadsWithVariants {
     private BroadcastJoinReadsWithVariants(){}

-    public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants ) {
+    public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants, final List<String> variantsPaths ) {
         final JavaSparkContext ctx = new JavaSparkContext(reads.context());
-        final IntervalsSkipList<GATKVariant> variantSkipList = new IntervalsSkipList<>(variants.collect());
-        final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = ctx.broadcast(variantSkipList);
+        final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = variantsPaths == null ? ctx.broadcast(new IntervalsSkipList<>(variants.collect())) : null;

         return reads.mapToPair(r -> {
-            final IntervalsSkipList<GATKVariant> intervalsSkipList = variantsBroadcast.getValue();
+            final IntervalsSkipList<GATKVariant> intervalsSkipList = variantsPaths == null ? variantsBroadcast.getValue() :
+                    KnownSitesCache.getVariants(variantsPaths);
             if (SimpleInterval.isValid(r.getContig(), r.getStart(), r.getEnd())) {
                 return new Tuple2<>(r, intervalsSkipList.getOverlapping(new SimpleInterval(r)));
             } else {

Review comment (on the new join signature): It feels like we should have two different methods here, one that takes variants and one that takes variantsPaths. Also, this method needs javadoc (not your fault that it's missing, but would you mind adding it?), particularly if it can take two mutually exclusive inputs.

Reply: Done
@@ -0,0 +1,58 @@
package org.broadinstitute.hellbender.tools;

import java.io.Serializable;
import java.util.Iterator;
import org.broadinstitute.hellbender.engine.Shard;
import org.broadinstitute.hellbender.engine.ShardBoundary;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.downsampling.ReadsDownsampler;
import org.broadinstitute.hellbender.utils.downsampling.ReadsDownsamplingIterator;
import org.broadinstitute.hellbender.utils.read.GATKRead;

/**
 * A simple shard implementation intended to be used for splitting reads by partition in Spark tools
 */
public final class DownsampleableSparkReadShard implements Shard<GATKRead>, Serializable {
    private static final long serialVersionUID = 1L;

    private final ShardBoundary boundaries;
    private final Iterable<GATKRead> reads;
    private ReadsDownsampler downsampler;

    public DownsampleableSparkReadShard(final ShardBoundary boundaries, final Iterable<GATKRead> reads){
        this.boundaries = Utils.nonNull(boundaries);
        this.reads = Utils.nonNull(reads);
    }

    /**
     * Reads in this shard will be downsampled using this downsampler before being returned.
     *
     * @param downsampler downsampler to use (may be null, which signifies that no downsampling is to be performed)
     */
    public void setDownsampler(final ReadsDownsampler downsampler) {
        this.downsampler = downsampler;
    }

    @Override
    public SimpleInterval getInterval() {
        return boundaries.getInterval();
    }

    @Override
    public SimpleInterval getPaddedInterval() {
        return boundaries.getPaddedInterval();
    }

    @Override
    public Iterator<GATKRead> iterator() {
        Iterator<GATKRead> readsIterator = reads.iterator();

        if ( downsampler != null ) {
            readsIterator = new ReadsDownsamplingIterator(readsIterator, downsampler);
        }

        return readsIterator;
    }
}

Review comment (on setDownsampler): I would get rid of this method and just put it in the constructor. I can't see a reason you can't, but I might be missing it.

Reply: Done
Review comment (on the Dataproc script): I don't understand why we need ${GCS_CLUSTER}-m in some of the paths but not all of them.

Reply: There's a bug somewhere, whereby the port is not being picked up. I've opened #3468 to address it.
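
Background for this thread: an hdfs:///path URI (no host) resolves against the cluster's configured default filesystem (fs.defaultFS), while hdfs://host:8020/path names the namenode and port explicitly. Both forms should reach the same files when fs.defaultFS points at the cluster's namenode, which can be checked with standard Hadoop commands (the cluster name below is hypothetical):

# Print the configured default filesystem, e.g. hdfs://my-dataproc-cluster-m:8020
hdfs getconf -confKey fs.defaultFS
# These two listings should match when fs.defaultFS points at the cluster's namenode.
hadoop fs -ls hdfs:///user/$USER/small_spark_eval
hadoop fs -ls hdfs://my-dataproc-cluster-m:8020/user/$USER/small_spark_eval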