Eliminate some but not all uses of RVD.rdd #3186

Merged: 26 commits, Mar 23, 2018
77 changes: 39 additions & 38 deletions src/main/scala/is/hail/expr/Relational.scala
@@ -85,6 +85,8 @@ case class MatrixValue(
   colValues: IndexedSeq[Annotation],
   rvd: OrderedRVD) {
 
+  assert(rvd.typ == typ.orvdType)
+
   def sparkContext: SparkContext = rvd.sparkContext
 
   def nPartitions: Int = rvd.partitions.length
@@ -252,42 +254,43 @@ case class MatrixRead(
     } else {
       val entriesRVD = spec.entriesComponent.read(hc, path)
       val entriesRowType = entriesRVD.rowType
-      OrderedRVD(typ.orvdType,
-        rowsRVD.partitioner,
-        rowsRVD.rdd.zipPartitions(entriesRVD.rdd) { case (it1, it2) =>
+      rowsRVD.zipPartitionsPreservesPartitioning(
+        typ.orvdType,
+        entriesRVD
+      ) { case (it1, it2) =>
         val rvb = new RegionValueBuilder()
 
         new Iterator[RegionValue] {
           def hasNext: Boolean = {
             val hn = it1.hasNext
             assert(hn == it2.hasNext)
             hn
           }
 
           def next(): RegionValue = {
             val rv1 = it1.next()
             val rv2 = it2.next()
             val region = rv2.region
             rvb.set(region)
             rvb.start(fullRowType)
             rvb.startStruct()
             var i = 0
             while (i < localEntriesIndex) {
               rvb.addField(fullRowType, rv1, i)
               i += 1
             }
             rvb.addField(entriesRowType, rv2, 0)
             i += 1
             while (i < fullRowType.size) {
               rvb.addField(fullRowType, rv1, i - 1)
               i += 1
             }
             rvb.endStruct()
             rv2.set(region, rvb.end())
             rv2
           }
         }
-      })
+      }
     }
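The index arithmetic in next() is the subtle part of this hunk: fullRowType is the rows-only row type with the entries field spliced in at position localEntriesIndex, so full-row fields past that position come from field i - 1 of rv1. A minimal sketch of the splice in plain Scala (illustrative only, not Hail code):

```scala
// Illustrative only: the field layout next() builds with the RegionValueBuilder.
// Row fields from rv1, with the entries struct from rv2 inserted at index k.
def splice[A](rowFields: IndexedSeq[A], entries: A, k: Int): IndexedSeq[A] =
  (rowFields.take(k) :+ entries) ++ rowFields.drop(k)

// splice(Vector("locus", "alleles", "qual"), "entries", 2)
//   == Vector("locus", "alleles", "entries", "qual")
// Past index k, full-row field i comes from rv1 field (i - 1), which is the
// rvb.addField(fullRowType, rv1, i - 1) call in the second while loop.
```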

@@ -450,10 +453,8 @@ case class MapEntries(child: MatrixIR, newEntries: IR) extends MatrixIR {
 }
 
 case class TableValue(typ: TableType, globals: BroadcastValue, rvd: RVD) {
-  def rdd: RDD[Row] = {
-    val localRowType = typ.rowType
-    rvd.rdd.map { rv => new UnsafeRow(localRowType, rv.region.copy(), rv.offset) }
-  }
+  def rdd: RDD[Row] =
+    rvd.toRows
 
   def filter(p: (RegionValue, RegionValue) => Boolean): TableValue = {
     val globalType = typ.globalType
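RVD.toRows is not shown in this diff; judging from the body it replaces, it presumably looks something like the following sketch, not the actual definition:

```scala
// Hedged sketch of RVD.toRows, inferred from the deleted body above;
// the real definition lives in RVD and may differ.
def toRows: RDD[Row] = {
  val localRowType = rowType
  // Copy each region so the returned rows stay valid after the
  // underlying region buffers are reused.
  rdd.map { rv => new UnsafeRow(localRowType, rv.region.copy(), rv.offset) }
}
```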
@@ -642,7 +643,7 @@ case class TableJoin(left: TableIR, right: TableIR, joinType: String) extends TableIR {
     val leftORVD = leftTV.rvd match {
       case ordered: OrderedRVD => ordered
       case unordered =>
-        OrderedRVD(
+        OrderedRVD.coerce(
           new OrderedRVDType(left.typ.key.toArray, left.typ.key.toArray, leftRowType),
           unordered.rdd,
           None,
@@ -656,7 +657,7 @@ case class TableJoin(left: TableIR, right: TableIR, joinType: String) extends TableIR {
         if (joinType == "left" || joinType == "inner")
           unordered.constrainToOrderedPartitioner(ordType, leftORVD.partitioner)
         else
-          OrderedRVD(ordType, unordered.rdd, None, Some(leftORVD.partitioner))
+          OrderedRVD.coerce(ordType, unordered.rdd, None, Some(leftORVD.partitioner))
     }
     val joinedRVD = leftORVD.orderedJoin(
       rightORVD,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/LoadMatrix.scala
@@ -392,7 +392,7 @@ object LoadMatrix {
       val (partitioner, keepPartitions) = makePartitionerFromCounts(partitionCounts, matrixType.orvdType.pkType)
       OrderedRVD(matrixType.orvdType, partitioner, rdd.subsetPartitions(keepPartitions))
     } else
-      OrderedRVD(matrixType.orvdType, rdd, None, None)
+      OrderedRVD.coerce(matrixType.orvdType, rdd, None, None)
 
     new MatrixTable(hc,
       matrixType,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/bgen/LoadBgen.scala
@@ -169,7 +169,7 @@ object LoadBgen {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleIds.map(x => Annotation(x)),
-      OrderedRVD(matrixType.orvdType, rdd2, Some(fastKeys), None))
+      OrderedRVD.coerce(matrixType.orvdType, rdd2, Some(fastKeys), None))
   }
 
   def index(hConf: org.apache.hadoop.conf.Configuration, file: String) {
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/plink/LoadPlink.scala
@@ -189,7 +189,7 @@ object LoadPlink {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleAnnotations,
-      OrderedRVD(matrixType.orvdType, rdd2, Some(fastKeys), None))
+      OrderedRVD.coerce(matrixType.orvdType, rdd2, Some(fastKeys), None))
   }
 
   def apply(hc: HailContext, bedPath: String, bimPath: String, famPath: String, ffConfig: FamFileConfig,
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/vcf/LoadGDB.scala
@@ -190,6 +190,6 @@ object LoadGDB {
     new MatrixTable(hc, matrixType,
       BroadcastValue(Annotation.empty, matrixType.globalType, sc),
       sampleIds.map(x => Annotation(x)),
-      OrderedRVD(matrixType.orvdType, hc.sc.parallelize(records), None, None))
+      OrderedRVD.coerce(matrixType.orvdType, hc.sc.parallelize(records), None, None))
   }
 }
2 changes: 1 addition & 1 deletion src/main/scala/is/hail/io/vcf/LoadVCF.scala
@@ -841,7 +841,7 @@ object LoadVCF {
   // nothing after the key
   val justVariants = parseLines(() => ())((c, l, rvb) => ())(lines, kType, rg, contigRecoding)
 
-  val rdd = OrderedRVD(
+  val rdd = OrderedRVD.coerce(
     matrixType.orvdType,
     parseLines { () =>
       new ParseLineContext(genotypeSignature, new BufferedLineIterator(headerLinesBc.value.iterator.buffered))
143 changes: 102 additions & 41 deletions src/main/scala/is/hail/rvd/OrderedRVD.scala
Expand Up @@ -23,6 +23,9 @@ class OrderedRVD(
self =>
def rowType: TStruct = typ.rowType

def updateType(newTyp: OrderedRVDType): OrderedRVD =
OrderedRVD(newTyp, partitioner, rdd)

def mapPreservesPartitioning(newTyp: OrderedRVDType)(f: (RegionValue) => RegionValue): OrderedRVD =
OrderedRVD(newTyp,
partitioner,
Expand All @@ -38,13 +41,6 @@ class OrderedRVD(
partitioner,
rdd.mapPartitions(f))

def zipPartitionsPreservesPartitioning[T](newTyp: OrderedRVDType, rdd2: RDD[T])(f: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue])(implicit tct: ClassTag[T]): OrderedRVD =
OrderedRVD(newTyp,
partitioner,
rdd.zipPartitions(rdd2) { case (it, it2) =>
f(it, it2)
})

override def filter(p: (RegionValue) => Boolean): OrderedRVD =
OrderedRVD(typ,
partitioner,
@@ -354,28 +350,41 @@ class OrderedRVD(
     spec.write(sparkContext.hadoopConfiguration, path)
     partitionCounts
   }
-}
 
-object OrderedRVD {
-  type CoercionMethod = Int
+  def zipPartitionsPreservesPartitioning[T: ClassTag](
+    newTyp: OrderedRVDType,
+    that: RDD[T]
+  )(zipper: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue]
+  ): OrderedRVD =
+    OrderedRVD(
+      newTyp,
+      partitioner,
+      this.rdd.zipPartitions(that, preservesPartitioning = true)(zipper))
+
+  def zipPartitionsPreservesPartitioning(
+    newTyp: OrderedRVDType,
+    that: RVD
+  )(zipper: (Iterator[RegionValue], Iterator[RegionValue]) => Iterator[RegionValue]
+  ): OrderedRVD =
+    OrderedRVD(
+      newTyp,
+      partitioner,
+      this.rdd.zipPartitions(that.rdd, preservesPartitioning = true)(zipper))
 
-  final val ORDERED_PARTITIONER: CoercionMethod = 0
-  final val AS_IS: CoercionMethod = 1
-  final val LOCAL_SORT: CoercionMethod = 2
-  final val SHUFFLE: CoercionMethod = 3
+  def writeRowsSplit(
+    path: String,
+    t: MatrixType,
+    codecSpec: CodecSpec
+  ): Array[Long] = rdd.writeRowsSplit(path, t, codecSpec, partitioner)
+}
 
+object OrderedRVD {
   def empty(sc: SparkContext, typ: OrderedRVDType): OrderedRVD = {
     OrderedRVD(typ,
       OrderedRVDPartitioner.empty(typ),
       sc.emptyRDD[RegionValue])
   }
 
-  def apply(typ: OrderedRVDType,
-    rdd: RDD[RegionValue], fastKeys: Option[RDD[RegionValue]], hintPartitioner: Option[OrderedRVDPartitioner]): OrderedRVD = {
-    val (_, orderedRVD) = coerce(typ, rdd, fastKeys, hintPartitioner)
-    orderedRVD
-  }
-
   /**
     * Precondition: the iterator it is PK-sorted. We lazily K-sort each block
     * of PK-equivalent elements.
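One behavioral note on the replacement zipPartitionsPreservesPartitioning methods: unlike the deleted helper, they pass preservesPartitioning = true to Spark, so the resulting RDD keeps the left-hand partitioner. A self-contained plain-Spark illustration (not Hail code; all names here are made up):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ZipPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    val left = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(2))
    val right = sc.parallelize(Seq("x", "y"), 2)
    // With preservesPartitioning = false (Spark's default), the partitioner is dropped.
    val zipped = left.zipPartitions(right, preservesPartitioning = true) {
      (it1, _) => it1 // keys and their placement are untouched, so the flag is sound
    }
    assert(zipped.partitioner == left.partitioner)
    sc.stop()
  }
}
```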
@@ -443,24 +452,62 @@ object OrderedRVD {
     pkis.sortBy(_.min)(typ.pkOrd)
   }
 
+  def coerce(
+    typ: OrderedRVDType,
+    rvd: RVD
+  ): OrderedRVD = coerce(typ, rvd, None, None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rvd: RVD,
+    fastKeys: Option[RDD[RegionValue]],
+    hintPartitioner: Option[OrderedRVDPartitioner]
+  ): OrderedRVD = coerce(typ, rvd.rdd, fastKeys, hintPartitioner)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue]
+  ): OrderedRVD = coerce(typ, rdd, None, None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    fastKeys: RDD[RegionValue]
+  ): OrderedRVD = coerce(typ, rdd, Some(fastKeys), None)
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    hintPartitioner: OrderedRVDPartitioner
+  ): OrderedRVD = coerce(typ, rdd, None, Some(hintPartitioner))
+
+  def coerce(
+    typ: OrderedRVDType,
+    rdd: RDD[RegionValue],
+    fastKeys: RDD[RegionValue],
+    hintPartitioner: OrderedRVDPartitioner
+  ): OrderedRVD = coerce(typ, rdd, Some(fastKeys), Some(hintPartitioner))
+
-  def coerce(typ: OrderedRVDType,
+  def coerce(
+    typ: OrderedRVDType,
     // rdd: RDD[RegionValue[rowType]]
     rdd: RDD[RegionValue],
     // fastKeys: Option[RDD[RegionValue[kType]]]
-    fastKeys: Option[RDD[RegionValue]] = None,
-    hintPartitioner: Option[OrderedRVDPartitioner] = None): (CoercionMethod, OrderedRVD) = {
+    fastKeys: Option[RDD[RegionValue]],
+    hintPartitioner: Option[OrderedRVDPartitioner]
+  ): OrderedRVD = {
     val sc = rdd.sparkContext
 
     if (rdd.partitions.isEmpty)
-      return (ORDERED_PARTITIONER, empty(sc, typ))
+      return empty(sc, typ)
 
     // keys: RDD[RegionValue[kType]]
     val keys = fastKeys.getOrElse(getKeys(typ, rdd))
 
     val pkis = getPartitionKeyInfo(typ, keys)
 
     if (pkis.isEmpty)
-      return (AS_IS, empty(sc, typ))
+      return empty(sc, typ)
 
     val partitionsSorted = (pkis, pkis.tail).zipped.forall { case (p, pnext) =>
       val r = typ.pkOrd.lteq(p.max, pnext.min)
@@ -482,29 +529,28 @@ object OrderedRVD {
       (adjSortedness: @unchecked) match {
         case OrderedRVPartitionInfo.KSORTED =>
           info("Coerced sorted dataset")
-          (AS_IS, OrderedRVD(typ,
+          OrderedRVD(typ,
             partitioner,
-            adjustedRDD))
+            adjustedRDD)
 
         case OrderedRVPartitionInfo.TSORTED =>
           info("Coerced almost-sorted dataset")
-          (LOCAL_SORT, OrderedRVD(typ,
+          OrderedRVD(typ,
             partitioner,
             adjustedRDD.mapPartitions { it =>
               localKeySort(typ, it)
-            }))
+            })
       }
     } else {
       info("Ordering unsorted dataset with network shuffle")
-      val orvd = hintPartitioner
+      hintPartitioner
         .filter(_.numPartitions >= rdd.partitions.length)
        .map(adjustBoundsAndShuffle(typ, _, rdd))
         .getOrElse {
           val ranges = calculateKeyRanges(typ, pkis, rdd.getNumPartitions)
           val p = new OrderedRVDPartitioner(typ.partitionKey, typ.kType, ranges)
           shuffle(typ, p, rdd)
         }
-      (SHUFFLE, orvd)
     }
   }
 }
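The call-site pattern, as seen in the loader hunks above: what used to be an OrderedRVD.apply that silently sorted or shuffled is now an explicit OrderedRVD.coerce, and the (CoercionMethod, OrderedRVD) pair is gone from the return type. A sketch of a migrated call site (the wrapper name toOrdered is hypothetical):

```scala
import is.hail.annotations.RegionValue
import is.hail.rvd.{OrderedRVD, OrderedRVDType}
import org.apache.spark.rdd.RDD

// Before: val (_, orvd) = OrderedRVD.coerce(typ, rdd, None, None)
//         (or OrderedRVD(typ, rdd, None, None) via the deleted apply overload)
// After:  the coercion is named at the call site and returns the RVD directly.
def toOrdered(typ: OrderedRVDType, rdd: RDD[RegionValue]): OrderedRVD =
  OrderedRVD.coerce(typ, rdd, None, None)
```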

@@ -562,6 +608,12 @@ object OrderedRVD {
     shuffle(typ, partitioner.enlargeToRange(Interval(min, max, true, true)), rdd)
   }
 
+  def shuffle(
+    typ: OrderedRVDType,
+    partitioner: OrderedRVDPartitioner,
+    rvd: RVD
+  ): OrderedRVD = shuffle(typ, partitioner, rvd.rdd)
+
   def shuffle(typ: OrderedRVDType,
     partitioner: OrderedRVDPartitioner,
     rdd: RDD[RegionValue]): OrderedRVD = {
@@ -587,9 +639,6 @@ object OrderedRVD {
       })
   }
 
-  def shuffle(typ: OrderedRVDType, partitioner: OrderedRVDPartitioner, rvd: RVD): OrderedRVD =
-    shuffle(typ, partitioner, rvd.rdd)
-
   def rangesAndAdjustments(typ: OrderedRVDType,
     sortedKeyInfo: Array[OrderedRVPartitionInfo],
     sortedness: Int): (IndexedSeq[Array[Adjustment[RegionValue]]], UnsafeIndexedSeq, Int) = {
@@ -655,6 +704,12 @@ object OrderedRVD {
     (adjustmentsBuffer, rangeBounds, adjSortedness)
   }
 
+  def apply(
+    typ: OrderedRVDType,
+    partitioner: OrderedRVDPartitioner,
+    rvd: RVD
+  ): OrderedRVD = apply(typ, partitioner, rvd.rdd)
+
   def apply(typ: OrderedRVDType,
     partitioner: OrderedRVDPartitioner,
     rdd: RDD[RegionValue]): OrderedRVD = {
@@ -695,8 +750,14 @@ object OrderedRVD {
       })
   }
 
-  def apply(typ: OrderedRVDType, partitioner: OrderedRVDPartitioner, rvd: RVD): OrderedRVD = {
-    assert(typ.rowType == rvd.rowType)
-    apply(typ, partitioner, rvd.rdd)
+  def union(rvds: Array[OrderedRVD]): OrderedRVD = {
+    require(rvds.length > 1)
+    val first = rvds(0)
+    val sc = first.sparkContext
+    OrderedRVD.coerce(
+      first.typ,
+      sc.union(rvds.map(_.rdd)),
+      None,
+      None)
   }
 }
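A hedged usage sketch for the new union (the helper name concat is hypothetical): SparkContext.union concatenates the parts with no partitioner, so union leans on coerce to re-sort or shuffle the rows into the first RVD's ordered type:

```scala
import is.hail.rvd.OrderedRVD

// Sketch: concatenate two ordered RVDs with the same row type and restore
// ordering. OrderedRVD.union as added above requires more than one input.
def concat(a: OrderedRVD, b: OrderedRVD): OrderedRVD =
  OrderedRVD.union(Array(a, b))
```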