
add helper for loading paired fastq files
ryan-williams committed Sep 25, 2015
1 parent 0a0bdf1 commit 3185e4c
Showing 4 changed files with 56 additions and 15 deletions.
@@ -84,6 +84,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var forceLoadParquet: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null
}

class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
@@ -110,8 +112,9 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
.fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])(
new ConsensusGeneratorFromKnowns(_, sc).asInstanceOf[ConsensusGenerator])

- adamRecords = adamRecords.adamRealignIndels(consensusGenerator,
-   false,
+ adamRecords = adamRecords.adamRealignIndels(
+   consensusGenerator,
+   isSorted = false,
args.maxIndelSize,
args.maxConsensusNumber,
args.lodThreshold,
@@ -150,7 +153,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
- sc.loadUnpairedFastq(args.inputPath)
+ sc.loadFastq(args.inputPath, Option(args.pairedFastqFile))
} else if (args.forceLoadIFastq) {
sc.loadInterleavedFastq(args.inputPath)
} else if (args.forceLoadParquet) {
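
The Transform change above hands the optional mate file straight to the new loader. A minimal sketch (illustrative names, not the real Args4j field) of the Option(null) behaviour it relies on: omitting -paired_fastq leaves the field null, Option(null) collapses to None, and the unpaired path is still taken.

// Sketch only: stand-in for args.pairedFastqFile, which Args4j leaves null when -paired_fastq is omitted.
def describeMode(pairedFastqFile: String): String =
  Option(pairedFastqFile) match {
    case Some(path) => s"paired FASTQ mode, mate file: $path"
    case None       => "unpaired FASTQ mode"
  }

describeMode(null)          // "unpaired FASTQ mode"
describeMode("reads_2.fq")  // "paired FASTQ mode, mate file: reads_2.fq"
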
@@ -74,7 +74,9 @@ class FastqRecordConverter extends Serializable with Logging {
.build())
}

- def convertRead(element: (Void, Text)): AlignmentRecord = {
+ def convertRead(element: (Void, Text),
+                 setFirstOfPair: Boolean = false,
+                 setSecondOfPair: Boolean = false): AlignmentRecord = {
val lines = element._2.toString.split('\n')
assert(lines.length == 4, "Record has wrong format:\n" + element._2.toString)

@@ -90,10 +92,10 @@
.setReadName(readName)
.setSequence(readSequence)
.setQual(readQualities)
- .setReadPaired(false)
+ .setReadPaired(setFirstOfPair || setSecondOfPair)
.setProperPair(null)
- .setFirstOfPair(null)
- .setSecondOfPair(null)
+ .setFirstOfPair(setFirstOfPair)
+ .setSecondOfPair(setSecondOfPair)
.setReadNegativeStrand(null)
.setMateNegativeStrand(null)
.setPrimaryAlignment(null)
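
The converter change is what lets a paired load tag each mate: if either flag is set, the record is marked as paired and the matching first/second flag is set. A rough usage sketch of the widened signature (the FASTQ text and the converter's package path are assumptions, not taken from this diff):

import org.apache.hadoop.io.Text
import org.bdgenomics.adam.converters.FastqRecordConverter

val converter = new FastqRecordConverter
// A single four-line FASTQ record, packed into the (Void, Text) shape the converter expects.
val record: (Void, Text) = (null, new Text("@read1/1\nACGTACGT\n+\nFFFFFFFF"))

val first = converter.convertRead(record, setFirstOfPair = true)
// first.getReadPaired == true, first.getFirstOfPair == true, first.getSecondOfPair == false
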
51 changes: 44 additions & 7 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -19,7 +19,7 @@ package org.bdgenomics.adam.rdd

import java.io.FileNotFoundException
import java.util.regex.Pattern
- import htsjdk.samtools.SAMFileHeader
+ import htsjdk.samtools.{ ValidationStringency, SAMFileHeader, IndexedBamInputFormat }
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.avro.specific.SpecificRecord
@@ -50,7 +50,6 @@ import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.util.ContextUtil
import scala.collection.JavaConversions._
import scala.collection.Map
- import htsjdk.samtools.IndexedBamInputFormat

object ADAMContext {
// Add ADAM Spark context methods
@@ -364,8 +363,46 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
records.flatMap(fastqRecordConverter.convertPair)
}

- def loadUnpairedFastq(
-   filePath: String): RDD[AlignmentRecord] = {
+ def loadFastq(filePath1: String,
+               filePath2Opt: Option[String],
+               stringency: ValidationStringency = ValidationStringency.STRICT): RDD[AlignmentRecord] = {
+   filePath2Opt match {
+     case Some(filePath2) => loadPairedFastq(filePath1, filePath2, stringency)
+     case None            => loadUnpairedFastq(filePath1)
+   }
+ }
+
+ def loadPairedFastq(filePath1: String,
+                     filePath2: String,
+                     stringency: ValidationStringency): RDD[AlignmentRecord] = {
+   val reads1 = loadUnpairedFastq(filePath1, setFirstOfPair = true)
+   val reads2 = loadUnpairedFastq(filePath2, setSecondOfPair = true)
+
+   def zipped = reads1.zip(reads2)
+
+   stringency match {
+     case ValidationStringency.STRICT | ValidationStringency.LENIENT =>
+       val count1 = reads1.cache.count
+       val count2 = reads2.cache.count
+
+       if (count1 != count2) {
+         val msg = s"Fastq 1 ($filePath1) has $count1 reads, fastq 2 ($filePath2) has $count2 reads"
+         if (stringency == ValidationStringency.STRICT)
+           throw new IllegalArgumentException(msg)
+         else {
+           // ValidationStringency.LENIENT
+           logError(msg)
+         }
+       }
+     case ValidationStringency.SILENT =>
+   }
+
+   reads1 ++ reads2
+ }
+
+ def loadUnpairedFastq(filePath: String,
+                       setFirstOfPair: Boolean = false,
+                       setSecondOfPair: Boolean = false): RDD[AlignmentRecord] = {

val job = HadoopUtil.newJob(sc)
val records = sc.newAPIHadoopFile(
@@ -379,7 +416,7 @@

// convert records
val fastqRecordConverter = new FastqRecordConverter
- records.map(fastqRecordConverter.convertRead)
+ records.map(fastqRecordConverter.convertRead(_, setFirstOfPair, setSecondOfPair))
}

def loadVcf(filePath: String, sd: Option[SequenceDictionary]): RDD[VariantContext] = {
@@ -455,8 +492,8 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
.setContig(
Contig.newBuilder()
.setContigName(seqRecord.name)
- .setReferenceURL(seqRecord.url.getOrElse(null))
- .setContigMD5(seqRecord.md5.getOrElse(null))
+ .setReferenceURL(seqRecord.url.orNull)
+ .setContigMD5(seqRecord.md5.orNull)
.setContigLength(seqRecord.length)
.build()
)
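
Taken together, loadFastq dispatches on whether a second path was supplied, loadPairedFastq loads both files with the first/second flags set and cross-checks their read counts, and loadUnpairedFastq keeps its old behaviour when both flags default to false. A rough usage sketch, assuming an existing SparkContext named sc with the ADAMContext implicits in scope and placeholder file paths:

import htsjdk.samtools.ValidationStringency
import org.bdgenomics.adam.rdd.ADAMContext._

// Unpaired: the second path is None, so the original single-file path is used.
val unpairedReads = sc.loadFastq("reads.fq", None)

// Paired: reads_1.fq records are flagged firstOfPair, reads_2.fq records secondOfPair.
// STRICT (the default) throws if the files have different read counts; LENIENT only logs the mismatch.
val pairedReads = sc.loadFastq("reads_1.fq", Some("reads_2.fq"), ValidationStringency.LENIENT)
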
@@ -18,7 +18,6 @@
package org.bdgenomics.adam.rdd.read

import java.io.StringWriter
-
import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
