Created RVD #8

Merged 6 commits on Dec 5, 2017.

Changes from 5 commits
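This PR moves the ordered region-value machinery out of is.hail.sparkextras and into a new is.hail.rvd package: OrderedRDD2 becomes OrderedRVD, OrderedPartitioner2 becomes OrderedRVPartitioner, and OrderedRDD2Type becomes OrderedRVType, with an RVD abstraction appearing in FilterAlleles. It also adds a new WritableRegionValue helper class under is.hail.annotations.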
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/HailContext.scala
@@ -12,7 +12,7 @@ import is.hail.io.gen.GenLoader
import is.hail.io.plink.{FamFileConfig, PlinkLoader}
import is.hail.io.vcf._
import is.hail.keytable.KeyTable
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.OrderedRVD
import is.hail.stats.{BaldingNicholsModel, Distribution, UniformDist}
import is.hail.utils.{log, _}
import is.hail.variant.{GenericDataset, GenomeReference, Genotype, HTSGenotypeView, Locus, VSMFileMetadata, VSMSubgen, Variant, VariantDataset, VariantSampleMatrix}
54 changes: 54 additions & 0 deletions src/main/scala/is/hail/annotations/WritableRegionValue.scala
@@ -0,0 +1,54 @@
package is.hail.annotations

import is.hail.expr.{TStruct, Type}

object WritableRegionValue {
  def apply(t: Type, initial: RegionValue): WritableRegionValue =
    WritableRegionValue(t, initial.region, initial.offset)

  def apply(t: Type, initialRegion: MemoryBuffer, initialOffset: Long): WritableRegionValue = {
    val wrv = WritableRegionValue(t)
    wrv.set(initialRegion, initialOffset)
    wrv
  }

  def apply(t: Type): WritableRegionValue = {
    val region = MemoryBuffer()
    new WritableRegionValue(t, region, new RegionValueBuilder(region), RegionValue(region, 0))
  }
}

class WritableRegionValue(val t: Type,
  val region: MemoryBuffer,
  rvb: RegionValueBuilder,
  val value: RegionValue) {

  def offset: Long = value.offset

  def setSelect(fromT: TStruct, toFromFieldIdx: Array[Int], fromRV: RegionValue) {
    (t: @unchecked) match {
      case t: TStruct =>
        region.clear()
        rvb.start(t)
        rvb.startStruct()
        var i = 0
        while (i < t.size) {
          rvb.addField(fromT, fromRV, toFromFieldIdx(i))
          i += 1
        }
        rvb.endStruct()
        value.setOffset(rvb.end())
    }
  }

  def set(rv: RegionValue): Unit = set(rv.region, rv.offset)

  def set(fromRegion: MemoryBuffer, fromOffset: Long) {
    region.clear()
    rvb.start(t)
    rvb.addRegionValue(t, fromRegion, fromOffset)
    value.setOffset(rvb.end())
  }

  def pretty: String = value.pretty(t)
}
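Not part of the diff, but a minimal usage sketch may help: WritableRegionValue owns its own MemoryBuffer, so set deep-copies a possibly transient RegionValue into stable storage, and setSelect copies only the struct fields picked out by an index mapping. The function below is hypothetical.

import is.hail.annotations.{RegionValue, WritableRegionValue}
import is.hail.expr.Type

// Keep a durable copy of the last value an iterator yields. Iterators over
// region values typically reuse one buffer, so each rv may be overwritten by
// the next step; wrv.set clears the private region and deep-copies rv into it.
def keepLast(t: Type, it: Iterator[RegionValue]): WritableRegionValue = {
  val wrv = WritableRegionValue(t) // allocates its own MemoryBuffer
  it.foreach(rv => wrv.set(rv))
  wrv
}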
37 changes: 19 additions & 18 deletions src/main/scala/is/hail/expr/Relational.scala
@@ -4,6 +4,7 @@ import is.hail.HailContext
import is.hail.annotations._
import is.hail.methods.Aggregators
import is.hail.sparkextras._
+ import is.hail.rvd.{OrderedRVD, OrderedRVPartitioner, OrderedRVType}
import is.hail.variant.{VSMFileMetadata, VSMLocalValue, VSMMetadata}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -40,15 +41,15 @@ case class MatrixType(
"va" -> vaType,
"gs" -> !TArray(genotypeType))

- def orderedRDD2Type: OrderedRDD2Type = {
- new OrderedRDD2Type(Array("pk"),
+ def orderedRVType: OrderedRVType = {
+ new OrderedRVType(Array("pk"),
Array("pk", "v"),
rowType)
}

- def pkType: TStruct = orderedRDD2Type.pkType
+ def pkType: TStruct = orderedRVType.pkType

- def kType: TStruct = orderedRDD2Type.kType
+ def kType: TStruct = orderedRVType.kType

def sampleEC: EvalContext = {
val aggregationST = Map(
@@ -178,10 +179,10 @@ object MatrixValue {
val localNSamples = localValue.nSamples
val rangeBoundsType = TArray(typ.pkType)
new MatrixValue(typ, localValue,
- OrderedRDD2(typ.orderedRDD2Type,
- new OrderedPartitioner2(rdd.orderedPartitioner.numPartitions,
- typ.orderedRDD2Type.partitionKey,
- typ.orderedRDD2Type.kType,
+ OrderedRVD(typ.orderedRVType,
+ new OrderedRVPartitioner(rdd.orderedPartitioner.numPartitions,
+ typ.orderedRVType.partitionKey,
+ typ.orderedRVType.kType,
UnsafeIndexedSeq(rangeBoundsType,
rdd.orderedPartitioner.rangeBounds.map(b => Row(b)))),
rdd.mapPartitions { it =>
@@ -216,10 +217,10 @@ object MatrixValue {
case class MatrixValue(
typ: MatrixType,
localValue: VSMLocalValue,
- rdd2: OrderedRDD2) {
+ rdd2: OrderedRVD) {

def rdd: OrderedRDD[Annotation, Annotation, (Annotation, Iterable[Annotation])] = {
warn("converting OrderedRDD2 => OrderedRDD")
warn("converting OrderedRVD => OrderedRDD")

implicit val kOk: OrderedKey[Annotation, Annotation] = typ.vType.orderedKey

@@ -239,10 +240,10 @@ case class MatrixValue(
ur.getAs[IndexedSeq[Annotation]](3): Iterable[Annotation]))
},
OrderedPartitioner(
- rdd2.orderedPartitioner.rangeBounds.map { b =>
+ rdd2.partitioner.rangeBounds.map { b =>
b.asInstanceOf[Row].get(0)
}.toArray(kOk.pkct),
- rdd2.orderedPartitioner.numPartitions))
+ rdd2.partitioner.numPartitions))
}

def copyRDD(typ: MatrixType = typ,
@@ -280,7 +281,7 @@ case class MatrixValue(
localValue.copy(
sampleIds = keep.map(sampleIds),
sampleAnnotations = keep.map(sampleAnnotations)),
- rdd2 = rdd2.mapPartitionsPreservesPartitioning { it =>
+ rdd2 = rdd2.mapPartitionsPreservesPartitioning(typ.orderedRVType) { it =>
var rv2b = new RegionValueBuilder()
var rv2 = RegionValue()

@@ -392,16 +393,16 @@ case class MatrixRead(

val rdd =
if (dropVariants)
- OrderedRDD2.empty(hc.sc, typ.orderedRDD2Type)
+ OrderedRVD.empty(hc.sc, typ.orderedRVType)
else {
- var rdd = OrderedRDD2(
- typ.orderedRDD2Type,
- OrderedPartitioner2(hc.sc,
+ var rdd = OrderedRVD(
+ typ.orderedRVType,
+ OrderedRVPartitioner(hc.sc,
hc.hadoopConf.readFile(path + "/partitioner.json.gz")(JsonMethods.parse(_))),
hc.readRows(path, typ.rowType, nPartitions))
if (dropSamples) {
val localRowType = typ.rowType
- rdd = rdd.mapPartitionsPreservesPartitioning { it =>
+ rdd = rdd.mapPartitionsPreservesPartitioning(typ.orderedRVType) { it =>
var rv2b = new RegionValueBuilder()
var rv2 = RegionValue()

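One API change in this file deserves a note: mapPartitionsPreservesPartitioning now takes the result's OrderedRVType as an explicit argument, where the old OrderedRDD2 method took only the partition function. A hypothetical call site, assuming rvd: OrderedRVD and typ: MatrixType are in scope:

// The output row type is declared up front; real callers rebuild each
// RegionValue inside the iterator, as the hunks above do.
val mapped: OrderedRVD = rvd.mapPartitionsPreservesPartitioning(typ.orderedRVType) { it =>
  it // identity pass-through, for the sketch only
}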
4 changes: 2 additions & 2 deletions src/main/scala/is/hail/io/LoadMatrix.scala
@@ -3,7 +3,7 @@ package is.hail.io
import is.hail.HailContext
import is.hail.annotations._
import is.hail.expr._
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.OrderedRVD
import is.hail.utils._
import is.hail.variant._
import org.apache.hadoop.conf.Configuration
@@ -311,6 +311,6 @@ object LoadMatrix {
VSMLocalValue(Annotation.empty,
sampleIds,
Annotation.emptyIndexedSeq(sampleIds.length)),
- OrderedRDD2(matrixType.orderedRDD2Type, rdd, Some(rowKeys), None))
+ OrderedRVD(matrixType.orderedRVType, rdd, Some(rowKeys), None))
}
}
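The four-argument constructor used here, OrderedRVD(matrixType.orderedRVType, rdd, Some(rowKeys), None), mirrors the old OrderedRDD2 apply: an ordered type, the underlying row RDD, and an optional RDD of precomputed partition keys (rowKeys here, fastKeys in the BGEN and PLINK loaders below). The fourth argument is None at every call site in this PR; judging from the OrderedRDD2 signature it replaces, it is presumably an optional hint partitioner.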
4 changes: 2 additions & 2 deletions src/main/scala/is/hail/io/bgen/BgenLoader.scala
@@ -5,7 +5,7 @@ import is.hail.annotations._
import is.hail.expr.{MatrixType, TArray, TCall, TFloat64, TString, TStruct, TVariant}
import is.hail.io.vcf.LoadVCF
import is.hail.io.{HadoopFSDataBinaryReader, IndexBTree}
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.OrderedRVD
import is.hail.utils._
import is.hail.variant._
import org.apache.hadoop.io.LongWritable
@@ -127,7 +127,7 @@ object BgenLoader {
VSMLocalValue(globalAnnotation = Annotation.empty,
sampleIds = sampleIds,
sampleAnnotations = Array.fill(nSamples)(Annotation.empty)),
- OrderedRDD2(matrixType.orderedRDD2Type, rdd2, Some(fastKeys), None))
+ OrderedRVD(matrixType.orderedRVType, rdd2, Some(fastKeys), None))
}

def index(hConf: org.apache.hadoop.conf.Configuration, file: String) {
4 changes: 2 additions & 2 deletions src/main/scala/is/hail/io/plink/PlinkLoader.scala
@@ -4,7 +4,7 @@ import is.hail.HailContext
import is.hail.annotations._
import is.hail.expr._
import is.hail.io.vcf.LoadVCF
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.OrderedRVD
import is.hail.utils.StringEscapeUtils._
import is.hail.utils._
import is.hail.variant._
@@ -193,7 +193,7 @@ object PlinkLoader {
VSMLocalValue(globalAnnotation = Annotation.empty,
sampleIds = sampleIds,
sampleAnnotations = sampleAnnotations),
- OrderedRDD2(matrixType.orderedRDD2Type, rdd2, Some(fastKeys), None))
+ OrderedRVD(matrixType.orderedRVType, rdd2, Some(fastKeys), None))
}

def apply(hc: HailContext, bedPath: String, bimPath: String, famPath: String, ffConfig: FamFileConfig,
6 changes: 3 additions & 3 deletions src/main/scala/is/hail/io/vcf/LoadVCF.scala
@@ -5,7 +5,7 @@ import is.hail.HailContext
import is.hail.annotations._
import is.hail.expr.{TStruct, _}
import is.hail.io.{VCFAttributes, VCFMetadata}
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.OrderedRVD
import is.hail.utils._
import is.hail.variant._
import org.apache.hadoop
@@ -892,8 +892,8 @@ object LoadVCF {
// nothing after the key
val justVariants = parseLines(() => ())((c, l, rvb) => ())(lines, kType)

- val rdd = OrderedRDD2(
- matrixType.orderedRDD2Type,
+ val rdd = OrderedRVD(
+ matrixType.orderedRVType,
parseLines { () =>
new ParseLineContext(genotypeSignature, new BufferedLineIterator(headerLinesBc.value.iterator.buffered))
} { (c, l, rvb) =>
22 changes: 11 additions & 11 deletions src/main/scala/is/hail/methods/FilterAlleles.scala
@@ -2,7 +2,7 @@ package is.hail.methods

import is.hail.annotations._
import is.hail.expr.{EvalContext, Parser, TArray, TInt32, TVariant}
- import is.hail.sparkextras.OrderedRDD2
+ import is.hail.rvd.{OrderedRVD, RVD}
import is.hail.utils._
import is.hail.variant.{GenomeReference, Locus, Variant, VariantDataset, VariantSampleMatrix}
import org.apache.spark.rdd.RDD
@@ -51,8 +51,8 @@ object FilterAlleles {
val newMatrixType = vsm.matrixType.copy(vaType = vAnnotator.newT,
genotypeType = gAnnotator.newT)

- def filter(rdd: RDD[RegionValue],
- removeLeftAligned: Boolean, removeMoving: Boolean, verifyLeftAligned: Boolean): RDD[RegionValue] = {
+ def filter(rdd: RVD,
+ removeLeftAligned: Boolean, removeMoving: Boolean, verifyLeftAligned: Boolean): RVD = {

def filterAllelesInVariant(prevlocus: Locus, v: Variant, va: Annotation): Option[(Variant, IndexedSeq[Int], IndexedSeq[Int])] = {
var alive = 0
@@ -109,7 +109,7 @@ object FilterAlleles {
val localSampleIdsBc = vsm.sampleIdsBc
val localSampleAnnotationsBc = vsm.sampleAnnotationsBc

- rdd.mapPartitions { it =>
+ rdd.mapPartitions(newRowType) { it =>
var prevLocus: Locus = null

it.flatMap { rv =>
@@ -158,18 +158,18 @@
}
}

- val newRDD2: OrderedRDD2 =
+ val newRDD2: OrderedRVD =
if (leftAligned) {
- OrderedRDD2(newMatrixType.orderedRDD2Type,
- vsm.rdd2.orderedPartitioner,
+ OrderedRVD(newMatrixType.orderedRVType,
+ vsm.rdd2.partitioner,
filter(vsm.rdd2, removeLeftAligned = false, removeMoving = false, verifyLeftAligned = true))
} else {
- val leftAlignedVariants = OrderedRDD2(newMatrixType.orderedRDD2Type,
- vsm.rdd2.orderedPartitioner,
+ val leftAlignedVariants = OrderedRVD(newMatrixType.orderedRVType,
+ vsm.rdd2.partitioner,
filter(vsm.rdd2, removeLeftAligned = false, removeMoving = true, verifyLeftAligned = false))

- val movingVariants = OrderedRDD2.shuffle(newMatrixType.orderedRDD2Type,
- vsm.rdd2.orderedPartitioner,
+ val movingVariants = OrderedRVD.shuffle(newMatrixType.orderedRVType,
+ vsm.rdd2.partitioner,
filter(vsm.rdd2, removeLeftAligned = true, removeMoving = false, verifyLeftAligned = false))

leftAlignedVariants.partitionSortedUnion(movingVariants)
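The final FilterAlleles hunk shows how the result stays ordered. If the caller promises the input is already left-aligned, a single filtering pass reuses the existing partitioner and verifies that promise (verifyLeftAligned = true). Otherwise the rows are split into two ordered RVDs: variants that remain left-aligned after allele removal keep the current partitioning, while variants whose start position moves are re-sorted with OrderedRVD.shuffle, and the two are merged with partitionSortedUnion.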