From 2e77fb11e9146fc5e883985b2c84bba3da5e8108 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Tue, 23 Jan 2018 21:46:37 -0800 Subject: [PATCH 1/6] added caching functions for python --- .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 20 +++++++++++ .../adam/test/alignmentRecordRdd_test.py | 34 ++++++++++++------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 6f34541458..0c1c1fb054 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -137,6 +137,26 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { */ def replaceSequences(newSequences: SequenceDictionary): U + /** + * Caches underlying RDD in memory. This interfaced is used to + * access caching functionality from the python and R APIs. + * + * @return type of RDD that was cached + */ + def cache() = { + rdd.cache() + } + + /** + * Unpersists underlying RDD from memory. This interfaced is used to + * access caching functionality from the python and R APIs. + * + * @return type of RDD that was cached + */ + def unpersist() = { + rdd.unpersist() + } + /** * Appends sequence metadata to the current RDD. * diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index f46b3fcef9..5c0cd0dca8 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -26,19 +26,19 @@ class AlignmentRecordRDDTest(SparkTestCase): - + def test_save_sorted_sam(self): testFile = self.resourceFile("sorted.sam") ac = ADAMContext(self.ss) - + reads = ac.loadAlignments(testFile) tmpPath = self.tmpFile() + ".sam" sortedReads = reads.sortReadsByReferencePosition() sortedReads.saveAsSam(tmpPath, isSorted=True, asSingleFile=True) - + self.checkFiles(testFile, tmpPath) @@ -46,12 +46,12 @@ def test_save_unordered_sam(self): testFile = self.resourceFile("unordered.sam") ac = ADAMContext(self.ss) - + reads = ac.loadAlignments(testFile) tmpPath = self.tmpFile() + ".sam" reads.saveAsSam(tmpPath, asSingleFile=True) - + self.checkFiles(testFile, tmpPath) @@ -67,13 +67,13 @@ def test_union(self): unionReads = reads1.union([reads2]) self.assertEqual(unionReads.toDF().count(), 13) - + def test_save_as_bam(self): testFile = self.resourceFile("sorted.sam") ac = ADAMContext(self.ss) - + reads = ac.loadAlignments(testFile) tmpPath = self.tmpFile() + ".bam" reads.saveAsSam(tmpPath, @@ -81,7 +81,7 @@ def test_save_as_bam(self): asSingleFile=True) bamReads = ac.loadAlignments(tmpPath) - + self.assertEquals(bamReads._jvmRdd.jrdd().count(), reads._jvmRdd.jrdd().count()) @@ -90,7 +90,7 @@ def test_count_kmers(self): testFile = self.resourceFile("small.sam") ac = ADAMContext(self.ss) - + reads = ac.loadAlignments(testFile) kmers = reads.countKmers(6) @@ -168,7 +168,7 @@ def test_to_fragments(self): def test_filterByOverlappingRegion(self): readsPath = self.resourceFile("unsorted.sam") - ac = ADAMContext(self.sc) + ac = ADAMContext(self.ss) reads = ac.loadAlignments(readsPath) @@ -180,12 +180,22 @@ def test_filterByOverlappingRegion(self): def test_filterByOverlappingRegions(self): readsPath = self.resourceFile("unsorted.sam") - ac = ADAMContext(self.sc) + ac = ADAMContext(self.ss) reads = ac.loadAlignments(readsPath) querys = [ReferenceRegion("1", 20000000L, 27000000L), - ReferenceRegion("1", 230000000L,270000000L)] + ReferenceRegion("1", 230000000L,270000000L)] filtered = reads.filterByOverlappingRegion(querys) self.assertEquals(filtered.toDF().count(), 6) + + def test_caching(self): + + readsPath = self.resourceFile("unsorted.sam") + ac = ADAMContext(self.ss) + + reads = ac.loadAlignments(readsPath) + + reads._jvmRdd.cache() + reads._jvmRdd.unpersist() From e08a4dbe8be7d02740fa0d642eeff81f292e947d Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 25 Jan 2018 11:01:36 -0800 Subject: [PATCH 2/6] add persist function --- .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 16 ++++++++++++---- adam-python/bdgenomics/adam/rdd.py | 12 ++++++++++++ .../adam/test/alignmentRecordRdd_test.py | 15 +++++++++++++-- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 0c1c1fb054..d1d9499a11 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -138,8 +138,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { def replaceSequences(newSequences: SequenceDictionary): U /** - * Caches underlying RDD in memory. This interfaced is used to - * access caching functionality from the python and R APIs. + * Caches underlying RDD in memory. * * @return type of RDD that was cached */ @@ -148,8 +147,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { } /** - * Unpersists underlying RDD from memory. This interfaced is used to - * access caching functionality from the python and R APIs. + * Persists underlying RDD in memory. + * + * @param sl new StorageLevel + * @return type of RDD that was cached + */ + def persist(sl: StorageLevel) = { + rdd.persist(sl) + } + + /** + * Unpersists underlying RDD from memory. * * @return type of RDD that was cached */ diff --git a/adam-python/bdgenomics/adam/rdd.py b/adam-python/bdgenomics/adam/rdd.py index 2ece570655..e71c446477 100644 --- a/adam-python/bdgenomics/adam/rdd.py +++ b/adam-python/bdgenomics/adam/rdd.py @@ -42,6 +42,18 @@ def __init__(self, jvmRdd, sc): self._jvmRdd = jvmRdd self.sc = sc + def cache(self): + self._jvmRdd.cache() + + def persist(self, sl): + sl_dict = sl.__dict__ + + self._jvmRdd.persist(self.sc._jvm.org.apache.spark.storage.StorageLevel.apply(sl_dict["useDisk"], sl_dict["useMemory"], + sl_dict["deserialized"], sl_dict["replication"])) + + def unpersist(self): + self._jvmRdd.unpersist() + def sort(self): """ diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index 5c0cd0dca8..4cf975cda2 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -23,6 +23,7 @@ from bdgenomics.adam.test import SparkTestCase from pyspark.sql.types import DoubleType +from pyspark.storagelevel import StorageLevel class AlignmentRecordRDDTest(SparkTestCase): @@ -197,5 +198,15 @@ def test_caching(self): reads = ac.loadAlignments(readsPath) - reads._jvmRdd.cache() - reads._jvmRdd.unpersist() + reads.cache() + reads.unpersist() + + def test_persisting(self): + + readsPath = self.resourceFile("unsorted.sam") + ac = ADAMContext(self.ss) + + reads = ac.loadAlignments(readsPath) + + reads.persist(StorageLevel.DISK_ONLY) + reads.unpersist() From 632e4fd842e2671fc7516089948bb02271b47df0 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 25 Jan 2018 11:50:18 -0800 Subject: [PATCH 3/6] is jenkins working --- .../bdgenomics/adam/test/alignmentRecordRdd_test.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index 4cf975cda2..68a50a947b 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -210,3 +210,12 @@ def test_persisting(self): reads.persist(StorageLevel.DISK_ONLY) reads.unpersist() + + def test_jenkins(self): + + readsPath = self.resourceFile("this_is_garbage") + ac = ADAMContext(self.ss) + + reads = ac.loadAlignments(readsPath) + + self.assertEquals(reads.toDF().count(), 20) \ No newline at end of file From ea13af829ba481d02f21a7c873e216ac10603ba7 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 25 Jan 2018 12:02:16 -0800 Subject: [PATCH 4/6] execute jenkins test --- .../bdgenomics/adam/test/alignmentRecordRdd_test.py | 9 --------- scripts/jenkins-test | 1 - 2 files changed, 10 deletions(-) diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index 68a50a947b..4cf975cda2 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -210,12 +210,3 @@ def test_persisting(self): reads.persist(StorageLevel.DISK_ONLY) reads.unpersist() - - def test_jenkins(self): - - readsPath = self.resourceFile("this_is_garbage") - ac = ADAMContext(self.ss) - - reads = ac.loadAlignments(readsPath) - - self.assertEquals(reads.toDF().count(), 20) \ No newline at end of file diff --git a/scripts/jenkins-test b/scripts/jenkins-test index 024b1586df..dc4be5545c 100755 --- a/scripts/jenkins-test +++ b/scripts/jenkins-test @@ -216,7 +216,6 @@ then mvn -U \ -P python,r \ package \ - -DskipTests \ -Dhadoop.version=${HADOOP_VERSION} \ -Dspark.version=${SPARK_VERSION} fi From 7cf9628e738811e61948484b9d5c89ea972bbaac Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 25 Jan 2018 14:42:16 -0800 Subject: [PATCH 5/6] added R stuff? --- adam-python/bdgenomics/adam/rdd.py | 18 ++++++++++--- adam-r/bdgenomics.adam/R/rdd.R | 43 ++++++++++++++++++++++++++++++ scripts/jenkins-test | 3 +-- 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/adam-python/bdgenomics/adam/rdd.py b/adam-python/bdgenomics/adam/rdd.py index e71c446477..f969b365ba 100644 --- a/adam-python/bdgenomics/adam/rdd.py +++ b/adam-python/bdgenomics/adam/rdd.py @@ -43,15 +43,27 @@ def __init__(self, jvmRdd, sc): self.sc = sc def cache(self): + """ + Caches underlying RDD in memory. + """ + self._jvmRdd.cache() def persist(self, sl): - sl_dict = sl.__dict__ + """ + Persists underlying RDD in memory. - self._jvmRdd.persist(self.sc._jvm.org.apache.spark.storage.StorageLevel.apply(sl_dict["useDisk"], sl_dict["useMemory"], - sl_dict["deserialized"], sl_dict["replication"])) + :param sl new StorageLevel + """ + + self._jvmRdd.persist(self.sc._jvm.org.apache.spark.api.java.StorageLevels.create(sl.useDisk, \ + sl.useMemory, sl.useOffHeap, sl.deserialized, sl.replication)) def unpersist(self): + """ + Unpersists underlying RDD from memory. + """ + self._jvmRdd.unpersist() diff --git a/adam-r/bdgenomics.adam/R/rdd.R b/adam-r/bdgenomics.adam/R/rdd.R index 83b09af51e..a4123e659c 100644 --- a/adam-r/bdgenomics.adam/R/rdd.R +++ b/adam-r/bdgenomics.adam/R/rdd.R @@ -240,6 +240,49 @@ setMethod("pipe", replaceRdd(ardd, rdd) }) + +#' Caches the existing ardd +#' +#' @param ardd The RDD to apply this to. +#' +#' @importFrom SparkR sparkR.callJMethod +#' +#' @export +setMethod("cache", + signature(ardd = "GenomicRDD"), + function(ardd) { + replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "cache")) + }) + + +#' Persists the existing ardd +#' +#' @param ardd The RDD to apply this to. +#' @param sl the StorageLevel to persist in. +#' +#' @importFrom SparkR sparkR.callJMethod +#' +#' @export +setMethod("persist", + signature(ardd = "GenomicRDD", + sl = "StorageLevel"), + function(ardd) { + replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "persist", sl)) + }) + +#' Unpersists the existing ardd +#' +#' @param ardd The RDD to apply this to. +#' +#' @importFrom SparkR sparkR.callJMethod +#' +#' @export +setMethod("unpersist", + signature(ardd = "GenomicRDD"), + function(ardd) { + replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "unpersist")) + }) + #' Sorts our genome aligned data by reference positions, with contigs ordered #' by index. #' diff --git a/scripts/jenkins-test b/scripts/jenkins-test index dc4be5545c..23568b2b38 100755 --- a/scripts/jenkins-test +++ b/scripts/jenkins-test @@ -100,8 +100,7 @@ set -x -v # if those pass, build the distribution package mvn -U \ -P distribution \ - package \ - -DskipTests \ + test \ -Dhadoop.version=${HADOOP_VERSION} \ -Dspark.version=${SPARK_VERSION} \ -DargLine=${ADAM_MVN_TMP_DIR} From cc935053bce9de68dc382ccf4bc307c0160dbd5e Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 25 Jan 2018 15:17:54 -0800 Subject: [PATCH 6/6] test code for R --- adam-r/bdgenomics.adam/R/rdd.R | 8 ++++---- .../tests/testthat/test_alignmentRecordRdd.R | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/adam-r/bdgenomics.adam/R/rdd.R b/adam-r/bdgenomics.adam/R/rdd.R index a4123e659c..ad6febb2f1 100644 --- a/adam-r/bdgenomics.adam/R/rdd.R +++ b/adam-r/bdgenomics.adam/R/rdd.R @@ -254,19 +254,19 @@ setMethod("cache", replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "cache")) }) - #' Persists the existing ardd #' #' @param ardd The RDD to apply this to. #' @param sl the StorageLevel to persist in. #' -#' @importFrom SparkR sparkR.callJMethod +#' @importFrom SparkR sparkR.callJMethod sparkR.callJStatic #' #' @export setMethod("persist", signature(ardd = "GenomicRDD", - sl = "StorageLevel"), - function(ardd) { + sl = "character"), + function(ardd, sl) { + storageLevel <- sparkR.callJStatic("org.apache.spark.storage.StorageLevel", "fromString", sl) replaceRdd(ardd, sparkR.callJMethod(ardd@jrdd, "persist", sl)) }) diff --git a/adam-r/bdgenomics.adam/tests/testthat/test_alignmentRecordRdd.R b/adam-r/bdgenomics.adam/tests/testthat/test_alignmentRecordRdd.R index ae91240244..c7ad47008f 100644 --- a/adam-r/bdgenomics.adam/tests/testthat/test_alignmentRecordRdd.R +++ b/adam-r/bdgenomics.adam/tests/testthat/test_alignmentRecordRdd.R @@ -105,3 +105,23 @@ test_that("transmute to coverage", { expect_true(is(readsAsCoverage, "CoverageRDD")) expect_equal(count(toDF(readsAsCoverage)), 5) }) + +test_that("reads can cache", { + + readsPath <- resourceFile("unsorted.sam") + reads <- loadAlignments(ac, readsPath) + + cache(reads) + unpersist(reads) +}) + +test_that("reads can persist with storage level", { + + readsPath <- resourceFile("unsorted.sam") + reads <- loadAlignments(ac, readsPath) + storageLevel <- "MEMORY_AND_DISK" + + persist(reads, storageLevel) + unpersist(reads) +}) +