Add tests for MergeShards CLI.
* For use with CRAM, artificial.fa relies on having its FASTA index at a
  known location. This is hard to do with the `copyResource` function, so I just
  added a symlink (see the sketch below).
* Moved `org.bdgenomics.adam.rdd.FileMergerSuite` to its correct location in
  the source tree.
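For context, a minimal sketch of what that symlink buys us: CRAM decoding resolves the `.fai` index as a sibling of the reference FASTA, so `artificial.fa.fai` has to sit next to `artificial.fa` at a predictable path. The check below is illustrative only; the path and the standalone object are assumptions, not part of this commit.

```scala
import java.nio.file.{ Files, Paths }

object FaiSiblingCheck extends App {
  // Illustrative only: the CRAM reference source expects "<reference>.fai" to live
  // beside the FASTA it indexes; the committed symlink guarantees that.
  // The repository-relative path below is an assumption for the sketch.
  val fasta = Paths.get("adam-cli/src/test/resources/artificial.fa")
  val fai = fasta.resolveSibling(fasta.getFileName.toString + ".fai")
  assert(Files.exists(fai), s"expected FASTA index at $fai")
}
```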
fnothaft committed Sep 15, 2016
1 parent 2ef1347 commit ca4d964
Showing 9 changed files with 112 additions and 25 deletions.
15 changes: 11 additions & 4 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/MergeShards.scala
@@ -32,14 +32,20 @@ class MergeShardsArgs extends Args4jBase {
var inputPath: String = null
@Argument(required = true, metaVar = "OUTPUT", usage = "The location to write the merged file", index = 1)
var outputPath: String = null
@Args4jOption(required = false, name = "-headerPath", usage = "Optional path to a header")
@Args4jOption(required = false, name = "-header_path", usage = "Optional path to a header")
var headerPath: String = null
@Args4jOption(required = false,
name = "-bufferSize",
usage = "Buffer size for merging single file output. If provided, overrides configured buffer size.")
name = "-buffer_size",
usage = "Buffer size for merging single file output. If provided, overrides configured buffer size (default of 4MB).")
var bufferSize: Int = _
@Args4jOption(required = false, name = "-writeEmptyGZIPAtEof", usage = "If provided, writes an empty GZIP block at EOF")
@Args4jOption(required = false,
name = "-write_empty_GZIP_at_eof",
usage = "If provided, writes an empty GZIP block at EOF")
var gzipAtEof: Boolean = false
@Args4jOption(required = false,
name = "-write_cram_eof",
usage = "If provided, writes the CRAM EOF signifier")
var cramEof: Boolean = false
}

object MergeShards extends BDGCommandCompanion {
@@ -77,6 +83,7 @@ class MergeShards(val args: MergeShardsArgs) extends BDGSparkCommand[MergeShards
fsIn, fsOut,
outputPath, tailPath, optHeadPath,
writeEmptyGzipBlock = args.gzipAtEof,
writeCramEOF = args.cramEof,
optBufferSize = Option(args.bufferSize).filter(_ > 0))
}
}
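As a quick illustration of how the new flags are exercised (mirroring the MergeShardsSuite added below; the paths here are placeholders and `sc` is an existing SparkContext), the command can be driven programmatically:

```scala
import org.bdgenomics.adam.cli.MergeShards

// Placeholder paths; mirrors MergeShardsSuite below. "-write_cram_eof" appends the
// CRAM EOF signifier to the merged file, in place of the empty BGZF block that
// "-write_empty_GZIP_at_eof" writes for BAM output.
MergeShards(Array(
  "/tmp/out.cram_tail", // sharded body written by Transform with -defer_merging
  "/tmp/out.cram",      // merged single-file output
  "-header_path", "/tmp/out.cram_head",
  "-write_cram_eof")).run(sc)
```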
@@ -483,7 +483,8 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
mergedSd
}

outputRdd.save(args)
outputRdd.save(args,
isSorted = (args.sortReads || args.sortLexicographically))
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time {
1 change: 1 addition & 0 deletions adam-cli/src/test/resources/artificial.fa
1 change: 1 addition & 0 deletions adam-cli/src/test/resources/artificial.fa.fai
@@ -0,0 +1,79 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.seqdoop.hadoop_bam.CRAMInputFormat
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite

class MergeShardsSuite extends ADAMFunSuite {
sparkTest("merge shards from unordered sam to unordered sam") {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
Transform(Array("-single", "-defer_merging", inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam to ordered sam") {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
Transform(Array("-single",
"-sort_reads",
"-sort_lexicographically",
"-defer_merging",
inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("merge sharded bam") {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.bam")
Transform(Array("-single",
"-defer_merging",
inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head",
"-write_empty_GZIP_at_eof")).run(sc)
assert(sc.loadAlignments(actualPath).rdd.count === 8L)
}

sparkTest("merge sharded cram") {
val inputPath = copyResource("artificial.cram")
val referencePath = resourceUrl("artificial.fa").toString
sc.hadoopConfiguration.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY,
referencePath)
println(referencePath)

val actualPath = tmpFile("artificial.cram")
Transform(Array("-single",
"-sort_reads",
"-sort_lexicographically",
"-defer_merging",
inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head",
"-write_cram_eof")).run(sc)
assert(sc.loadAlignments(actualPath).rdd.count === 10L)
}
}
@@ -50,7 +50,8 @@ object FileMerger extends Logging {
* @param optBufferSize The size in bytes of the buffer used for copying. If
* not set, we check the config for this value. If that is not set, we
* default to 4MB.
*
*
* @see mergeFilesAcrossFilesystems
*/
private[adam] def mergeFiles(conf: Configuration,
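The fallback order described in the doc comment above (explicit argument, then Hadoop configuration, then a 4 MB default) can be sketched roughly as follows; the configuration key name is a placeholder, not necessarily the one ADAM uses:

```scala
import org.apache.hadoop.conf.Configuration

// Rough sketch of the documented fallback chain; "adam.file_merger.buffer_size"
// is an assumed key name, used here for illustration only.
def resolveBufferSize(conf: Configuration, optBufferSize: Option[Int]): Int = {
  val defaultBufferSize = 4 * 1024 * 1024 // 4 MB
  optBufferSize.getOrElse(conf.getInt("adam.file_merger.buffer_size", defaultBufferSize))
}
```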
@@ -153,16 +153,7 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
saveAsSam(
args.outputPath,
isSorted = isSorted,
asSingleFile = args.asSingleFile,
deferMerging = args.deferMerging
)
true
} else if (args.outputPath.endsWith(".bam")) {
log.info("Saving data in BAM format")
saveAsSam(
args.outputPath,
asSam = false,
asSingleFile = args.asSingleFile,
isSorted = isSorted,
deferMerging = args.deferMerging
)
true
@@ -25,7 +25,8 @@ class FileMergerSuite extends ADAMFunSuite {
sparkTest("cannot write both empty gzip block and cram eof") {
intercept[IllegalArgumentException] {
// we don't need to pass real paths here
FileMerger.mergeFiles(FileSystem.getLocal(sc.hadoopConfiguration),
FileMerger.mergeFiles(sc.hadoopConfiguration,
FileSystem.getLocal(sc.hadoopConfiguration),
new Path("output"),
new Path("head"),
writeEmptyGzipBlock = true,
@@ -36,10 +37,11 @@
sparkTest("buffer size must be non-negative") {
intercept[IllegalArgumentException] {
// we don't need to pass real paths here
FileMerger.mergeFiles(FileSystem.getLocal(sc.hadoopConfiguration),
FileMerger.mergeFiles(sc.hadoopConfiguration,
FileSystem.getLocal(sc.hadoopConfiguration),
new Path("output"),
new Path("head"),
bufferSize = 0)
optBufferSize = Some(0))
}
}
}
@@ -17,13 +17,11 @@
*/
package org.bdgenomics.adam.util

import java.io.File
import java.nio.file.Files

import com.google.common.io.Resources

import java.io.File
import java.net.URI
import java.nio.file.{ Files, Path }
import org.bdgenomics.utils.misc.SparkFunSuite

import scala.io.Source

trait ADAMFunSuite extends SparkFunSuite {
@@ -73,10 +71,17 @@ trait ADAMFunSuite extends SparkFunSuite {
}
}

def copyResource(name: String): String = {
def copyResourcePath(name: String): Path = {
val tempFile = Files.createTempFile(name, "." + name.split('.').tail.mkString("."))
Files.write(tempFile, Resources.toByteArray(getClass().getResource("/" + name)))
tempFile.toString
}

def copyResource(name: String): String = {
copyResourcePath(name).toString
}

def copyResourceUri(name: String): URI = {
copyResourcePath(name).toUri
}
}
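To tie these helpers back to the CRAM test above: `copyResource` copies a single resource into a fresh temp file, so a copied FASTA loses its neighbouring `.fai` index; that is why the CRAM test uses `resourceUrl` instead. A hedged usage sketch (assumed to run inside a suite that mixes in `ADAMFunSuite`; resource names are taken from this commit's test data):

```scala
// Illustrative only; copyResource/copyResourceUri come from the diff above,
// resourceUrl from the existing suite helpers.
val samPath: String = copyResource("unordered.sam")         // temp copy, fine for standalone files
val samUri: java.net.URI = copyResourceUri("unordered.sam") // same temp copy, exposed as a URI
val cramRef: String = resourceUrl("artificial.fa").toString // in-place URL, keeps artificial.fa.fai adjacent
```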
