Merge 7cf9628 into adff336
akmorrow13 authored Jan 25, 2018
2 parents adff336 + 7cf9628 commit 52c468a
Showing 5 changed files with 129 additions and 15 deletions.
28 changes: 28 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -137,6 +137,34 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*/
def replaceSequences(newSequences: SequenceDictionary): U

/**
* Caches underlying RDD in memory.
*
* @return the cached RDD
*/
def cache(): RDD[T] = {
rdd.cache()
}

/**
* Persists underlying RDD at the given StorageLevel.
*
* @param sl the StorageLevel at which to persist the RDD
* @return the persisted RDD
*/
def persist(sl: StorageLevel): RDD[T] = {
rdd.persist(sl)
}

/**
* Unpersists underlying RDD from memory.
*
* @return the unpersisted RDD
*/
def unpersist(): RDD[T] = {
rdd.unpersist()
}

/**
* Appends sequence metadata to the current RDD.
*
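These three methods are thin delegates to the wrapped Spark RDD, and the Python and R bindings below call into them over the JVM bridge. As a hedged illustration only (the SparkSession setup and the "reads.sam" path are placeholders, not part of this commit), caching a loaded RDD from Python looks roughly like this; note that Spark requires an unpersist() before assigning a different storage level:

    from bdgenomics.adam.adamContext import ADAMContext
    from pyspark.sql import SparkSession
    from pyspark.storagelevel import StorageLevel

    # Hypothetical setup; any active SparkSession will do.
    ss = SparkSession.builder.getOrCreate()
    ac = ADAMContext(ss)

    reads = ac.loadAlignments("reads.sam")  # placeholder input path

    reads.cache()      # persist at Spark's default MEMORY_ONLY level
    reads.unpersist()  # release the blocks before choosing another level

    reads.persist(StorageLevel.DISK_ONLY)
    reads.unpersist()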
24 changes: 24 additions & 0 deletions adam-python/bdgenomics/adam/rdd.py
@@ -42,6 +42,30 @@ def __init__(self, jvmRdd, sc):
self._jvmRdd = jvmRdd
self.sc = sc

def cache(self):
"""
Caches underlying RDD in memory.
"""

self._jvmRdd.cache()

def persist(self, sl):
"""
Persists underlying RDD in memory.
:param sl new StorageLevel
"""

self._jvmRdd.persist(self.sc._jvm.org.apache.spark.api.java.StorageLevels.create(sl.useDisk, \
sl.useMemory, sl.useOffHeap, sl.deserialized, sl.replication))

def unpersist(self):
"""
Unpersists underlying RDD from memory.
"""

self._jvmRdd.unpersist()


def sort(self):
"""
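The persist wrapper above bridges pyspark's StorageLevel, a plain record of five fields, to a JVM-side org.apache.spark.api.java.StorageLevels.create call. A minimal sketch of what gets forwarded, using stock pyspark (shown for illustration; the printed values are the pyspark 2.x defaults for this level):

    from pyspark.storagelevel import StorageLevel

    sl = StorageLevel.MEMORY_AND_DISK

    # These five fields are exactly the arguments the wrapper forwards to
    # org.apache.spark.api.java.StorageLevels.create(...) on the JVM side.
    print(sl.useDisk, sl.useMemory, sl.useOffHeap, sl.deserialized, sl.replication)
    # -> True True False False 1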
45 changes: 33 additions & 12 deletions adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py
@@ -23,35 +23,36 @@
from bdgenomics.adam.test import SparkTestCase

from pyspark.sql.types import DoubleType
from pyspark.storagelevel import StorageLevel

class AlignmentRecordRDDTest(SparkTestCase):


def test_save_sorted_sam(self):

testFile = self.resourceFile("sorted.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(testFile)
tmpPath = self.tmpFile() + ".sam"
sortedReads = reads.sortReadsByReferencePosition()
sortedReads.saveAsSam(tmpPath,
isSorted=True,
asSingleFile=True)

self.checkFiles(testFile, tmpPath)


def test_save_unordered_sam(self):

testFile = self.resourceFile("unordered.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(testFile)
tmpPath = self.tmpFile() + ".sam"
reads.saveAsSam(tmpPath,
asSingleFile=True)

self.checkFiles(testFile, tmpPath)


@@ -67,21 +68,21 @@ def test_union(self):
unionReads = reads1.union([reads2])

self.assertEqual(unionReads.toDF().count(), 13)


def test_save_as_bam(self):

testFile = self.resourceFile("sorted.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(testFile)
tmpPath = self.tmpFile() + ".bam"
reads.saveAsSam(tmpPath,
isSorted=True,
asSingleFile=True)

bamReads = ac.loadAlignments(tmpPath)

self.assertEqual(bamReads._jvmRdd.jrdd().count(),
reads._jvmRdd.jrdd().count())

@@ -90,7 +91,7 @@ def test_count_kmers(self):

testFile = self.resourceFile("small.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(testFile)
kmers = reads.countKmers(6)

@@ -168,7 +169,7 @@ def test_to_fragments(self):
def test_filterByOverlappingRegion(self):

readsPath = self.resourceFile("unsorted.sam")
- ac = ADAMContext(self.sc)
+ ac = ADAMContext(self.ss)

reads = ac.loadAlignments(readsPath)

@@ -180,12 +181,32 @@ def test_filterByOverlappingRegions(self):
def test_filterByOverlappingRegions(self):

readsPath = self.resourceFile("unsorted.sam")
- ac = ADAMContext(self.sc)
+ ac = ADAMContext(self.ss)

reads = ac.loadAlignments(readsPath)

queries = [ReferenceRegion("1", 20000000L, 27000000L),
           ReferenceRegion("1", 230000000L, 270000000L)]

filtered = reads.filterByOverlappingRegions(queries)
self.assertEqual(filtered.toDF().count(), 6)

def test_caching(self):

readsPath = self.resourceFile("unsorted.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(readsPath)

reads.cache()
reads.unpersist()

def test_persisting(self):

readsPath = self.resourceFile("unsorted.sam")
ac = ADAMContext(self.ss)

reads = ac.loadAlignments(readsPath)

reads.persist(StorageLevel.DISK_ONLY)
reads.unpersist()
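If additional storage levels needed coverage, a further test in the same style might look like the sketch below; MEMORY_AND_DISK is a standard pyspark level, and this test is illustrative rather than part of the commit:

    def test_persisting_memory_and_disk(self):

        readsPath = self.resourceFile("unsorted.sam")
        ac = ADAMContext(self.ss)

        reads = ac.loadAlignments(readsPath)

        reads.persist(StorageLevel.MEMORY_AND_DISK)
        reads.unpersist()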
43 changes: 43 additions & 0 deletions adam-r/bdgenomics.adam/R/rdd.R
@@ -240,6 +240,49 @@ setMethod("pipe",
replaceRdd(ardd, rdd)
})


#' Caches the existing ardd
#'
#' @param ardd The RDD to apply this to.
#'
#' @importFrom SparkR sparkR.callJMethod
#'
#' @export
setMethod("cache",
signature(ardd = "GenomicRDD"),
function(ardd) {
replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "cache"))
})


#' Persists the existing ardd
#'
#' @param ardd The RDD to apply this to.
#' @param sl the StorageLevel to persist in.
#'
#' @importFrom SparkR sparkR.callJMethod
#'
#' @export
setMethod("persist",
signature(ardd = "GenomicRDD",
sl = "StorageLevel"),
function(ardd, sl) {
replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "persist", sl))
})

#' Unpersists the existing ardd
#'
#' @param ardd The RDD to apply this to.
#'
#' @importFrom SparkR sparkR.callJMethod
#'
#' @export
setMethod("unpersist",
signature(ardd = "GenomicRDD"),
function(ardd) {
replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "unpersist"))
})

#' Sorts our genome aligned data by reference positions, with contigs ordered
#' by index.
#'
4 changes: 1 addition & 3 deletions scripts/jenkins-test
@@ -100,8 +100,7 @@ set -x -v
# if those pass, build the distribution package
mvn -U \
-P distribution \
- package \
- -DskipTests \
+ test \
-Dhadoop.version=${HADOOP_VERSION} \
-Dspark.version=${SPARK_VERSION} \
-DargLine=${ADAM_MVN_TMP_DIR}
@@ -216,7 +215,6 @@ then
mvn -U \
-P python,r \
package \
- -DskipTests \
-Dhadoop.version=${HADOOP_VERSION} \
-Dspark.version=${SPARK_VERSION}
fi
