From 3185e4c3fc2558623e80ce8f2379548a31f5bfdb Mon Sep 17 00:00:00 2001
From: Ryan Williams
Date: Thu, 24 Sep 2015 22:09:45 +0000
Subject: [PATCH] add helper for loading paired fastq files

---
 .../org/bdgenomics/adam/cli/Transform.scala      |  9 ++--
 .../converters/FastqRecordConverter.scala        | 10 ++--
 .../org/bdgenomics/adam/rdd/ADAMContext.scala    | 51 ++++++++++++++++---
 .../read/AlignmentRecordRDDFunctions.scala       |  1 -
 4 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
index b039bb3b94..be34b523ae 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -84,6 +84,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
   var forceLoadParquet: Boolean = false
   @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
   var asSingleFile: Boolean = false
+  @Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
+  var pairedFastqFile: String = null
 }
 
 class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
@@ -110,8 +112,9 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
         .fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])(
           new ConsensusGeneratorFromKnowns(_, sc).asInstanceOf[ConsensusGenerator])
 
-      adamRecords = adamRecords.adamRealignIndels(consensusGenerator,
-        false,
+      adamRecords = adamRecords.adamRealignIndels(
+        consensusGenerator,
+        isSorted = false,
         args.maxIndelSize,
         args.maxConsensusNumber,
         args.lodThreshold,
@@ -150,7 +153,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
     if (args.forceLoadBam) {
       sc.loadBam(args.inputPath)
     } else if (args.forceLoadFastq) {
-      sc.loadUnpairedFastq(args.inputPath)
+      sc.loadFastq(args.inputPath, Option(args.pairedFastqFile))
     } else if (args.forceLoadIFastq) {
       sc.loadInterleavedFastq(args.inputPath)
    } else if (args.forceLoadParquet) {
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastqRecordConverter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastqRecordConverter.scala
index 1860207893..6479fd74e7 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastqRecordConverter.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastqRecordConverter.scala
@@ -74,7 +74,9 @@ class FastqRecordConverter extends Serializable with Logging {
       .build())
   }
 
-  def convertRead(element: (Void, Text)): AlignmentRecord = {
+  def convertRead(element: (Void, Text),
+                  setFirstOfPair: Boolean = false,
+                  setSecondOfPair: Boolean = false): AlignmentRecord = {
     val lines = element._2.toString.split('\n')
     assert(lines.length == 4, "Record has wrong format:\n" + element._2.toString)
 
@@ -90,10 +92,10 @@
       .setReadName(readName)
       .setSequence(readSequence)
       .setQual(readQualities)
-      .setReadPaired(false)
+      .setReadPaired(setFirstOfPair || setSecondOfPair)
       .setProperPair(null)
-      .setFirstOfPair(null)
-      .setSecondOfPair(null)
+      .setFirstOfPair(setFirstOfPair)
+      .setSecondOfPair(setSecondOfPair)
       .setReadNegativeStrand(null)
       .setMateNegativeStrand(null)
       .setPrimaryAlignment(null)
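Usage sketch (illustrative only, not part of the diff): the two new convertRead flags drive the mate-pair fields on the emitted AlignmentRecord. Assuming a FastqRecordConverter instance and a hypothetical (Void, Text) tuple named record whose Text value holds the four lines of one FASTQ entry:

    val converter = new FastqRecordConverter
    // record: (Void, Text) carrying one four-line FASTQ entry (hypothetical)
    val read1 = converter.convertRead(record, setFirstOfPair = true)
    // readPaired is set whenever either flag is set:
    // read1.getReadPaired == true, read1.getFirstOfPair == true, read1.getSecondOfPair == false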
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 346cab34a9..26218b2959 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -19,7 +19,7 @@ package org.bdgenomics.adam.rdd
 
 import java.io.FileNotFoundException
 import java.util.regex.Pattern
-import htsjdk.samtools.SAMFileHeader
+import htsjdk.samtools.{ ValidationStringency, SAMFileHeader, IndexedBamInputFormat }
 import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
 import org.apache.avro.specific.SpecificRecord
@@ -50,7 +50,6 @@ import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.parquet.hadoop.util.ContextUtil
 import scala.collection.JavaConversions._
 import scala.collection.Map
-import htsjdk.samtools.IndexedBamInputFormat
 
 object ADAMContext {
   // Add ADAM Spark context methods
@@ -364,8 +363,46 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
     records.flatMap(fastqRecordConverter.convertPair)
   }
 
-  def loadUnpairedFastq(
-    filePath: String): RDD[AlignmentRecord] = {
+  def loadFastq(filePath1: String,
+                filePath2Opt: Option[String],
+                stringency: ValidationStringency = ValidationStringency.STRICT): RDD[AlignmentRecord] = {
+    filePath2Opt match {
+      case Some(filePath2) => loadPairedFastq(filePath1, filePath2, stringency)
+      case None            => loadUnpairedFastq(filePath1)
+    }
+  }
+
+  def loadPairedFastq(filePath1: String,
+                      filePath2: String,
+                      stringency: ValidationStringency): RDD[AlignmentRecord] = {
+    val reads1 = loadUnpairedFastq(filePath1, setFirstOfPair = true)
+    val reads2 = loadUnpairedFastq(filePath2, setSecondOfPair = true)
+
+    def zipped = reads1.zip(reads2)
+
+    stringency match {
+      case ValidationStringency.STRICT | ValidationStringency.LENIENT =>
+        val count1 = reads1.cache.count
+        val count2 = reads2.cache.count
+
+        if (count1 != count2) {
+          val msg = s"Fastq 1 ($filePath1) has $count1 reads, fastq 2 ($filePath2) has $count2 reads"
+          if (stringency == ValidationStringency.STRICT)
+            throw new IllegalArgumentException(msg)
+          else {
+            // ValidationStringency.LENIENT
+            logError(msg)
+          }
+        }
+      case ValidationStringency.SILENT =>
+    }
+
+    reads1 ++ reads2
+  }
+
+  def loadUnpairedFastq(filePath: String,
+                        setFirstOfPair: Boolean = false,
+                        setSecondOfPair: Boolean = false): RDD[AlignmentRecord] = {
     val job = HadoopUtil.newJob(sc)
 
     val records = sc.newAPIHadoopFile(
@@ -379,7 +416,7 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
 
     // convert records
     val fastqRecordConverter = new FastqRecordConverter
-    records.map(fastqRecordConverter.convertRead)
+    records.map(fastqRecordConverter.convertRead(_, setFirstOfPair, setSecondOfPair))
   }
 
   def loadVcf(filePath: String, sd: Option[SequenceDictionary]): RDD[VariantContext] = {
@@ -455,8 +492,8 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
           .setContig(
             Contig.newBuilder()
               .setContigName(seqRecord.name)
-              .setReferenceURL(seqRecord.url.getOrElse(null))
-              .setContigMD5(seqRecord.md5.getOrElse(null))
+              .setReferenceURL(seqRecord.url.orNull)
+              .setContigMD5(seqRecord.md5.orNull)
               .setContigLength(seqRecord.length)
               .build()
           )
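Usage sketch (illustrative only, not part of the diff): how the three loaders compose. The file paths are hypothetical, and sc is assumed to be a SparkContext with the ADAMContext implicits in scope:

    import htsjdk.samtools.ValidationStringency
    import org.bdgenomics.adam.rdd.ADAMContext._

    // Paired load: reads from the first file are flagged first-of-pair,
    // reads from the second file second-of-pair.
    val paired = sc.loadFastq("sample_1.fq", Some("sample_2.fq"))

    // LENIENT downgrades a read-count mismatch from an
    // IllegalArgumentException to a logged error.
    val lenient = sc.loadFastq("sample_1.fq", Some("sample_2.fq"), ValidationStringency.LENIENT)

    // Unpaired load: the pairing fields are left unset.
    val single = sc.loadFastq("sample.fq", None)

Note that under STRICT and LENIENT the paired loader counts both RDDs to validate them, caching each first; SILENT skips the counts entirely.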
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala
index 5259680fbb..6423636e60 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala
@@ -18,7 +18,6 @@
 package org.bdgenomics.adam.rdd.read
 
 import java.io.StringWriter
-
 import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency }
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{ FileSystem, Path }