Merge pull request hail-is#9 from cseed/partition
Cleaned up OrderedRDD.apply.
tpoterba authored Aug 22, 2016
2 parents efbc826 + 8c84d0f commit 5f71a9e
Showing 2 changed files with 93 additions and 86 deletions.
169 changes: 89 additions & 80 deletions src/main/scala/org/apache/spark/rdd/OrderedRDD.scala
@@ -8,105 +8,112 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.hashing._

case class PartitionKeyInfo[T](
  partIndex: Int,
  sortedness: Int,
  min: T,
  max: T)

object PartitionKeyInfo {
  final val UNSORTED = 0
  final val TSORTED = 1
  final val KSORTED = 2

  def apply[T, K](partIndex: Int, projectKey: (K) => T, it: Iterator[K])(implicit tOrd: Ordering[T], kOrd: Ordering[K]): PartitionKeyInfo[T] = {
    assert(it.hasNext)

    val k0 = it.next()
    val t0 = projectKey(k0)

    var minT = t0
    var maxT = t0
    var sortedness = KSORTED
    var prevK = k0
    var prevT = t0

    while (it.hasNext) {
      val k = it.next()
      val t = projectKey(k)

      if (tOrd.lt(t, prevT))
        sortedness = UNSORTED
      else if (kOrd.lt(k, prevK))
        sortedness = sortedness.min(TSORTED)

      if (tOrd.lt(t, minT))
        minT = t
      if (tOrd.gt(t, maxT))
        maxT = t

      prevK = k
      prevT = t
    }

    PartitionKeyInfo(partIndex, sortedness, minT, maxT)
  }
}
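As an aside, here is a minimal, self-contained sketch (not part of this commit) of the classification PartitionKeyInfo performs, assuming keys are (Int, Int) pairs projected to their first component; all names below are invented for illustration.

// Illustrative only: mirrors the UNSORTED/TSORTED/KSORTED classification above.
object SortednessDemo {
  final val UNSORTED = 0
  final val TSORTED = 1
  final val KSORTED = 2

  def classify(keys: Seq[(Int, Int)]): Int = {
    val ts = keys.map(_._1)
    if (keys.zip(keys.tail).forall { case (a, b) => Ordering[(Int, Int)].lteq(a, b) })
      KSORTED // sorted by the full key
    else if (ts.zip(ts.tail).forall { case (a, b) => a <= b })
      TSORTED // sorted only by the projected component
    else
      UNSORTED
  }

  def main(args: Array[String]): Unit = {
    println(classify(Seq((1, 1), (1, 2), (2, 1)))) // 2 (KSORTED)
    println(classify(Seq((1, 2), (1, 1), (2, 1)))) // 1 (TSORTED)
    println(classify(Seq((2, 1), (1, 1))))         // 0 (UNSORTED)
  }
}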

object OrderedRDD {

def empty[T, K, V](sc: SparkContext, projectKey: (K) => T)(implicit tOrd: Ordering[T], kOrd: Ordering[K], tct: ClassTag[T],
kct: ClassTag[K]): OrderedRDD[T, K, V] = new OrderedRDD[T, K, V](sc.emptyRDD[(K, V)], OrderedPartitioner.empty[T, K](projectKey))

-  def apply[T, K, V](rdd: RDD[(K, V)], projectKey: (K) => T, reducedRDD: Option[RDD[K]] = None)
+  def apply[T, K, V](rdd: RDD[(K, V)], projectKey: (K) => T, fastKeys: Option[RDD[K]] = None)
    (implicit tOrd: Ordering[T], kOrd: Ordering[K], tct: ClassTag[T], kct: ClassTag[K]): OrderedRDD[T, K, V] = {

-    def fromPartitionSummaries(arr: Array[(Int, Boolean, T, T)]): Option[OrderedRDD[T, K, V]] = {
-      val allKSorted = arr.forall(_._2)
-
-      val minMax = arr.map(x => (x._3, x._4))
-      val sortedBetweenPartitions = minMax.tail.map(_._1).zip(minMax.map(_._2))
-        .forall { case (nextMin, lastMax) => tOrd.gt(nextMin, lastMax) }
-
-      if (sortedBetweenPartitions) {
-        val ab = mutable.ArrayBuilder.make[T]()
-        minMax.take(minMax.length - 1).foreach { case (_, max) => ab += max }
-
-        val partitioner = OrderedPartitioner[T, K](ab.result(), projectKey)
-        assert(partitioner.numPartitions == rdd.partitions.length)
-
-        if (allKSorted) {
-          info("Coerced sorted dataset")
-          Some(new OrderedRDD[T, K, V](rdd, partitioner))
-        } else {
-          val sortedRDD = rdd.mapPartitionsWithIndex { case (i, it) =>
-            if (arr(i)._2)
-              it
-            else it.localKeySort[T](projectKey)
-          }
-          info("Coerced almost-sorted dataset")
-          Some(new OrderedRDD(sortedRDD, partitioner))
-        }
-      }
-      else None
-    }
-
-    def verifySortedness(i: Int, it: Iterator[K]): Iterator[Option[(Int, Boolean, T, T)]] = {
-      if (it.isEmpty)
-        Iterator(None)
-      else {
-        var sortedT = true
-        var sortedK = true
-        var continue = true
-        val firstK = it.next()
-        var lastK = firstK
-        var lastT = projectKey(firstK)
-
-        while (it.hasNext && continue) {
-          val k = it.next()
-          val t = projectKey(k)
-          if (tOrd.lt(t, lastT)) {
-            sortedT = false
-            continue = false
-          } else if (kOrd.lt(k, lastK))
-            sortedK = false
-
-          lastK = k
-          lastT = t
-        }
-        if (!sortedT)
-          Iterator(None)
-        else Iterator(Some((i, sortedK, projectKey(firstK), projectKey(lastK))))
-      }
-    }
-
-    def fromShuffle(): OrderedRDD[T, K, V] = {
-      info("Ordering unsorted dataset with network shuffle")
-      val ranges: Array[T] = calculateKeyRanges[T](reducedRDD.map(_.map(k => projectKey(k)))
-        .getOrElse(rdd.map { case (k, _) => projectKey(k) }))
-      val partitioner = OrderedPartitioner[T, K](ranges, projectKey)
-      new OrderedRDD[T, K, V](new ShuffledRDD[K, V, V](rdd, partitioner).setKeyOrdering(kOrd), partitioner)
-    }
-
-    rdd match {
-      case _: OrderedRDD[T, K, V] =>
-        rdd.asInstanceOf[OrderedRDD[T, K, V]]
-      case _: EmptyRDD[(K, V)] => new OrderedRDD(rdd, OrderedPartitioner(Array.empty[T], projectKey))
-      case _ =>
-        rdd.partitioner match {
-          case Some(p: OrderedPartitioner[T, K]) =>
-            new OrderedRDD(rdd, p)
-          case _ =>
-            val result = reducedRDD.getOrElse(rdd.map(_._1))
-              .mapPartitionsWithIndex(verifySortedness).collect()
-
-            anyFailAllFail[Array, (Int, Boolean, T, T)](result)
-              .map(_.sortBy(_._1))
-              .flatMap(fromPartitionSummaries)
-              .getOrElse(fromShuffle())
-        }
-    }
+    if (rdd.partitions.isEmpty)
+      return empty(rdd.sparkContext, projectKey)
+
+    val keys = fastKeys.getOrElse(rdd.map(_._1))
+
+    val keyInfoOption = anyFailAllFail[Array, PartitionKeyInfo[T]](
+      keys.mapPartitionsWithIndex { case (i, it) =>
+        Iterator(if (it.hasNext)
+          Some(PartitionKeyInfo.apply(i, projectKey, it))
+        else
+          None)
+      }.collect())
+
+    val partitionsSorted =
+      keyInfoOption.exists(keyInfo =>
+        keyInfo.tail.zip(keyInfo).forall { case (pi1, pi2) =>
+          tOrd.lt(pi2.max, pi1.min)
+        })
+
+    if (partitionsSorted) {
+      val keyInfo = keyInfoOption.get
+      val partitioner = OrderedPartitioner[T, K](keyInfo.init.map(pi => pi.max), projectKey)
+      val sortedness = keyInfo.map(_.sortedness).min
+      (sortedness: @unchecked) match {
+        case PartitionKeyInfo.KSORTED =>
+          assert(sortedness == PartitionKeyInfo.KSORTED)
+          info("Coerced sorted dataset")
+          new OrderedRDD(rdd, partitioner)
+
+        case PartitionKeyInfo.TSORTED =>
+          info("Coerced almost-sorted dataset")
+          new OrderedRDD(rdd.mapPartitions { it =>
+            it.localKeySort(projectKey)
+          }, partitioner)
+
+        case PartitionKeyInfo.UNSORTED =>
+          info("Coerced unsorted dataset")
+          new OrderedRDD(rdd.mapPartitions { it =>
+            it.toArray.sortBy(_._1).iterator
+          }, partitioner)
+      }
+    } else {
+      info("Ordering unsorted dataset with network shuffle")
+      val ranges: Array[T] = calculateKeyRanges[T](keys.map(projectKey))
+      val partitioner = OrderedPartitioner[T, K](ranges, projectKey)
+      new OrderedRDD[T, K, V](new ShuffledRDD[K, V, V](rdd, partitioner).setKeyOrdering(kOrd), partitioner)
+    }
  }
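Put differently, the rewritten apply picks one of four strategies from the per-partition summaries. A simplified, hedged sketch of that decision (the names below are invented for illustration; the real code operates on RDD partitions and an OrderedPartitioner):

// Illustrative decision table, assuming per-partition key ranges and sortedness are already known.
object CoercionStrategy {
  sealed trait Strategy
  case object UseAsIs extends Strategy            // ranges disjoint, every partition key-sorted
  case object LocalKeySort extends Strategy       // ranges disjoint, partitions only T-sorted
  case object SortEachPartition extends Strategy  // ranges disjoint, partitions internally unsorted
  case object Shuffle extends Strategy            // ranges overlap: range-partitioning shuffle

  def choose(rangesDisjoint: Boolean, minSortedness: Int): Strategy =
    if (!rangesDisjoint) Shuffle
    else minSortedness match {
      case 2 => UseAsIs             // PartitionKeyInfo.KSORTED
      case 1 => LocalKeySort        // PartitionKeyInfo.TSORTED
      case _ => SortEachPartition   // PartitionKeyInfo.UNSORTED
    }
}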

  /**
    * Copied from:
    * org.apache.spark.RangePartitioner
    * version 1.5.0
    */
  def calculateKeyRanges[T](rdd: RDD[T])(implicit ord: Ordering[T], tct: ClassTag[T]): Array[T] = {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
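The collapsed body is the sampling logic from Spark 1.5's RangePartitioner. Roughly, the sample-size arithmetic it starts from looks like the following sketch (an approximation of the Spark code, not part of this commit):

// Sketch of RangePartitioner-style sample sizing (Spark 1.5 constants; usage here is illustrative).
def sampleSizes(outputPartitions: Int, inputPartitions: Int): (Double, Int) = {
  val sampleSize = math.min(20.0 * outputPartitions, 1e6)                           // ~20 samples per output partition, capped at 1M
  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt  // over-sample to tolerate input skew
  (sampleSize, sampleSizePerPartition)
}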
@@ -151,6 +158,8 @@ object OrderedRDD {
class OrderedRDD[T, K, V](rdd: RDD[(K, V)],
  val orderedPartitioner: OrderedPartitioner[T, K])
  (implicit tOrd: Ordering[T], kOrd: Ordering[K], tct: ClassTag[T], kct: ClassTag[K]) extends RDD[(K, V)](rdd) {
  assert((orderedPartitioner.rangeBounds.isEmpty && rdd.partitions.isEmpty)
    || orderedPartitioner.rangeBounds.length == rdd.partitions.length - 1)

  override val partitioner: Option[Partitioner] = Some(orderedPartitioner)

10 changes: 4 additions & 6 deletions src/main/scala/org/broadinstitute/hail/Utils.scala
@@ -639,17 +639,15 @@ class RichPairIterator[K, V](val it: Iterator[(K, V)]) extends AnyVal {
  def next(): T = {
    val (k, v) = it.next()

-   while (bother.hasNext && bother.head._1 < k) {
-     val n = bother.next()
-   }
+   while (bother.hasNext && bother.head._1 < k)
+     bother.next()

    if (bother.hasNext && bother.head._1 == k) {
      val (k2, v2) = bother.next()

      /* implement distinct on the right */
-     while (bother.hasNext && bother.head._1 == k) {
-       val n = bother.next()
-     }
+     while (bother.hasNext && bother.head._1 == k)
+       bother.next()

      (k, (v, Some(v2)))
    } else
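For context, the two loops above advance a second sorted iterator past smaller keys and past duplicate matches, i.e. a sorted left join that keeps at most one right-hand value per key. A minimal sketch of the same idea over plain lists (names and signature invented for illustration):

// Illustrative only: sorted left join keeping at most one right value per key; both inputs sorted by key.
object SortedJoinDemo {
  def leftJoinDistinct[K, A, B](left: List[(K, A)], right: List[(K, B)])(implicit ord: Ordering[K]): List[(K, (A, Option[B]))] = {
    var r = right
    left.map { case (k, a) =>
      r = r.dropWhile { case (rk, _) => ord.lt(rk, k) }                              // skip right keys smaller than k
      val b = r.headOption.filter { case (rk, _) => ord.equiv(rk, k) }.map(_._2)     // first match, if any
      (k, (a, b))
    }
  }

  def main(args: Array[String]): Unit = {
    val left = List(1 -> "a", 2 -> "b", 3 -> "c")
    val right = List(1 -> "x", 1 -> "x2", 3 -> "y")
    println(leftJoinDistinct(left, right))
    // List((1,(a,Some(x))), (2,(b,None)), (3,(c,Some(y))))
  }
}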
