From 629c3d212488288563ab54c76b6461a651679352 Mon Sep 17 00:00:00 2001 From: tpoterba Date: Wed, 25 Nov 2015 13:26:30 -0500 Subject: [PATCH 1/4] Moved SparkyVSM to VSM, removed abstract class and all vsmtype arguments --- .../broadinstitute/hail/methods/LoadVCF.scala | 3 +- .../hail/variant/VariantSampleMatrix.scala | 204 ++++++++++++++---- .../broadinstitute/hail/variant/package.scala | 2 +- .../hail/variant/vsm/SparkyVSM.scala | 184 ---------------- 4 files changed, 166 insertions(+), 227 deletions(-) delete mode 100644 src/main/scala/org/broadinstitute/hail/variant/vsm/SparkyVSM.scala diff --git a/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala b/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala index 32b48e4c593..d0d761b9583 100644 --- a/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala +++ b/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala @@ -10,7 +10,6 @@ object LoadVCF { // FIXME move to VariantDataset def apply(sc: SparkContext, file: String, - vsmtype: String = "sparky", compress: Boolean = true, nPartitions: Option[Int] = None): VariantDataset = { @@ -48,6 +47,6 @@ object LoadVCF { } // FIXME null should be contig lengths - VariantSampleMatrix(vsmtype, VariantMetadata(null, sampleIds, headerLines), genotypes) + VariantSampleMatrix(VariantMetadata(null, sampleIds, headerLines), genotypes) } } diff --git a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala index d3ca10812b1..586cebf95ea 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala @@ -1,94 +1,218 @@ package org.broadinstitute.hail.variant -import org.apache.spark.SparkContext +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.broadinstitute.hail.Utils._ -import org.broadinstitute.hail.variant.vsm.SparkyVSM import scala.reflect.ClassTag import scala.reflect.runtime.universe._ + object VariantSampleMatrix { - def apply(vsmtype: String, - metadata: VariantMetadata, - rdd: RDD[(Variant, GenotypeStream)]): VariantSampleMatrix[Genotype] = { - vsmtype match { - case "sparky" => new SparkyVSM(metadata, rdd) - } + + def apply(metadata: VariantMetadata, + rdd: RDD[(Variant, GenotypeStream)]): VariantDataset = { + new VariantSampleMatrix(metadata, rdd) } - def read(sqlContext: SQLContext, dirname: String) = { - val (vsmType, metadata) = readObjectFile(dirname + "/metadata.ser", sqlContext.sparkContext.hadoopConfiguration)( - _.readObject().asInstanceOf[(String, VariantMetadata)]) + def read(sqlContext: SQLContext, dirname: String, metadata: VariantMetadata): VariantDataset = { + import RichRow._ - vsmType match { - case "sparky" => SparkyVSM.read(sqlContext, dirname, metadata) - } + require(dirname.endsWith(".vds")) + + // val df = sqlContext.read.parquet(dirname + "/rdd.parquet") + val df = sqlContext.parquetFile(dirname + "/rdd.parquet") + new VariantSampleMatrix[Genotype, GenotypeStream](metadata, df.rdd.map(r => (r.getVariant(0), r.getGenotypeStream(1)))) } } -// FIXME all maps should become RDDs -abstract class VariantSampleMatrix[T](val metadata: VariantMetadata, - val localSamples: Array[Int]) { + +class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, + localSamples: Array[Int], + val rdd: RDD[(Variant, S)]) + (implicit ttt: TypeTag[T], stt: 
TypeTag[S], tct: ClassTag[T], sct: ClassTag[S], + vct: ClassTag[Variant]) { + + def this(metadata: VariantMetadata, rdd: RDD[(Variant, S)]) + (implicit ttt: TypeTag[T], stt: TypeTag[S], tct: ClassTag[T], sct: ClassTag[S]) = + this(metadata, Array.range(0, metadata.nSamples), rdd) def sampleIds: Array[String] = metadata.sampleIds def nSamples: Int = metadata.sampleIds.length def nLocalSamples: Int = localSamples.length - def sparkContext: SparkContext + def copy[U, V <: Iterable[U]](metadata: VariantMetadata = this.metadata, + localSamples: Array[Int] = this.localSamples, + rdd: RDD[(Variant, V)] = this.rdd) + (implicit ttt: TypeTag[U], stt: TypeTag[V], tct: ClassTag[U], sct: ClassTag[V]): VariantSampleMatrix[U, V] = + new VariantSampleMatrix(metadata, localSamples, rdd) + + def sparkContext: SparkContext = rdd.sparkContext + + def cache(): VariantSampleMatrix[T, S] = copy(rdd = rdd.cache()) + + def repartition(nPartitions: Int) = copy(rdd = rdd.repartition(nPartitions)) - // underlying RDD - def nPartitions: Int - def cache(): VariantSampleMatrix[T] - def repartition(nPartitions: Int): VariantSampleMatrix[T] + def nPartitions: Int = rdd.partitions.length - def variants: RDD[Variant] + def variants: RDD[Variant] = rdd.keys def nVariants: Long = variants.count() - def expand(): RDD[(Variant, Int, T)] + def expand(): RDD[(Variant, Int, T)] = + mapWithKeys[(Variant, Int, T)]((v, s, g) => (v, s, g)) - def write(sqlContext: SQLContext, dirname: String) + def write(sqlContext: SQLContext, dirname: String) { + import sqlContext.implicits._ - def mapValuesWithKeys[U](f: (Variant, Int, T) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U] + require(dirname.endsWith(".vds")) - def mapValues[U](f: (T) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U] = { + val hConf = sparkContext.hadoopConfiguration + hadoopMkdir(dirname, hConf) + writeObjectFile(dirname + "/metadata.ser", hConf)( + _.writeObject("sparky" -> metadata)) + + // rdd.toDF().write.parquet(dirname + "/rdd.parquet") + rdd.toDF().saveAsParquetFile(dirname + "/rdd.parquet") + } + + def mapValues[U](f: (T) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U, Iterable[U]] = { mapValuesWithKeys((v, s, g) => f(g)) } - def mapWithKeys[U](f: (Variant, Int, T) => U)(implicit uct: ClassTag[U]): RDD[U] + def mapValuesWithKeys[U](f: (Variant, Int, T) => U) + (implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U, Iterable[U]] = { + val localSamplesBc = sparkContext.broadcast(localSamples) + copy(rdd = rdd.map { case (v, gs) => + (v, localSamplesBc.value.view.zip(gs.view) + .map { case (s, t) => f(v, s, t) }) + }) + } + def map[U](f: T => U)(implicit uct: ClassTag[U]): RDD[U] = mapWithKeys((v, s, g) => f(g)) - def flatMapWithKeys[U](f: (Variant, Int, T) => TraversableOnce[U])(implicit uct: ClassTag[U]): RDD[U] + def mapWithKeys[U](f: (Variant, Int, T) => U)(implicit uct: ClassTag[U]): RDD[U] = { + val localSamplesBc = sparkContext.broadcast(localSamples) + rdd + .flatMap { case (v, gs) => localSamplesBc.value.view.zip(gs.view) + .map { case (s, g) => f(v, s, g) } + } + } + def flatMap[U](f: T => TraversableOnce[U])(implicit uct: ClassTag[U]): RDD[U] = flatMapWithKeys((v, s, g) => f(g)) - def filterVariants(p: (Variant) => Boolean): VariantSampleMatrix[T] - def filterVariants(ilist: IntervalList): VariantSampleMatrix[T] = - filterVariants(v => ilist.contains(v.contig, v.start)) + def flatMapWithKeys[U](f: (Variant, Int, T) => TraversableOnce[U])(implicit uct: ClassTag[U]): 
RDD[U] = { + val localSamplesBc = sparkContext.broadcast(localSamples) + rdd + .flatMap { case (v, gs) => localSamplesBc.value.view.zip(gs.view) + .flatMap { case (s, g) => f(v, s, g) } + } + } - def filterSamples(p: (Int) => Boolean): VariantSampleMatrix[T] + def filterVariants(ilist: IntervalList): VariantSampleMatrix[T, S] = + filterVariants(v => ilist.contains(v.contig, v.start)) - def aggregateBySampleWithKeys[U](zeroValue: U)( - seqOp: (U, Variant, Int, T) => U, - combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Int, U)] + def filterVariants(p: (Variant) => Boolean): VariantSampleMatrix[T, S] = + copy(rdd = rdd.filter { case (v, _) => p(v) }) + + def filterSamples(p: (Int) => Boolean) = { + val localSamplesBc = sparkContext.broadcast(localSamples) + copy(localSamples = localSamples.filter(p), + rdd = rdd.map { case (v, gs) => + (v, localSamplesBc.value.view.zip(gs.view) + .filter { case (s, _) => p(s) } + .map(_._2)) + }) + } def aggregateBySample[U](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Int, U)] = aggregateBySampleWithKeys(zeroValue)((e, v, s, g) => seqOp(e, g), combOp) - def aggregateByVariantWithKeys[U](zeroValue: U)( + def aggregateBySampleWithKeys[U](zeroValue: U)( seqOp: (U, Variant, Int, T) => U, - combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Variant, U)] + combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Int, U)] = { + + val localSamplesBc = sparkContext.broadcast(localSamples) + + val serializer = SparkEnv.get.serializer.newInstance() + val zeroBuffer = serializer.serialize(zeroValue) + val zeroArray = new Array[Byte](zeroBuffer.limit) + zeroBuffer.get(zeroArray) + + rdd + .mapPartitions { (it: Iterator[(Variant, S)]) => + val serializer = SparkEnv.get.serializer.newInstance() + def copyZeroValue() = serializer.deserialize[U](ByteBuffer.wrap(zeroArray)) + val arrayZeroValue = Array.fill[U](localSamplesBc.value.length)(copyZeroValue()) + + localSamplesBc.value.iterator + .zip(it.foldLeft(arrayZeroValue) { case (acc, (v, gs)) => + for ((g, i) <- gs.zipWithIndex) + acc(i) = seqOp(acc(i), v, localSamplesBc.value(i), g) + acc + }.iterator) + }.foldByKey(zeroValue)(combOp) + } def aggregateByVariant[U](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Variant, U)] = aggregateByVariantWithKeys(zeroValue)((e, v, s, g) => seqOp(e, g), combOp) - def foldBySample(zeroValue: T)(combOp: (T, T) => T): RDD[(Int, T)] + def aggregateByVariantWithKeys[U](zeroValue: U)( + seqOp: (U, Variant, Int, T) => U, + combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Variant, U)] = { - def foldByVariant(zeroValue: T)(combOp: (T, T) => T): RDD[(Variant, T)] + val localSamplesBc = sparkContext.broadcast(localSamples) + + // Serialize the zero value to a byte array so that we can get a new clone of it on each key + val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) + val zeroArray = new Array[Byte](zeroBuffer.limit) + zeroBuffer.get(zeroArray) + + rdd + .map { case (v, gs) => + val serializer = SparkEnv.get.serializer.newInstance() + val zeroValue = serializer.deserialize[U](ByteBuffer.wrap(zeroArray)) + + (v, gs.zipWithIndex.foldLeft(zeroValue) { case (acc, (g, i)) => + seqOp(acc, v, localSamplesBc.value(i), g) + }) + } + } + + def foldBySample(zeroValue: T)(combOp: (T, T) => T): RDD[(Int, T)] = { + + val localSamplesBc = sparkContext.broadcast(localSamples) + val 
localtct = tct + + val serializer = SparkEnv.get.serializer.newInstance() + val zeroBuffer = serializer.serialize(zeroValue) + val zeroArray = new Array[Byte](zeroBuffer.limit) + zeroBuffer.get(zeroArray) + + rdd + .mapPartitions { (it: Iterator[(Variant, S)]) => + val serializer = SparkEnv.get.serializer.newInstance() + def copyZeroValue() = serializer.deserialize[T](ByteBuffer.wrap(zeroArray))(localtct) + val arrayZeroValue = Array.fill[T](localSamplesBc.value.size)(copyZeroValue()) + localSamplesBc.value.iterator + .zip(it.foldLeft(arrayZeroValue) { case (acc, (v, gs)) => + for ((g, i) <- gs.zipWithIndex) + acc(i) = combOp(acc(i), g) + acc + }.iterator) + }.foldByKey(zeroValue)(combOp) + } + + def foldByVariant(zeroValue: T)(combOp: (T, T) => T): RDD[(Variant, T)] = { + rdd + .mapValues(_.foldLeft(zeroValue)((acc, g) => combOp(acc, g))) + } } diff --git a/src/main/scala/org/broadinstitute/hail/variant/package.scala b/src/main/scala/org/broadinstitute/hail/variant/package.scala index d0cab37c6cc..0b752308307 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/package.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/package.scala @@ -4,7 +4,7 @@ import org.apache.spark.rdd.RDD import org.broadinstitute.hail.variant.{GenotypeStream, Variant} package object variant { - type VariantDataset = VariantSampleMatrix[Genotype] + type VariantDataset = VariantSampleMatrix[Genotype, Iterable[Genotype]] // type VariantSampleMatrix[T, S] = managed.ManagedVSM[T, S] // type VariantSampleMatrix[T, S <: Iterable[(Int, T)]] = sparky.SparkyVSM[T, S] diff --git a/src/main/scala/org/broadinstitute/hail/variant/vsm/SparkyVSM.scala b/src/main/scala/org/broadinstitute/hail/variant/vsm/SparkyVSM.scala deleted file mode 100644 index ac0a30e18c1..00000000000 --- a/src/main/scala/org/broadinstitute/hail/variant/vsm/SparkyVSM.scala +++ /dev/null @@ -1,184 +0,0 @@ -package org.broadinstitute.hail.variant.vsm - -import java.nio.ByteBuffer -import org.apache.spark.{SparkEnv, SparkContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext -import org.broadinstitute.hail.Utils._ -import org.broadinstitute.hail.variant._ -import scala.collection.mutable -import scala.language.implicitConversions -import scala.reflect.ClassTag -import scala.reflect.runtime.universe._ - -object SparkyVSM { - def read(sqlContext: SQLContext, dirname: String, metadata: VariantMetadata): SparkyVSM[Genotype, GenotypeStream] = { - import RichRow._ - - require(dirname.endsWith(".vds")) - - // val df = sqlContext.read.parquet(dirname + "/rdd.parquet") - val df = sqlContext.parquetFile(dirname + "/rdd.parquet") - new SparkyVSM[Genotype, GenotypeStream](metadata, df.rdd.map(r => (r.getVariant(0), r.getGenotypeStream(1)))) - } -} - -class SparkyVSM[T, S <: Iterable[T]](metadata: VariantMetadata, - localSamples: Array[Int], - val rdd: RDD[(Variant, S)]) - (implicit ttt: TypeTag[T], stt: TypeTag[S], tct: ClassTag[T], sct: ClassTag[S], - vct: ClassTag[Variant]) - extends VariantSampleMatrix[T](metadata, localSamples) { - - def this(metadata: VariantMetadata, rdd: RDD[(Variant, S)]) - (implicit ttt: TypeTag[T], stt: TypeTag[S], tct: ClassTag[T], sct: ClassTag[S]) = - this(metadata, Array.range(0, metadata.nSamples), rdd) - - def copy[U, V <: Iterable[U]](metadata: VariantMetadata = this.metadata, - localSamples: Array[Int] = this.localSamples, - rdd: RDD[(Variant, V)] = this.rdd) - (implicit ttt: TypeTag[U], stt: TypeTag[V], tct: ClassTag[U], sct: ClassTag[V]): SparkyVSM[U, V] = - new SparkyVSM[U, V](metadata, 
localSamples, rdd) - - def sparkContext: SparkContext = rdd.sparkContext - - def cache() = copy(rdd = rdd.cache()) - - def repartition(nPartitions: Int) = copy(rdd = rdd.repartition(nPartitions)) - - def nPartitions: Int = rdd.partitions.length - - def variants: RDD[Variant] = rdd.keys - - def expand(): RDD[(Variant, Int, T)] = - mapWithKeys[(Variant, Int, T)]((v, s, g) => (v, s, g)) - - def write(sqlContext: SQLContext, dirname: String) { - import sqlContext.implicits._ - - require(dirname.endsWith(".vds")) - - val hConf = sparkContext.hadoopConfiguration - hadoopMkdir(dirname, hConf) - writeObjectFile(dirname + "/metadata.ser", hConf)( - _.writeObject("sparky" -> metadata)) - - // rdd.toDF().write.parquet(dirname + "/rdd.parquet") - rdd.toDF().saveAsParquetFile(dirname + "/rdd.parquet") - } - - def mapValuesWithKeys[U](f: (Variant, Int, T) => U) - (implicit utt: TypeTag[U], uct: ClassTag[U]): SparkyVSM[U, Iterable[U]] = { - val localSamplesBc = sparkContext.broadcast(localSamples) - copy(rdd = rdd.map { case (v, gs) => - (v, localSamplesBc.value.view.zip(gs.view) - .map { case (s, t) => f(v, s, t) }) - }) - } - - def mapWithKeys[U](f: (Variant, Int, T) => U)(implicit uct: ClassTag[U]): RDD[U] = { - val localSamplesBc = sparkContext.broadcast(localSamples) - rdd - .flatMap { case (v, gs) => localSamplesBc.value.view.zip(gs.view) - .map { case (s, g) => f(v, s, g) } - } - } - - def flatMapWithKeys[U](f: (Variant, Int, T) => TraversableOnce[U])(implicit uct: ClassTag[U]): RDD[U] = { - val localSamplesBc = sparkContext.broadcast(localSamples) - rdd - .flatMap { case (v, gs) => localSamplesBc.value.view.zip(gs.view) - .flatMap { case (s, g) => f(v, s, g) } - } - } - - def filterVariants(p: (Variant) => Boolean) = - copy(rdd = rdd.filter { case (v, _) => p(v) }) - - def filterSamples(p: (Int) => Boolean) = { - val localSamplesBc = sparkContext.broadcast(localSamples) - copy(localSamples = localSamples.filter(p), - rdd = rdd.map { case (v, gs) => - (v, localSamplesBc.value.view.zip(gs.view) - .filter { case (s, _) => p(s) } - .map(_._2)) - }) - } - - def aggregateBySampleWithKeys[U](zeroValue: U)( - seqOp: (U, Variant, Int, T) => U, - combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Int, U)] = { - - val localSamplesBc = sparkContext.broadcast(localSamples) - - val serializer = SparkEnv.get.serializer.newInstance() - val zeroBuffer = serializer.serialize(zeroValue) - val zeroArray = new Array[Byte](zeroBuffer.limit) - zeroBuffer.get(zeroArray) - - rdd - .mapPartitions { (it: Iterator[(Variant, S)]) => - val serializer = SparkEnv.get.serializer.newInstance() - def copyZeroValue() = serializer.deserialize[U](ByteBuffer.wrap(zeroArray)) - val arrayZeroValue = Array.fill[U](localSamplesBc.value.length)(copyZeroValue()) - - localSamplesBc.value.iterator - .zip(it.foldLeft(arrayZeroValue) { case (acc, (v, gs)) => - for ((g, i) <- gs.zipWithIndex) - acc(i) = seqOp(acc(i), v, localSamplesBc.value(i), g) - acc - }.iterator) - }.foldByKey(zeroValue)(combOp) - } - - def aggregateByVariantWithKeys[U](zeroValue: U)( - seqOp: (U, Variant, Int, T) => U, - combOp: (U, U) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): RDD[(Variant, U)] = { - - val localSamplesBc = sparkContext.broadcast(localSamples) - - // Serialize the zero value to a byte array so that we can get a new clone of it on each key - val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) - val zeroArray = new Array[Byte](zeroBuffer.limit) - zeroBuffer.get(zeroArray) - - rdd - .map { case (v, gs) => - 
val serializer = SparkEnv.get.serializer.newInstance() - val zeroValue = serializer.deserialize[U](ByteBuffer.wrap(zeroArray)) - - (v, gs.zipWithIndex.foldLeft(zeroValue) { case (acc, (g, i)) => - seqOp(acc, v, localSamplesBc.value(i), g) - }) - } - } - - def foldBySample(zeroValue: T)(combOp: (T, T) => T): RDD[(Int, T)] = { - - val localSamplesBc = sparkContext.broadcast(localSamples) - val localtct = tct - - val serializer = SparkEnv.get.serializer.newInstance() - val zeroBuffer = serializer.serialize(zeroValue) - val zeroArray = new Array[Byte](zeroBuffer.limit) - zeroBuffer.get(zeroArray) - - rdd - .mapPartitions { (it: Iterator[(Variant, S)]) => - val serializer = SparkEnv.get.serializer.newInstance() - def copyZeroValue() = serializer.deserialize[T](ByteBuffer.wrap(zeroArray))(localtct) - val arrayZeroValue = Array.fill[T](localSamplesBc.value.size)(copyZeroValue()) - localSamplesBc.value.iterator - .zip(it.foldLeft(arrayZeroValue) { case (acc, (v, gs)) => - for ((g, i) <- gs.zipWithIndex) - acc(i) = combOp(acc(i), g) - acc - }.iterator) - }.foldByKey(zeroValue)(combOp) - } - - def foldByVariant(zeroValue: T)(combOp: (T, T) => T): RDD[(Variant, T)] = { - rdd - .mapValues(_.foldLeft(zeroValue)((acc, g) => combOp(acc, g))) - } -} From dec9d50422285bd16481184cd52cca22dfa336a3 Mon Sep 17 00:00:00 2001 From: tpoterba Date: Sun, 29 Nov 2015 13:32:40 -0500 Subject: [PATCH 2/4] Moved SparkyVSM to VSM, removed abstract class and all vsmtype arguments --- .../broadinstitute/hail/driver/Import.scala | 5 +- .../broadinstitute/hail/driver/Write.scala | 1 + .../broadinstitute/hail/methods/LoadVCF.scala | 2 +- .../hail/variant/GenotypeStream.scala | 5 + .../hail/variant/VariantSampleMatrix.scala | 112 ++++++++++++------ .../broadinstitute/hail/variant/package.scala | 2 +- .../hail/methods/gqDpStatsSuite.scala | 4 +- .../hail/utils/TestRDDBuilder.scala | 6 +- .../hail/variant/vsm/VSMSuite.scala | 5 +- 9 files changed, 92 insertions(+), 50 deletions(-) diff --git a/src/main/scala/org/broadinstitute/hail/driver/Import.scala b/src/main/scala/org/broadinstitute/hail/driver/Import.scala index d09e35616ae..34be2f0b0b1 100644 --- a/src/main/scala/org/broadinstitute/hail/driver/Import.scala +++ b/src/main/scala/org/broadinstitute/hail/driver/Import.scala @@ -15,9 +15,6 @@ object Import extends Command { @Args4jOption(required = true, name = "-i", aliases = Array("--input"), usage = "Input file") var input: String = _ - @Args4jOption(required = false, name = "-m", aliases = Array("--vsm-type"), usage = "Select VariantSampleMatrix implementation") - var vsmtype: String = "sparky" - @Args4jOption(required = false, name = "-d", aliases = Array("--no-compress"), usage = "Don't compress in-memory representation") var noCompress: Boolean = false @@ -42,7 +39,7 @@ object Import extends Command { fatal(".gz cannot be loaded in parallel, use .bgz or -f override") } - LoadVCF(state.sc, input, options.vsmtype, !options.noCompress, + LoadVCF(state.sc, input, !options.noCompress, if (options.nPartitions != 0) Some(options.nPartitions) else diff --git a/src/main/scala/org/broadinstitute/hail/driver/Write.scala b/src/main/scala/org/broadinstitute/hail/driver/Write.scala index b3db7b150f0..1fee36b0475 100644 --- a/src/main/scala/org/broadinstitute/hail/driver/Write.scala +++ b/src/main/scala/org/broadinstitute/hail/driver/Write.scala @@ -1,6 +1,7 @@ package org.broadinstitute.hail.driver import org.broadinstitute.hail.Utils._ +import org.broadinstitute.hail.variant.VSMUtils._ import org.kohsuke.args4j.{Option => 
Args4jOption} object Write extends Command { diff --git a/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala b/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala index d0d761b9583..3f36b8bc2b8 100644 --- a/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala +++ b/src/main/scala/org/broadinstitute/hail/methods/LoadVCF.scala @@ -42,7 +42,7 @@ object LoadVCF { val b = new GenotypeStreamBuilder(v, compress) for (g <- gs) b += g - (v, b.result()) + (v, b.result(): Iterable[Genotype]) } } diff --git a/src/main/scala/org/broadinstitute/hail/variant/GenotypeStream.scala b/src/main/scala/org/broadinstitute/hail/variant/GenotypeStream.scala index 3063e67ac9a..c53d878be12 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/GenotypeStream.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/GenotypeStream.scala @@ -79,6 +79,11 @@ class GenotypeStreamBuilder(variant: Variant, compress: Boolean = true) this } + def ++=(i: Iterator[Genotype]): GenotypeStreamBuilder.this.type = { + i.foreach(_.write(b)) + this + } + override def clear() { b.clear() } diff --git a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala index 586cebf95ea..75ba35d8b39 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala @@ -6,84 +6,79 @@ import org.apache.spark.{SparkEnv, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.broadinstitute.hail.Utils._ +import org.broadinstitute.hail.variant._ +import scala.collection.mutable +import scala.language.implicitConversions import scala.reflect.ClassTag import scala.reflect.runtime.universe._ object VariantSampleMatrix { - def apply(metadata: VariantMetadata, - rdd: RDD[(Variant, GenotypeStream)]): VariantDataset = { + rdd: RDD[(Variant, Iterable[Genotype])]): VariantDataset = { new VariantSampleMatrix(metadata, rdd) } - def read(sqlContext: SQLContext, dirname: String, metadata: VariantMetadata): VariantDataset = { + def read(sqlContext: SQLContext, dirname: String): VariantDataset = { + + val metadata = readObjectFile(dirname + "/metadata.ser", sqlContext.sparkContext.hadoopConfiguration)( + _.readObject().asInstanceOf[VariantMetadata]) + import RichRow._ require(dirname.endsWith(".vds")) // val df = sqlContext.read.parquet(dirname + "/rdd.parquet") val df = sqlContext.parquetFile(dirname + "/rdd.parquet") - new VariantSampleMatrix[Genotype, GenotypeStream](metadata, df.rdd.map(r => (r.getVariant(0), r.getGenotypeStream(1)))) + new VariantSampleMatrix[Genotype](metadata, df.rdd.map(r => (r.getVariant(0), r.getGenotypeStream(1)))) } } - -class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, - localSamples: Array[Int], - val rdd: RDD[(Variant, S)]) - (implicit ttt: TypeTag[T], stt: TypeTag[S], tct: ClassTag[T], sct: ClassTag[S], +class VariantSampleMatrix[T](val metadata: VariantMetadata, + val localSamples: Array[Int], + val rdd: RDD[(Variant, Iterable[T])]) + (implicit ttt: TypeTag[T], tct: ClassTag[T], vct: ClassTag[Variant]) { - def this(metadata: VariantMetadata, rdd: RDD[(Variant, S)]) - (implicit ttt: TypeTag[T], stt: TypeTag[S], tct: ClassTag[T], sct: ClassTag[S]) = + def this(metadata: VariantMetadata, rdd: RDD[(Variant, Iterable[T])]) + (implicit ttt: TypeTag[T], tct: ClassTag[T]) = this(metadata, Array.range(0, metadata.nSamples), rdd) def sampleIds: Array[String] = 
metadata.sampleIds + def nSamples: Int = metadata.sampleIds.length + def nLocalSamples: Int = localSamples.length - def copy[U, V <: Iterable[U]](metadata: VariantMetadata = this.metadata, + def copy[U](metadata: VariantMetadata = this.metadata, localSamples: Array[Int] = this.localSamples, - rdd: RDD[(Variant, V)] = this.rdd) - (implicit ttt: TypeTag[U], stt: TypeTag[V], tct: ClassTag[U], sct: ClassTag[V]): VariantSampleMatrix[U, V] = + rdd: RDD[(Variant, Iterable[U])] = this.rdd) + (implicit ttt: TypeTag[U], tct: ClassTag[U]): VariantSampleMatrix[U] = new VariantSampleMatrix(metadata, localSamples, rdd) def sparkContext: SparkContext = rdd.sparkContext - def cache(): VariantSampleMatrix[T, S] = copy(rdd = rdd.cache()) + def cache(): VariantSampleMatrix[T] = copy[T](rdd = rdd.cache()) - def repartition(nPartitions: Int) = copy(rdd = rdd.repartition(nPartitions)) + def repartition(nPartitions: Int) = copy[T](rdd = rdd.repartition(nPartitions)) def nPartitions: Int = rdd.partitions.length def variants: RDD[Variant] = rdd.keys + def nVariants: Long = variants.count() def expand(): RDD[(Variant, Int, T)] = mapWithKeys[(Variant, Int, T)]((v, s, g) => (v, s, g)) - def write(sqlContext: SQLContext, dirname: String) { - import sqlContext.implicits._ - - require(dirname.endsWith(".vds")) - val hConf = sparkContext.hadoopConfiguration - hadoopMkdir(dirname, hConf) - writeObjectFile(dirname + "/metadata.ser", hConf)( - _.writeObject("sparky" -> metadata)) - - // rdd.toDF().write.parquet(dirname + "/rdd.parquet") - rdd.toDF().saveAsParquetFile(dirname + "/rdd.parquet") - } - - def mapValues[U](f: (T) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U, Iterable[U]] = { + def mapValues[U](f: (T) => U)(implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U] = { mapValuesWithKeys((v, s, g) => f(g)) } def mapValuesWithKeys[U](f: (Variant, Int, T) => U) - (implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U, Iterable[U]] = { + (implicit utt: TypeTag[U], uct: ClassTag[U]): VariantSampleMatrix[U] = { val localSamplesBc = sparkContext.broadcast(localSamples) copy(rdd = rdd.map { case (v, gs) => (v, localSamplesBc.value.view.zip(gs.view) @@ -113,15 +108,15 @@ class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, } } - def filterVariants(ilist: IntervalList): VariantSampleMatrix[T, S] = + def filterVariants(ilist: IntervalList): VariantSampleMatrix[T] = filterVariants(v => ilist.contains(v.contig, v.start)) - def filterVariants(p: (Variant) => Boolean): VariantSampleMatrix[T, S] = + def filterVariants(p: (Variant) => Boolean): VariantSampleMatrix[T] = copy(rdd = rdd.filter { case (v, _) => p(v) }) def filterSamples(p: (Int) => Boolean) = { val localSamplesBc = sparkContext.broadcast(localSamples) - copy(localSamples = localSamples.filter(p), + copy[T](localSamples = localSamples.filter(p), rdd = rdd.map { case (v, gs) => (v, localSamplesBc.value.view.zip(gs.view) .filter { case (s, _) => p(s) } @@ -146,7 +141,7 @@ class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, zeroBuffer.get(zeroArray) rdd - .mapPartitions { (it: Iterator[(Variant, S)]) => + .mapPartitions { (it: Iterator[(Variant, Iterable[T])]) => val serializer = SparkEnv.get.serializer.newInstance() def copyZeroValue() = serializer.deserialize[U](ByteBuffer.wrap(zeroArray)) val arrayZeroValue = Array.fill[U](localSamplesBc.value.length)(copyZeroValue()) @@ -198,10 +193,10 @@ class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, 
zeroBuffer.get(zeroArray) rdd - .mapPartitions { (it: Iterator[(Variant, S)]) => + .mapPartitions { (it: Iterator[(Variant, Iterable[T])]) => val serializer = SparkEnv.get.serializer.newInstance() def copyZeroValue() = serializer.deserialize[T](ByteBuffer.wrap(zeroArray))(localtct) - val arrayZeroValue = Array.fill[T](localSamplesBc.value.size)(copyZeroValue()) + val arrayZeroValue = Array.fill[T](localSamplesBc.value.length)(copyZeroValue()) localSamplesBc.value.iterator .zip(it.foldLeft(arrayZeroValue) { case (acc, (v, gs)) => for ((g, i) <- gs.zipWithIndex) @@ -215,4 +210,47 @@ class VariantSampleMatrix[T, S <: Iterable[T]](metadata: VariantMetadata, rdd .mapValues(_.foldLeft(zeroValue)((acc, g) => combOp(acc, g))) } + +} + +class RichVDS(vds: VariantDataset) { + + def write(sqlContext: SQLContext, dirname: String, compress: Boolean = true) { + import sqlContext.implicits._ + import VSMUtils.toRichIterableGenotype + + require(dirname.endsWith(".vds")) + + val hConf = vds.sparkContext.hadoopConfiguration + hadoopMkdir(dirname, hConf) + writeObjectFile(dirname + "/metadata.ser", hConf)( + _.writeObject(vds.metadata)) + // _.writeObject("sparky" -> metadata)) + + // rdd.toDF().write.parquet(dirname + "/rdd.parquet") + vds.rdd + .map { case (v, gs) => (v, gs.toGenotypeStream(v, compress)) } + .toDF() + .saveAsParquetFile(dirname + "/rdd.parquet") + } } + + +class RichIterableGenotype(val it: Iterable[Genotype]) extends AnyVal { + + def toGenotypeStream(v: Variant, compress: Boolean): GenotypeStream = { + it match { + case gs: GenotypeStream => gs + case _ => + val b: GenotypeStreamBuilder = new GenotypeStreamBuilder(v, compress = compress) + b ++= it + b.result() + } + } + +} + +object VSMUtils { + implicit def toRichIterableGenotype(it: Iterable[Genotype]): RichIterableGenotype = new RichIterableGenotype(it) + implicit def toRichVDS(vsm: VariantDataset): RichVDS = new RichVDS(vsm) +} \ No newline at end of file diff --git a/src/main/scala/org/broadinstitute/hail/variant/package.scala b/src/main/scala/org/broadinstitute/hail/variant/package.scala index 0b752308307..d0cab37c6cc 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/package.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/package.scala @@ -4,7 +4,7 @@ import org.apache.spark.rdd.RDD import org.broadinstitute.hail.variant.{GenotypeStream, Variant} package object variant { - type VariantDataset = VariantSampleMatrix[Genotype, Iterable[Genotype]] + type VariantDataset = VariantSampleMatrix[Genotype] // type VariantSampleMatrix[T, S] = managed.ManagedVSM[T, S] // type VariantSampleMatrix[T, S <: Iterable[(Int, T)]] = sparky.SparkyVSM[T, S] diff --git a/src/test/scala/org/broadinstitute/hail/methods/gqDpStatsSuite.scala b/src/test/scala/org/broadinstitute/hail/methods/gqDpStatsSuite.scala index 6cf855fc9b7..675c1b3b93d 100644 --- a/src/test/scala/org/broadinstitute/hail/methods/gqDpStatsSuite.scala +++ b/src/test/scala/org/broadinstitute/hail/methods/gqDpStatsSuite.scala @@ -22,7 +22,7 @@ class gqDpStatsSuite extends SparkSuite { val variantDevs = Array(1.87082869, 5.33853913, 7.27581611, 4.28478413) // DP test first - val dpVds = TestRDDBuilder.buildRDD(8, 4, sc, "sparky", dpArray=Some(arr), gqArray=None) + val dpVds = TestRDDBuilder.buildRDD(8, 4, sc, dpArray=Some(arr), gqArray=None) val dpSingletons = sSingletonVariants(dpVds) val dpVariantR = VariantQC.results(dpVds) val dpSampleR = SampleQC.results(dpVds, dpSingletons) @@ -44,7 +44,7 @@ class gqDpStatsSuite extends SparkSuite { } // now test GQ - val gqVds = 
TestRDDBuilder.buildRDD(8, 4, sc, "sparky", dpArray=None, gqArray=Some(arr)) + val gqVds = TestRDDBuilder.buildRDD(8, 4, sc, dpArray=None, gqArray=Some(arr)) val gqSingletons = sSingletonVariants(gqVds) val gqVariantR = VariantQC.results(gqVds) val gqSampleR = SampleQC.results(gqVds, gqSingletons) diff --git a/src/test/scala/org/broadinstitute/hail/utils/TestRDDBuilder.scala b/src/test/scala/org/broadinstitute/hail/utils/TestRDDBuilder.scala index 5331a374286..0e008904073 100644 --- a/src/test/scala/org/broadinstitute/hail/utils/TestRDDBuilder.scala +++ b/src/test/scala/org/broadinstitute/hail/utils/TestRDDBuilder.scala @@ -76,7 +76,7 @@ object TestRDDBuilder { } } - def buildRDD(nSamples: Int, nVariants: Int, sc: SparkContext, vsmtype: String, + def buildRDD(nSamples: Int, nVariants: Int, sc: SparkContext, gqArray: Option[Array[Array[Int]]] = None, dpArray: Option[Array[Array[Int]]] = None): VariantDataset = { /* Takes the arguments: @@ -118,8 +118,8 @@ object TestRDDBuilder { b += Genotype(gt, ad, dp, pl) } - (variant, b.result()) + (variant, b.result(): Iterable[Genotype]) } - VariantSampleMatrix(vsmtype, VariantMetadata(Map("1" -> 1000000), sampleList, None), streamRDD) + VariantSampleMatrix(VariantMetadata(Map("1" -> 1000000), sampleList, None), streamRDD) } } \ No newline at end of file diff --git a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala index d5eff7afd4b..561f98fa25c 100644 --- a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala +++ b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala @@ -2,6 +2,7 @@ package org.broadinstitute.hail.variant.vsm import org.broadinstitute.hail.SparkSuite import org.broadinstitute.hail.variant.{Variant, VariantSampleMatrix} +import org.broadinstitute.hail.variant.VSMUtils._ import org.broadinstitute.hail.Utils._ import scala.collection.mutable import scala.util.Random @@ -22,7 +23,7 @@ class VSMSuite extends SparkSuite { val result = "rm -rf " + vdsdir !; assert(result == 0) - LoadVCF(sc, "src/test/resources/sample.vcf.gz", vsmtype = vsmtype) + LoadVCF(sc, "src/test/resources/sample.vcf.gz") .write(sqlContext, vdsdir) val vds = VariantSampleMatrix.read(sqlContext, vdsdir) @@ -55,7 +56,7 @@ class VSMSuite extends SparkSuite { vsmTypes.foreach { vsmtype => val localKeep = keep - val filtered = LoadVCF(sc, "src/test/resources/sample.vcf.gz", vsmtype = vsmtype) + val filtered = LoadVCF(sc, "src/test/resources/sample.vcf.gz") .filterSamples(s => localKeep(s)) val filteredAsMap = filtered.mapWithKeys((v, s, g) => ((v, s), g)).collectAsMap() From bd43175e7245578ad149fa87724a22018ccfed48 Mon Sep 17 00:00:00 2001 From: tpoterba Date: Mon, 30 Nov 2015 11:59:04 -0500 Subject: [PATCH 3/4] Fixed first round of comments from Cotton --- .../broadinstitute/hail/driver/Write.scala | 1 - .../hail/variant/VariantSampleMatrix.scala | 36 +++---------------- .../broadinstitute/hail/variant/package.scala | 15 ++++++++ .../hail/variant/vsm/VSMSuite.scala | 1 - 4 files changed, 19 insertions(+), 34 deletions(-) diff --git a/src/main/scala/org/broadinstitute/hail/driver/Write.scala b/src/main/scala/org/broadinstitute/hail/driver/Write.scala index 1fee36b0475..b3db7b150f0 100644 --- a/src/main/scala/org/broadinstitute/hail/driver/Write.scala +++ b/src/main/scala/org/broadinstitute/hail/driver/Write.scala @@ -1,7 +1,6 @@ package org.broadinstitute.hail.driver import org.broadinstitute.hail.Utils._ -import org.broadinstitute.hail.variant.VSMUtils._ import 
org.kohsuke.args4j.{Option => Args4jOption} object Write extends Command { diff --git a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala index 75ba35d8b39..7a8ff791a1c 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala @@ -6,8 +6,6 @@ import org.apache.spark.{SparkEnv, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.broadinstitute.hail.Utils._ -import org.broadinstitute.hail.variant._ -import scala.collection.mutable import scala.language.implicitConversions import scala.reflect.ClassTag @@ -21,14 +19,12 @@ object VariantSampleMatrix { } def read(sqlContext: SQLContext, dirname: String): VariantDataset = { + require(dirname.endsWith(".vds")) + import RichRow._ val metadata = readObjectFile(dirname + "/metadata.ser", sqlContext.sparkContext.hadoopConfiguration)( _.readObject().asInstanceOf[VariantMetadata]) - import RichRow._ - - require(dirname.endsWith(".vds")) - // val df = sqlContext.read.parquet(dirname + "/rdd.parquet") val df = sqlContext.parquetFile(dirname + "/rdd.parquet") new VariantSampleMatrix[Genotype](metadata, df.rdd.map(r => (r.getVariant(0), r.getGenotypeStream(1)))) @@ -206,10 +202,8 @@ class VariantSampleMatrix[T](val metadata: VariantMetadata, }.foldByKey(zeroValue)(combOp) } - def foldByVariant(zeroValue: T)(combOp: (T, T) => T): RDD[(Variant, T)] = { - rdd - .mapValues(_.foldLeft(zeroValue)((acc, g) => combOp(acc, g))) - } + def foldByVariant(zeroValue: T)(combOp: (T, T) => T): RDD[(Variant, T)] = + rdd.mapValues(_.foldLeft(zeroValue)((acc, g) => combOp(acc, g))) } @@ -217,7 +211,6 @@ class RichVDS(vds: VariantDataset) { def write(sqlContext: SQLContext, dirname: String, compress: Boolean = true) { import sqlContext.implicits._ - import VSMUtils.toRichIterableGenotype require(dirname.endsWith(".vds")) @@ -225,7 +218,6 @@ class RichVDS(vds: VariantDataset) { hadoopMkdir(dirname, hConf) writeObjectFile(dirname + "/metadata.ser", hConf)( _.writeObject(vds.metadata)) - // _.writeObject("sparky" -> metadata)) // rdd.toDF().write.parquet(dirname + "/rdd.parquet") vds.rdd @@ -234,23 +226,3 @@ class RichVDS(vds: VariantDataset) { .saveAsParquetFile(dirname + "/rdd.parquet") } } - - -class RichIterableGenotype(val it: Iterable[Genotype]) extends AnyVal { - - def toGenotypeStream(v: Variant, compress: Boolean): GenotypeStream = { - it match { - case gs: GenotypeStream => gs - case _ => - val b: GenotypeStreamBuilder = new GenotypeStreamBuilder(v, compress = compress) - b ++= it - b.result() - } - } - -} - -object VSMUtils { - implicit def toRichIterableGenotype(it: Iterable[Genotype]): RichIterableGenotype = new RichIterableGenotype(it) - implicit def toRichVDS(vsm: VariantDataset): RichVDS = new RichVDS(vsm) -} \ No newline at end of file diff --git a/src/main/scala/org/broadinstitute/hail/variant/package.scala b/src/main/scala/org/broadinstitute/hail/variant/package.scala index d0cab37c6cc..579070b8371 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/package.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/package.scala @@ -1,11 +1,26 @@ package org.broadinstitute.hail import org.apache.spark.rdd.RDD +import scala.language.implicitConversions import org.broadinstitute.hail.variant.{GenotypeStream, Variant} package object variant { type VariantDataset = VariantSampleMatrix[Genotype] + class 
RichIterableGenotype(val it: Iterable[Genotype]) extends AnyVal { + def toGenotypeStream(v: Variant, compress: Boolean): GenotypeStream = + it match { + case gs: GenotypeStream => gs + case _ => + val b: GenotypeStreamBuilder = new GenotypeStreamBuilder(v, compress = compress) + b ++= it + b.result() + } + } + + implicit def toRichIterableGenotype(it: Iterable[Genotype]): RichIterableGenotype = new RichIterableGenotype(it) + implicit def toRichVDS(vsm: VariantDataset): RichVDS = new RichVDS(vsm) + // type VariantSampleMatrix[T, S] = managed.ManagedVSM[T, S] // type VariantSampleMatrix[T, S <: Iterable[(Int, T)]] = sparky.SparkyVSM[T, S] // def importToVSM(rdd: RDD[(Variant, GenotypeStream)]) = rdd diff --git a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala index 561f98fa25c..6670f5217c6 100644 --- a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala +++ b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala @@ -2,7 +2,6 @@ package org.broadinstitute.hail.variant.vsm import org.broadinstitute.hail.SparkSuite import org.broadinstitute.hail.variant.{Variant, VariantSampleMatrix} -import org.broadinstitute.hail.variant.VSMUtils._ import org.broadinstitute.hail.Utils._ import scala.collection.mutable import scala.util.Random From 1031cf8f3adb9a0f5d3df0e89c36dbfcbb29e362 Mon Sep 17 00:00:00 2001 From: tpoterba Date: Tue, 1 Dec 2015 12:52:31 -0500 Subject: [PATCH 4/4] Fixed 2nd round of comments --- .../hail/variant/VariantSampleMatrix.scala | 1 + .../hail/variant/vsm/VSMSuite.scala | 40 +++++-------------- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala index 7a8ff791a1c..eaaee971852 100644 --- a/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala +++ b/src/main/scala/org/broadinstitute/hail/variant/VariantSampleMatrix.scala @@ -207,6 +207,7 @@ class VariantSampleMatrix[T](val metadata: VariantMetadata, } +// FIXME AnyVal Scala 2.11 class RichVDS(vds: VariantDataset) { def write(sqlContext: SQLContext, dirname: String, compress: Boolean = true) { diff --git a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala index 6670f5217c6..484c81e327c 100644 --- a/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala +++ b/src/test/scala/org/broadinstitute/hail/variant/vsm/VSMSuite.scala @@ -11,26 +11,6 @@ import org.broadinstitute.hail.methods.{sSingletonVariants, LoadVCF} import org.testng.annotations.Test class VSMSuite extends SparkSuite { - val vsmTypes = List("sparky") - - @Test def testsSingletonVariants() { - val singletons: List[Set[Variant]] = - vsmTypes - .map(vsmtype => { - val vdsdir = "/tmp/sample." 
+ vsmtype + ".vds" - - val result = "rm -rf " + vdsdir !; - assert(result == 0) - - LoadVCF(sc, "src/test/resources/sample.vcf.gz") - .write(sqlContext, vdsdir) - - val vds = VariantSampleMatrix.read(sqlContext, vdsdir) - sSingletonVariants(vds) - }) - - assert(singletons.tail.forall(s => s == singletons.head)) - } @Test def testFilterSamples() { val vds = LoadVCF(sc, "src/test/resources/sample.vcf.gz") @@ -53,20 +33,18 @@ class VSMSuite extends SparkSuite { } } - vsmTypes.foreach { vsmtype => - val localKeep = keep - val filtered = LoadVCF(sc, "src/test/resources/sample.vcf.gz") - .filterSamples(s => localKeep(s)) + val localKeep = keep + val filtered = LoadVCF(sc, "src/test/resources/sample.vcf.gz") + .filterSamples(s => localKeep(s)) - val filteredAsMap = filtered.mapWithKeys((v, s, g) => ((v, s), g)).collectAsMap() - filteredAsMap.foreach { case (k, g) => simpleAssert(vdsAsMap(k) == g) } + val filteredAsMap = filtered.mapWithKeys((v, s, g) => ((v, s), g)).collectAsMap() + filteredAsMap.foreach { case (k, g) => simpleAssert(vdsAsMap(k) == g) } - simpleAssert(filtered.nSamples == nSamples) - simpleAssert(filtered.localSamples.toSet == keep) + simpleAssert(filtered.nSamples == nSamples) + simpleAssert(filtered.localSamples.toSet == keep) - val sampleKeys = filtered.mapWithKeys((v, s, g) => s).distinct.collect() - assert(sampleKeys.toSet == keep) - } + val sampleKeys = filtered.mapWithKeys((v, s, g) => s).distinct.collect() + assert(sampleKeys.toSet == keep) } } }