Eliminate some but not all uses of RVD.rdd (hail-is#3186)
* Eliminate some but not all uses of RVD.rdd

This change anticipates the ContextRDD change, wherein `RVD.rdd` will no
longer be an RDD. Moreover, enforcing an abstraction barrier at the level
of `RVD` will ease future changes to its implementation.
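
To make the intended barrier concrete, here is a minimal Scala sketch
(illustrative only, not code from this commit; `RegionValue` here is a
stand-in for Hail's real type and the public methods are invented):

```scala
package is.hail.rvd

import org.apache.spark.rdd.RDD

// Stand-in for Hail's real region-value record type.
case class RegionValue(offset: Long)

// Sketch of the barrier: the raw RDD is package-private, so only code
// inside is.hail.rvd can reach it; everything else must go through the
// public operations. BlockMatrix and OrderedRDD2 live outside this
// package, which is why the modifier cannot be enforced today.
abstract class RVD {
  private[rvd] def rdd: RDD[RegionValue]

  // Public operations delegate to the hidden RDD.
  def count(): Long = rdd.count()
  def isEmpty: Boolean = rdd.isEmpty()
}
```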

There are two remaining types of calls that I cannot eliminate:

 - uses in BlockMatrix and OrderedRDD2: these two classes build new
   RDDs based on the RVD's rdd, so they should be considered part of
   the implementation of the RVD abstraction. Because these two classes
   are outside of `is.hail.rvd`, I cannot enforce an access modifier
   (like the package-private one sketched above) on `RVD.rdd`.

 - uses by methods:

   - LDPrune: it seems we need a "GeneralRVD"

   - Skat: it seems like some of this could actually be moved to
     Python, but there is some matrix math that cannot be moved until
     the expr lang has efficient small-matrix ops

   - MatrixTable.same: I could probably move this if I re-implemented
     forall in terms of RVD.aggregate (see the sketch after this list)

   - MatrixTable.annotateRowsIntervalTable: I am really not sure about
     this one; it seems like a performance optimization that purposely
     reaches through the abstraction to do Smart Things
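
For illustration (not part of this commit), a forall built on an
aggregate-style fold might look like the sketch below; it is written
against a plain Spark RDD since `RVD.aggregate` is only hypothetical
here, and it gives up short-circuiting:

```scala
import org.apache.spark.rdd.RDD

object ForallSketch {
  // Hypothetical forall in terms of aggregate: fold each partition
  // with &&, then combine the per-partition results with && again.
  def forall[T](rdd: RDD[T])(p: T => Boolean): Boolean =
    rdd.aggregate(true)(
      (acc, x) => acc && p(x), // seqOp: fold within one partition
      (a, b) => a && b)        // combOp: merge partition results
}
```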

* clean up

* formatting

* more formatting

* use assertOrdered instead of old apply

* fixes

* improve use of assertions

* rename toUnsafeRows to toRows

* rename unsafeChangeType to updateType

* wip zip not sure what to do

* finish renames

* fix invalid assertions

* remove coerceOrdered, remove OrderedRVD.apply

* fixes and eliminate coerceOrdered

* actually remove coerceOrdered

* fix

* clean up zipPartitions definitions and uses

* name error

* fix name

* Update OrderedRVD.scala

* Update OrderedRVD.scala

* fix filteralleles shuffle and friends

* formatting

* rebase errors

* harmonize formatting

* remove rebase cruft
danking authored and jackgoldsmith4 committed Jun 25, 2018
1 parent aff619c commit 8b39e6a
Showing 12 changed files with 188 additions and 123 deletions.
77 changes: 39 additions & 38 deletions src/main/scala/is/hail/expr/Relational.scala
@@ -85,6 +85,8 @@ case class MatrixValue(
   colValues: IndexedSeq[Annotation],
   rvd: OrderedRVD) {
 
+  assert(rvd.typ == typ.orvdType)
+
   def sparkContext: SparkContext = rvd.sparkContext
 
   def nPartitions: Int = rvd.partitions.length
@@ -252,42 +254,43 @@ case class MatrixRead(
     } else {
       val entriesRVD = spec.entriesComponent.read(hc, path)
       val entriesRowType = entriesRVD.rowType
-      OrderedRVD(typ.orvdType,
-        rowsRVD.partitioner,
-        rowsRVD.rdd.zipPartitions(entriesRVD.rdd) { case (it1, it2) =>
-          val rvb = new RegionValueBuilder()
-
-          new Iterator[RegionValue] {
-            def hasNext: Boolean = {
-              val hn = it1.hasNext
-              assert(hn == it2.hasNext)
-              hn
-            }
-
-            def next(): RegionValue = {
-              val rv1 = it1.next()
-              val rv2 = it2.next()
-              val region = rv2.region
-              rvb.set(region)
-              rvb.start(fullRowType)
-              rvb.startStruct()
-              var i = 0
-              while (i < localEntriesIndex) {
-                rvb.addField(fullRowType, rv1, i)
-                i += 1
-              }
-              rvb.addField(entriesRowType, rv2, 0)
-              i += 1
-              while (i < fullRowType.size) {
-                rvb.addField(fullRowType, rv1, i - 1)
-                i += 1
-              }
-              rvb.endStruct()
-              rv2.set(region, rvb.end())
-              rv2
-            }
-          }
-        })
+      rowsRVD.zipPartitionsPreservesPartitioning(
+        typ.orvdType,
+        entriesRVD
+      ) { case (it1, it2) =>
+        val rvb = new RegionValueBuilder()
+
+        new Iterator[RegionValue] {
+          def hasNext: Boolean = {
+            val hn = it1.hasNext
+            assert(hn == it2.hasNext)
+            hn
+          }
+
+          def next(): RegionValue = {
+            val rv1 = it1.next()
+            val rv2 = it2.next()
+            val region = rv2.region
+            rvb.set(region)
+            rvb.start(fullRowType)
+            rvb.startStruct()
+            var i = 0
+            while (i < localEntriesIndex) {
+              rvb.addField(fullRowType, rv1, i)
+              i += 1
+            }
+            rvb.addField(entriesRowType, rv2, 0)
+            i += 1
+            while (i < fullRowType.size) {
+              rvb.addField(fullRowType, rv1, i - 1)
+              i += 1
+            }
+            rvb.endStruct()
+            rv2.set(region, rvb.end())
+            rv2
+          }
+        }
+      }
     }
   }
 }

@@ -450,10 +453,8 @@ case class MapEntries(child: MatrixIR, newEntries: IR) extends MatrixIR {
 }
 
 case class TableValue(typ: TableType, globals: BroadcastValue, rvd: RVD) {
-  def rdd: RDD[Row] = {
-    val localRowType = typ.rowType
-    rvd.rdd.map { rv => new UnsafeRow(localRowType, rv.region.copy(), rv.offset) }
-  }
+  def rdd: RDD[Row] =
+    rvd.toRows
 
   def filter(p: (RegionValue, RegionValue) => Boolean): TableValue = {
     val globalType = typ.globalType
@@ -642,7 +643,7 @@ case class TableJoin(left: TableIR, right: TableIR, joinType: String) extends Ta
     val leftORVD = leftTV.rvd match {
       case ordered: OrderedRVD => ordered
       case unordered =>
-        OrderedRVD(
+        OrderedRVD.coerce(
           new OrderedRVDType(left.typ.key.toArray, left.typ.key.toArray, leftRowType),
           unordered.rdd,
           None,
@@ -656,7 +657,7 @@ case class TableJoin(left: TableIR, right: TableIR, joinType: String) extends Ta
       if (joinType == "left" || joinType == "inner")
         unordered.constrainToOrderedPartitioner(ordType, leftORVD.partitioner)
       else
-        OrderedRVD(ordType, unordered.rdd, None, Some(leftORVD.partitioner))
+        OrderedRVD.coerce(ordType, unordered.rdd, None, Some(leftORVD.partitioner))
     }
     val joinedRVD = leftORVD.orderedJoin(
       rightORVD,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/LoadMatrix.scala
@@ -392,7 +392,7 @@ object LoadMatrix {
       val (partitioner, keepPartitions) = makePartitionerFromCounts(partitionCounts, matrixType.orvdType.pkType)
       OrderedRVD(matrixType.orvdType, partitioner, rdd.subsetPartitions(keepPartitions))
     } else
-      OrderedRVD(matrixType.orvdType, rdd, None, None)
+      OrderedRVD.coerce(matrixType.orvdType, rdd, None, None)
 
     new MatrixTable(hc,
       matrixType,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/bgen/LoadBgen.scala
@@ -169,7 +169,7 @@ object LoadBgen {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleIds.map(x => Annotation(x)),
-      OrderedRVD(matrixType.orvdType, rdd2, Some(fastKeys), None))
+      OrderedRVD.coerce(matrixType.orvdType, rdd2, Some(fastKeys), None))
   }
 
   def index(hConf: org.apache.hadoop.conf.Configuration, file: String) {
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/plink/LoadPlink.scala
@@ -189,7 +189,7 @@ object LoadPlink {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleAnnotations,
-      OrderedRVD(matrixType.orvdType, rdd2, Some(fastKeys), None))
+      OrderedRVD.coerce(matrixType.orvdType, rdd2, Some(fastKeys), None))
   }
 
   def apply(hc: HailContext, bedPath: String, bimPath: String, famPath: String, ffConfig: FamFileConfig,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/vcf/LoadGDB.scala
@@ -190,6 +190,6 @@ object LoadGDB {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleIds.map(x => Annotation(x)),
-      OrderedRVD(matrixType.orvdType, hc.sc.parallelize(records), None, None))
+      OrderedRVD.coerce(matrixType.orvdType, hc.sc.parallelize(records), None, None))
   }
 }
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/vcf/LoadVCF.scala
@@ -841,7 +841,7 @@ object LoadVCF {
     // nothing after the key
     val justVariants = parseLines(() => ())((c, l, rvb) => ())(lines, kType, rg, contigRecoding)
 
-    val rdd = OrderedRVD(
+    val rdd = OrderedRVD.coerce(
       matrixType.orvdType,
       parseLines { () =>
         new ParseLineContext(genotypeSignature, new BufferedLineIterator(headerLinesBc.value.iterator.buffered))
143 changes: 102 additions & 41 deletions src/main/scala/is/hail/rvd/OrderedRVD.scala
@@ -23,6 +23,9 @@ class OrderedRVD(
   self =>
   def rowType: TStruct = typ.rowType
 
+  def updateType(newTyp: OrderedRVDType): OrderedRVD =
+    OrderedRVD(newTyp, partitioner, rdd)
+
   def mapPreservesPartitioning(newTyp: OrderedRVDType)(f: (RegionValue) => RegionValue): OrderedRVD =
     OrderedRVD(newTyp,
       partitioner,
@@ -38,13 +41,6 @@ class OrderedRVD(
       partitioner,
       rdd.mapPartitions(f))
 
-  def zipPartitionsPreservesPartitioning[T](newTyp: OrderedRVDType, rdd2: RDD[T])(f: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue])(implicit tct: ClassTag[T]): OrderedRVD =
-    OrderedRVD(newTyp,
-      partitioner,
-      rdd.zipPartitions(rdd2) { case (it, it2) =>
-        f(it, it2)
-      })
-
   override def filter(p: (RegionValue) => Boolean): OrderedRVD =
     OrderedRVD(typ,
       partitioner,
@@ -356,28 +352,41 @@ class OrderedRVD(
     spec.write(sparkContext.hadoopConfiguration, path)
     partitionCounts
   }
-}
 
-object OrderedRVD {
-  type CoercionMethod = Int
-
-  final val ORDERED_PARTITIONER: CoercionMethod = 0
-  final val AS_IS: CoercionMethod = 1
-  final val LOCAL_SORT: CoercionMethod = 2
-  final val SHUFFLE: CoercionMethod = 3
+  def zipPartitionsPreservesPartitioning[T: ClassTag](
+    newTyp: OrderedRVDType,
+    that: RDD[T]
+  )(zipper: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue]
+  ): OrderedRVD =
+    OrderedRVD(
+      newTyp,
+      partitioner,
+      this.rdd.zipPartitions(that, preservesPartitioning = true)(zipper))
+
+  def zipPartitionsPreservesPartitioning(
+    newTyp: OrderedRVDType,
+    that: RVD
+  )(zipper: (Iterator[RegionValue], Iterator[RegionValue]) => Iterator[RegionValue]
+  ): OrderedRVD =
+    OrderedRVD(
+      newTyp,
+      partitioner,
+      this.rdd.zipPartitions(that.rdd, preservesPartitioning = true)(zipper))
+
+  def writeRowsSplit(
+    path: String,
+    t: MatrixType,
+    codecSpec: CodecSpec
+  ): Array[Long] = rdd.writeRowsSplit(path, t, codecSpec, partitioner)
+}
 
+object OrderedRVD {
   def empty(sc: SparkContext, typ: OrderedRVDType): OrderedRVD = {
     OrderedRVD(typ,
       OrderedRVDPartitioner.empty(typ),
       sc.emptyRDD[RegionValue])
   }
 
-  def apply(typ: OrderedRVDType,
-    rdd: RDD[RegionValue], fastKeys: Option[RDD[RegionValue]], hintPartitioner: Option[OrderedRVDPartitioner]): OrderedRVD = {
-    val (_, orderedRVD) = coerce(typ, rdd, fastKeys, hintPartitioner)
-    orderedRVD
-  }
-
   /**
    * Precondition: the iterator it is PK-sorted. We lazily K-sort each block
    * of PK-equivalent elements.
@@ -448,24 +457,62 @@ object OrderedRVD {
     pkis.sortBy(_.min)(typ.pkOrd)
   }
 
-  def coerce(typ: OrderedRVDType,
+  def coerce(
+    typ: OrderedRVDType,
+    rvd: RVD
+  ): OrderedRVD = coerce(typ, rvd, None, None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rvd: RVD,
+    fastKeys: Option[RDD[RegionValue]],
+    hintPartitioner: Option[OrderedRVDPartitioner]
+  ): OrderedRVD = coerce(typ, rvd.rdd, fastKeys, hintPartitioner)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue]
+  ): OrderedRVD = coerce(typ, rdd, None, None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    fastKeys: RDD[RegionValue]
+  ): OrderedRVD = coerce(typ, rdd, Some(fastKeys), None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    hintPartitioner: OrderedRVDPartitioner
+  ): OrderedRVD = coerce(typ, rdd, None, Some(hintPartitioner))
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    fastKeys: RDD[RegionValue],
+    hintPartitioner: OrderedRVDPartitioner
+  ): OrderedRVD = coerce(typ, rdd, Some(fastKeys), Some(hintPartitioner))
+
+  def coerce(
+    typ: OrderedRVDType,
     // rdd: RDD[RegionValue[rowType]]
     rdd: RDD[RegionValue],
     // fastKeys: Option[RDD[RegionValue[kType]]]
-    fastKeys: Option[RDD[RegionValue]] = None,
-    hintPartitioner: Option[OrderedRVDPartitioner] = None): (CoercionMethod, OrderedRVD) = {
+    fastKeys: Option[RDD[RegionValue]],
+    hintPartitioner: Option[OrderedRVDPartitioner]
+  ): OrderedRVD = {
     val sc = rdd.sparkContext
 
     if (rdd.partitions.isEmpty)
-      return (ORDERED_PARTITIONER, empty(sc, typ))
+      return empty(sc, typ)
 
     // keys: RDD[RegionValue[kType]]
     val keys = fastKeys.getOrElse(getKeys(typ, rdd))
 
     val pkis = getPartitionKeyInfo(typ, keys)
 
     if (pkis.isEmpty)
-      return (AS_IS, empty(sc, typ))
+      return empty(sc, typ)
 
     val partitionsSorted = (pkis, pkis.tail).zipped.forall { case (p, pnext) =>
       val r = typ.pkOrd.lteq(p.max, pnext.min)
@@ -487,29 +534,28 @@ object OrderedRVD {
       (adjSortedness: @unchecked) match {
         case OrderedRVPartitionInfo.KSORTED =>
           info("Coerced sorted dataset")
-          (AS_IS, OrderedRVD(typ,
+          OrderedRVD(typ,
             partitioner,
-            adjustedRDD))
+            adjustedRDD)
 
         case OrderedRVPartitionInfo.TSORTED =>
           info("Coerced almost-sorted dataset")
-          (LOCAL_SORT, OrderedRVD(typ,
+          OrderedRVD(typ,
             partitioner,
             adjustedRDD.mapPartitions { it =>
               localKeySort(typ, it)
-            }))
+            })
       }
     } else {
       info("Ordering unsorted dataset with network shuffle")
-      val orvd = hintPartitioner
+      hintPartitioner
         .filter(_.numPartitions >= rdd.partitions.length)
         .map(adjustBoundsAndShuffle(typ, _, rdd))
         .getOrElse {
-          val ranges = calculateKeyRanges(typ, pkis, rdd.getNumPartitions)
-          val p = new OrderedRVDPartitioner(typ.partitionKey, typ.kType, ranges)
-          shuffle(typ, p, rdd)
-        }
-      (SHUFFLE, orvd)
+          val ranges = calculateKeyRanges(typ, pkis, rdd.getNumPartitions)
+          val p = new OrderedRVDPartitioner(typ.partitionKey, typ.kType, ranges)
+          shuffle(typ, p, rdd)
+        }
     }
   }

@@ -567,6 +613,12 @@ object OrderedRVD {
     shuffle(typ, partitioner.enlargeToRange(Interval(min, max, true, true)), rdd)
   }
 
+  def shuffle(
+    typ: OrderedRVDType,
+    partitioner: OrderedRVDPartitioner,
+    rvd: RVD
+  ): OrderedRVD = shuffle(typ, partitioner, rvd.rdd)
+
   def shuffle(typ: OrderedRVDType,
     partitioner: OrderedRVDPartitioner,
     rdd: RDD[RegionValue]): OrderedRVD = {
@@ -594,9 +646,6 @@ object OrderedRVD {
       })
   }
 
-  def shuffle(typ: OrderedRVDType, partitioner: OrderedRVDPartitioner, rvd: RVD): OrderedRVD =
-    shuffle(typ, partitioner, rvd.rdd)
-
   def rangesAndAdjustments(typ: OrderedRVDType,
     sortedKeyInfo: Array[OrderedRVPartitionInfo],
     sortedness: Int): (IndexedSeq[Array[Adjustment[RegionValue]]], UnsafeIndexedSeq, Int) = {
@@ -662,6 +711,12 @@ object OrderedRVD {
     (adjustmentsBuffer, rangeBounds, adjSortedness)
   }
 
+  def apply(
+    typ: OrderedRVDType,
+    partitioner: OrderedRVDPartitioner,
+    rvd: RVD
+  ): OrderedRVD = apply(typ, partitioner, rvd.rdd)
+
   def apply(typ: OrderedRVDType,
     partitioner: OrderedRVDPartitioner,
     rdd: RDD[RegionValue]): OrderedRVD = {
@@ -703,8 +758,14 @@ object OrderedRVD {
       })
   }
 
-  def apply(typ: OrderedRVDType, partitioner: OrderedRVDPartitioner, rvd: RVD): OrderedRVD = {
-    assert(typ.rowType == rvd.rowType)
-    apply(typ, partitioner, rvd.rdd)
+  def union(rvds: Array[OrderedRVD]): OrderedRVD = {
+    require(rvds.length > 1)
+    val first = rvds(0)
+    val sc = first.sparkContext
+    OrderedRVD.coerce(
+      first.typ,
+      sc.union(rvds.map(_.rdd)),
+      None,
+      None)
   }
 }
(The remaining 5 of the 12 changed files were not loaded in this view.)