Run MD+BQSR+HC pipeline on full genome #3106

Merged
merged 4 commits into from
Aug 22, 2017

Changes from 3 commits

4 changes: 2 additions & 2 deletions scripts/spark_eval/exome_pipeline_gcs_hdfs.sh
@@ -6,5 +6,5 @@

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/exome_spark_eval/NA12878.ga2.exome.maq.raw.bam -O hdfs:///user/$USER/exome_spark_eval/out/markdups-sharded --shardedOutput true" 48 1 4g 4g
# Note: the host/port needs to be included on the following line. Need to investigate how to remove this requirement.
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/exome_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/exome_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/exome_spark_eval/Homo_sapiens_assembly18.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/exome_spark_eval/dbsnp_138.hg18.vcf --joinStrategy OVERLAPS_PARTITIONER" 4 8 32g 4g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/exome_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/exome_spark_eval/Homo_sapiens_assembly18.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/exome_spark_eval/out/NA12878.ga2.exome.maq.raw.vcf -pairHMM LOGLESS_CACHING" 48 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/exome_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/exome_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/exome_spark_eval/Homo_sapiens_assembly18.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/exome_spark_eval/dbsnp_138.hg18.vcf" 4 8 32g 4g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/exome_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/exome_spark_eval/Homo_sapiens_assembly18.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/exome_spark_eval/out/NA12878.ga2.exome.maq.raw.vcf -pairHMM AVX_LOGLESS_CACHING -maxReadsPerAlignmentStart 10" 32 1 6g 4g
2 changes: 1 addition & 1 deletion scripts/spark_eval/genome_hc_gcs_hdfs.sh
@@ -4,4 +4,4 @@

. utils.sh

time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/q4_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/q4_spark_eval/human_g1k_v37.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/q4_spark_eval/out/WGS-G94982-NA12878.vcf -pairHMM LOGLESS_CACHING -maxReadsPerAlignmentStart 10" 30 1 12g 8g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/q4_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/q4_spark_eval/human_g1k_v37.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/q4_spark_eval/out/WGS-G94982-NA12878.vcf -pairHMM AVX_LOGLESS_CACHING -maxReadsPerAlignmentStart 10" 30 1 12g 8g
33 changes: 33 additions & 0 deletions scripts/spark_eval/prep_data_small_gcs.sh
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

# Download all required data for the small BAM and store it in HDFS.

TARGET_DIR=${1:-small_spark_eval}

hadoop fs -stat $TARGET_DIR > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "$TARGET_DIR already exists. Delete it and try again."
exit 1
fi

set -e
set -x

# Create data directory in HDFS
hadoop fs -mkdir -p $TARGET_DIR

# Download WGS BAM (chromosomes 20 and 21)
gsutil cp gs://hellbender/test/resources/large/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam - | hadoop fs -put - $TARGET_DIR/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam
gsutil cp gs://hellbender/test/resources/large/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam.bai - | hadoop fs -put - $TARGET_DIR/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam.bai

# Download reference
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.2bit - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.2bit
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.dict - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.dict
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.fasta.fai - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.fasta.fai
gsutil cp gs://hellbender/test/resources/large/human_g1k_v37.20.21.fasta - | hadoop fs -put - $TARGET_DIR/human_g1k_v37.20.21.fasta

# Download known sites VCF
gsutil cp gs://hellbender/test/resources/large/dbsnp_138.b37.20.21.vcf - | hadoop fs -put - $TARGET_DIR/dbsnp_138.b37.20.21.vcf

# List data
hadoop fs -ls -h $TARGET_DIR
9 changes: 9 additions & 0 deletions scripts/spark_eval/small_pipeline.sh
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

# Run the pipeline (Mark Duplicates, BQSR, Haplotype Caller) on small data on a Spark cluster.

. utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs:///user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit -O hdfs:///user/$USER/small_spark_eval/out/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.vcf -pairHMM AVX_LOGLESS_CACHING" 8 1 4g 4g
9 changes: 9 additions & 0 deletions scripts/spark_eval/small_pipeline_gcs_hdfs.sh
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

# Run the pipeline (Mark Duplicates, BQSR, Haplotype Caller) on small data on a GCS Dataproc cluster. Data is in HDFS.

. utils.sh

time_gatk "MarkDuplicatesSpark -I hdfs:///user/$USER/small_spark_eval/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.bam -O hdfs:///user/$USER/small_spark_eval/out/markdups-sharded --shardedOutput true" 8 1 4g 4g
time_gatk "BQSRPipelineSpark -I hdfs:///user/$USER/small_spark_eval/out/markdups-sharded -O hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded --shardedOutput true -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit --knownSites hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/dbsnp_138.b37.20.21.vcf -L 20:10000000-10100000" 1 8 32g 4g
Member

I don't understand why we need ${GCS_CLUSTER}-m in some of the paths but not all of them.

Contributor Author

There's a bug somewhere, whereby the port is not being picked up. I've opened #3468 to address it.

time_gatk "HaplotypeCallerSpark -I hdfs:///user/$USER/small_spark_eval/out/bqsr-sharded -R hdfs:///user/$USER/small_spark_eval/human_g1k_v37.20.21.2bit -O hdfs://${GCS_CLUSTER}-m:8020/user/$USER/small_spark_eval/out/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.vcf -pairHMM AVX_LOGLESS_CACHING" 8 1 4g 4g
@@ -65,7 +65,7 @@ public static JavaPairRDD<GATKRead, ReadContextData> add(
JavaPairRDD<GATKRead, Tuple2<Iterable<GATKVariant>, ReferenceBases>> withVariantsWithRef;
if (joinStrategy.equals(JoinStrategy.BROADCAST)) {
// Join Reads and Variants
JavaPairRDD<GATKRead, Iterable<GATKVariant>> withVariants = BroadcastJoinReadsWithVariants.join(mappedReads, variants);
JavaPairRDD<GATKRead, Iterable<GATKVariant>> withVariants = BroadcastJoinReadsWithVariants.join(mappedReads, variants, variantsPaths);
// Join Reads with ReferenceBases
withVariantsWithRef = BroadcastJoinReadsWithRefBases.addBases(referenceSource, withVariants);
} else if (joinStrategy.equals(JoinStrategy.SHUFFLE)) {
@@ -11,6 +11,7 @@
import scala.Tuple2;

import java.util.Collections;
import java.util.List;

/**
* Joins an RDD of GATKReads to variant data using a broadcast strategy.
@@ -21,13 +22,13 @@
public final class BroadcastJoinReadsWithVariants {
private BroadcastJoinReadsWithVariants(){}

public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants ) {
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants, final List<String> variantsPaths ) {
Member

It feels like we should have two different methods here, one that takes JavaRDD<GATKVariant> and one that takes List<String>, instead of having one method that takes one or more of them.

Also, this method needs javadoc (not your fault that it's missing, but would you mind adding it?), particularly if it can take two mutually exclusive inputs.

Contributor Author

Done
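
As a rough illustration of the split suggested above, the two overloads might look like the sketch below. This is only a sketch of the reviewer's suggestion, not necessarily the code added in the follow-up commit; the overlapping helper is a hypothetical name introduced for the example, and it assumes the same imports as the surrounding file.

/** Joins each read to the overlapping variants from an in-memory RDD (broadcast join). */
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(
        final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants) {
    final JavaSparkContext ctx = new JavaSparkContext(reads.context());
    // Collect the variants once and broadcast the interval index to all executors.
    final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast =
            ctx.broadcast(new IntervalsSkipList<>(variants.collect()));
    return reads.mapToPair(r -> new Tuple2<>(r, overlapping(variantsBroadcast.getValue(), r)));
}

/** Joins each read to the overlapping variants loaded on each executor from known-sites paths. */
public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(
        final JavaRDD<GATKRead> reads, final List<String> variantsPaths) {
    // KnownSitesCache loads and caches the variants per JVM, so no broadcast is needed here
    // (assumes variantsPaths is a serializable List implementation such as ArrayList).
    return reads.mapToPair(r -> new Tuple2<>(r, overlapping(KnownSitesCache.getVariants(variantsPaths), r)));
}

// Shared lookup: overlapping variants for mapped reads, an empty result for unmapped reads.
private static Iterable<GATKVariant> overlapping(final IntervalsSkipList<GATKVariant> variants, final GATKRead r) {
    return SimpleInterval.isValid(r.getContig(), r.getStart(), r.getEnd())
            ? variants.getOverlapping(new SimpleInterval(r))
            : Collections.emptyList();
}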

final JavaSparkContext ctx = new JavaSparkContext(reads.context());
final IntervalsSkipList<GATKVariant> variantSkipList = new IntervalsSkipList<>(variants.collect());
final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = ctx.broadcast(variantSkipList);
final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = variantsPaths == null ? ctx.broadcast(new IntervalsSkipList<>(variants.collect())) : null;

return reads.mapToPair(r -> {
final IntervalsSkipList<GATKVariant> intervalsSkipList = variantsBroadcast.getValue();
final IntervalsSkipList<GATKVariant> intervalsSkipList = variantsPaths == null ? variantsBroadcast.getValue() :
KnownSitesCache.getVariants(variantsPaths);
if (SimpleInterval.isValid(r.getContig(), r.getStart(), r.getEnd())) {
return new Tuple2<>(r, intervalsSkipList.getOverlapping(new SimpleInterval(r)));
} else {
@@ -0,0 +1,58 @@
package org.broadinstitute.hellbender.tools;


import java.io.Serializable;
import java.util.Iterator;
import org.broadinstitute.hellbender.engine.Shard;
import org.broadinstitute.hellbender.engine.ShardBoundary;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.downsampling.ReadsDownsampler;
import org.broadinstitute.hellbender.utils.downsampling.ReadsDownsamplingIterator;
import org.broadinstitute.hellbender.utils.read.GATKRead;

/**
* A simple shard implementation intended to be used for splitting reads by partition in Spark tools
*/
public final class DownsampleableSparkReadShard implements Shard<GATKRead>, Serializable {
private static final long serialVersionUID = 1L;

private final ShardBoundary boundaries;
private final Iterable<GATKRead> reads;
private ReadsDownsampler downsampler;

public DownsampleableSparkReadShard(final ShardBoundary boundaries, final Iterable<GATKRead> reads){
this.boundaries = Utils.nonNull(boundaries);
this.reads = Utils.nonNull(reads);
}

/**
* Reads in this shard will be downsampled using this downsampler before being returned.
*
* @param downsampler downsampler to use (may be null, which signifies that no downsampling is to be performed)
*/
public void setDownsampler(final ReadsDownsampler downsampler) {
Member

I would get rid of this method and just put it in the constructor. I can't see a reason you can't, but I might be missing it.

Contributor Author

Done
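
For illustration, folding the downsampler into the constructor as suggested above might look like the sketch below; this is an assumption about the shape of the change, not necessarily what the follow-up commit contains. With the setter gone, the downsampler field could also be declared final.

// Sketch: the downsampler is supplied at construction time instead of via a setter.
public DownsampleableSparkReadShard(final ShardBoundary boundaries,
                                    final Iterable<GATKRead> reads,
                                    final ReadsDownsampler downsampler) {
    this.boundaries = Utils.nonNull(boundaries);
    this.reads = Utils.nonNull(reads);
    this.downsampler = downsampler; // may be null, meaning no downsampling is performed
}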

this.downsampler = downsampler;
}

@Override
public SimpleInterval getInterval() {
return boundaries.getInterval();
}

@Override
public SimpleInterval getPaddedInterval() {
return boundaries.getPaddedInterval();
}

@Override
public Iterator<GATKRead> iterator() {
Iterator<GATKRead> readsIterator = reads.iterator();

if ( downsampler != null ) {
readsIterator = new ReadsDownsamplingIterator(readsIterator, downsampler);
}

return readsIterator;
}
}