From 8c84d0ff837a7c1cb89a3434b9728bb78faf12a3 Mon Sep 17 00:00:00 2001
From: Cotton Seed
Date: Mon, 22 Aug 2016 17:41:53 -0400
Subject: [PATCH] Cleaned up OrderedRDD.apply.

---
 .../org/apache/spark/rdd/OrderedRDD.scala     | 169 +++++++++---------
 .../scala/org/broadinstitute/hail/Utils.scala |  10 +-
 2 files changed, 93 insertions(+), 86 deletions(-)

diff --git a/src/main/scala/org/apache/spark/rdd/OrderedRDD.scala b/src/main/scala/org/apache/spark/rdd/OrderedRDD.scala
index cbec1e249b2..1ac39ae8609 100644
--- a/src/main/scala/org/apache/spark/rdd/OrderedRDD.scala
+++ b/src/main/scala/org/apache/spark/rdd/OrderedRDD.scala
@@ -8,105 +8,112 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.util.hashing._
 
+case class PartitionKeyInfo[T](
+  partIndex: Int,
+  sortedness: Int,
+  min: T,
+  max: T)
+
+object PartitionKeyInfo {
+  final val UNSORTED = 0
+  final val TSORTED = 1
+  final val KSORTED = 2
+
+  def apply[T, K](partIndex: Int, projectKey: (K) => T, it: Iterator[K])(implicit tOrd: Ordering[T], kOrd: Ordering[K]): PartitionKeyInfo[T] = {
+    assert(it.hasNext)
+
+    val k0 = it.next()
+    val t0 = projectKey(k0)
+
+    var minT = t0
+    var maxT = t0
+    var sortedness = KSORTED
+    var prevK = k0
+    var prevT = t0
+
+    while (it.hasNext) {
+      val k = it.next()
+      val t = projectKey(k)
+
+      if (tOrd.lt(t, prevT))
+        sortedness = UNSORTED
+      else if (kOrd.lt(k, prevK))
+        sortedness = sortedness.min(TSORTED)
+
+      if (tOrd.lt(t, minT))
+        minT = t
+      if (tOrd.gt(t, maxT))
+        maxT = t
+
+      prevK = k
+      prevT = t
+    }
+
+    PartitionKeyInfo(partIndex, sortedness, minT, maxT)
+  }
+}
+
 object OrderedRDD {
 
   def empty[T, K, V](sc: SparkContext, projectKey: (K) => T)(implicit tOrd: Ordering[T], kOrd: Ordering[K],
     tct: ClassTag[T], kct: ClassTag[K]): OrderedRDD[T, K, V] =
     new OrderedRDD[T, K, V](sc.emptyRDD[(K, V)], OrderedPartitioner.empty[T, K](projectKey))
 
-  def apply[T, K, V](rdd: RDD[(K, V)], projectKey: (K) => T, reducedRDD: Option[RDD[K]] = None)
+  def apply[T, K, V](rdd: RDD[(K, V)], projectKey: (K) => T, fastKeys: Option[RDD[K]] = None)
     (implicit tOrd: Ordering[T], kOrd: Ordering[K], tct: ClassTag[T], kct: ClassTag[K]): OrderedRDD[T, K, V] = {
 
-    def fromPartitionSummaries(arr: Array[(Int, Boolean, T, T)]): Option[OrderedRDD[T, K, V]] = {
-      val allKSorted = arr.forall(_._2)
-
-      val minMax = arr.map(x => (x._3, x._4))
-      val sortedBetweenPartitions = minMax.tail.map(_._1).zip(minMax.map(_._2))
-        .forall { case (nextMin, lastMax) => tOrd.gt(nextMin, lastMax) }
-
-      if (sortedBetweenPartitions) {
-        val ab = mutable.ArrayBuilder.make[T]()
-        minMax.take(minMax.length - 1).foreach { case (_, max) => ab += max }
-
-        val partitioner = OrderedPartitioner[T, K](ab.result(), projectKey)
-        assert(partitioner.numPartitions == rdd.partitions.length)
-
-        if (allKSorted) {
+    if (rdd.partitions.isEmpty)
+      return empty(rdd.sparkContext, projectKey)
+
+    val keys = fastKeys.getOrElse(rdd.map(_._1))
+
+    val keyInfoOption = anyFailAllFail[Array, PartitionKeyInfo[T]](
+      keys.mapPartitionsWithIndex { case (i, it) =>
+        Iterator(if (it.hasNext)
+          Some(PartitionKeyInfo.apply(i, projectKey, it))
+        else
+          None)
+      }.collect())
+
+    val partitionsSorted =
+      keyInfoOption.exists(keyInfo =>
+        keyInfo.zip(keyInfo.tail).forall { case (pi, pnext) =>
+          tOrd.lt(pi.max, pnext.min)
+        })
+
+    if (partitionsSorted) {
+      val keyInfo = keyInfoOption.get
+      val partitioner = OrderedPartitioner[T, K](keyInfo.init.map(pi => pi.max), projectKey)
+      val sortedness = keyInfo.map(_.sortedness).min
+      (sortedness: @unchecked) match {
+        case PartitionKeyInfo.KSORTED =>
+          assert(sortedness == PartitionKeyInfo.KSORTED)
           info("Coerced sorted dataset")
-          Some(new OrderedRDD[T, K, V](rdd, partitioner))
-        } else {
-          val sortedRDD = rdd.mapPartitionsWithIndex { case (i, it) =>
-            if (arr(i)._2)
-              it
-            else it.localKeySort[T](projectKey)
-          }
-          info("Coerced almost-sorted dataset")
-          Some(new OrderedRDD(sortedRDD, partitioner))
-        }
-      }
-      else None
-    }
+          new OrderedRDD(rdd, partitioner)
 
-    def verifySortedness(i: Int, it: Iterator[K]): Iterator[Option[(Int, Boolean, T, T)]] = {
-      if (it.isEmpty)
-        Iterator(None)
-      else {
-        var sortedT = true
-        var sortedK = true
-        var continue = true
-        val firstK = it.next()
-        var lastK = firstK
-        var lastT = projectKey(firstK)
-
-        while (it.hasNext && continue) {
-          val k = it.next()
-          val t = projectKey(k)
-          if (tOrd.lt(t, lastT)) {
-            sortedT = false
-            continue = false
-          } else if (kOrd.lt(k, lastK))
-            sortedK = false
-
-          lastK = k
-          lastT = t
-        }
-        if (!sortedT)
-          Iterator(None)
-        else Iterator(Some((i, sortedK, projectKey(firstK), projectKey(lastK))))
+        case PartitionKeyInfo.TSORTED =>
+          info("Coerced almost-sorted dataset")
+          new OrderedRDD(rdd.mapPartitions { it =>
+            it.localKeySort(projectKey)
+          }, partitioner)
+
+        case PartitionKeyInfo.UNSORTED =>
+          info("Coerced unsorted dataset")
+          new OrderedRDD(rdd.mapPartitions { it =>
+            it.toArray.sortBy(_._1).iterator
+          }, partitioner)
       }
-    }
-
-    def fromShuffle(): OrderedRDD[T, K, V] = {
+    } else {
       info("Ordering unsorted dataset with network shuffle")
-      val ranges: Array[T] = calculateKeyRanges[T](reducedRDD.map(_.map(k => projectKey(k)))
-        .getOrElse(rdd.map { case (k, _) => projectKey(k) }))
+      val ranges: Array[T] = calculateKeyRanges[T](keys.map(projectKey))
       val partitioner = OrderedPartitioner[T, K](ranges, projectKey)
       new OrderedRDD[T, K, V](new ShuffledRDD[K, V, V](rdd, partitioner).setKeyOrdering(kOrd), partitioner)
     }
-
-    rdd match {
-      case _: OrderedRDD[T, K, V] =>
-        rdd.asInstanceOf[OrderedRDD[T, K, V]]
-      case _: EmptyRDD[(K, V)] => new OrderedRDD(rdd, OrderedPartitioner(Array.empty[T], projectKey))
-      case _ =>
-        rdd.partitioner match {
-          case Some(p: OrderedPartitioner[T, K]) =>
-            new OrderedRDD(rdd, p)
-          case _ =>
-            val result = reducedRDD.getOrElse(rdd.map(_._1))
-              .mapPartitionsWithIndex(verifySortedness).collect()
-
-            anyFailAllFail[Array, (Int, Boolean, T, T)](result)
-              .map(_.sortBy(_._1))
-              .flatMap(fromPartitionSummaries)
-              .getOrElse(fromShuffle())
-        }
-    }
   }
 
   /**
     * Copied from:
     * org.apache.spark.RangePartitioner
-    * version 1.5.0
+    * version 1.5.0
     */
   def calculateKeyRanges[T](rdd: RDD[T])(implicit ord: Ordering[T], tct: ClassTag[T]): Array[T] = {
     // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -151,6 +158,8 @@ object OrderedRDD { class OrderedRDD[T, K, V](rdd: RDD[(K, V)], val orderedPartitioner: OrderedPartitioner[T, K]) (implicit tOrd: Ordering[T], kOrd: Ordering[K], tct: ClassTag[T], kct: ClassTag[K]) extends RDD[(K, V)](rdd) { + assert((orderedPartitioner.rangeBounds.isEmpty && rdd.partitions.isEmpty) + || orderedPartitioner.rangeBounds.length == rdd.partitions.length - 1) override val partitioner: Option[Partitioner] = Some(orderedPartitioner) diff --git a/src/main/scala/org/broadinstitute/hail/Utils.scala b/src/main/scala/org/broadinstitute/hail/Utils.scala index 7ec6b6174b6..17ea8584ea7 100644 --- a/src/main/scala/org/broadinstitute/hail/Utils.scala +++ b/src/main/scala/org/broadinstitute/hail/Utils.scala @@ -639,17 +639,15 @@ class RichPairIterator[K, V](val it: Iterator[(K, V)]) extends AnyVal { def next(): T = { val (k, v) = it.next() - while (bother.hasNext && bother.head._1 < k) { - val n = bother.next() - } + while (bother.hasNext && bother.head._1 < k) + bother.next() if (bother.hasNext && bother.head._1 == k) { val (k2, v2) = bother.next() /* implement distinct on the right */ - while (bother.hasNext && bother.head._1 == k) { - val n = bother.next() - } + while (bother.hasNext && bother.head._1 == k) + bother.next() (k, (v, Some(v2))) } else
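
Notes (illustrative sketches, not part of the patch):

PartitionKeyInfo.apply classifies each partition as KSORTED (keys
non-decreasing), TSORTED (only the projected T values non-decreasing), or
UNSORTED. A minimal standalone sketch of the same classification, assuming
a toy key type (t, k) projected to its t component:

    object SortednessDemo {
      final val UNSORTED = 0
      final val TSORTED = 1
      final val KSORTED = 2

      // Mirror of PartitionKeyInfo.apply's scan over one partition's keys.
      def classify[T, K](projectKey: K => T, keys: Seq[K])
        (implicit tOrd: Ordering[T], kOrd: Ordering[K]): Int = {
        var sortedness = KSORTED
        keys.iterator.sliding(2).withPartial(false).foreach { case Seq(prev, k) =>
          if (tOrd.lt(projectKey(k), projectKey(prev)))
            sortedness = UNSORTED // projected keys go backwards
          else if (kOrd.lt(k, prev))
            sortedness = sortedness.min(TSORTED) // only the full keys do
        }
        sortedness
      }

      def main(args: Array[String]): Unit = {
        val project: ((Int, Int)) => Int = _._1
        println(classify(project, Seq((1, 1), (1, 2), (2, 1)))) // 2: KSORTED
        println(classify(project, Seq((1, 2), (1, 1), (2, 1)))) // 1: TSORTED
        println(classify(project, Seq((2, 1), (1, 1))))         // 0: UNSORTED
      }
    }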
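
The partition-boundary test in the new OrderedRDD.apply keeps the existing
partitioning only when each partition's max projected key falls strictly
below the next partition's min; the dataset is then coerced according to the
weakest per-partition sortedness. A sketch, assuming the PartitionKeyInfo
introduced by this patch is on the classpath:

    import org.apache.spark.rdd.PartitionKeyInfo

    object BoundaryCheckDemo {
      def main(args: Array[String]): Unit = {
        // Toy per-partition summaries: T ranges [1, 5] and [6, 9] do not overlap.
        val keyInfo = Array(
          PartitionKeyInfo(0, PartitionKeyInfo.KSORTED, 1, 5),
          PartitionKeyInfo(1, PartitionKeyInfo.TSORTED, 6, 9))

        val partitionsSorted =
          keyInfo.zip(keyInfo.tail).forall { case (pi, pnext) =>
            Ordering.Int.lt(pi.max, pnext.min)
          }
        println(partitionsSorted) // true: no shuffle needed

        // The weakest sortedness wins: TSORTED here, so each partition is
        // locally key-sorted (localKeySort) rather than shuffled.
        println(keyInfo.map(_.sortedness).min == PartitionKeyInfo.TSORTED) // true
      }
    }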
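
The Utils.scala hunk tightens the skip loops inside a sorted left join whose
right side is deduplicated per key. A hedged re-creation of that behavior
(the name sortedLeftJoinDistinct and the explicit Ordering parameter are
assumptions; the patch shows only a fragment of the surrounding iterator):

    object JoinDemo {
      // Left join of two key-sorted iterators, keeping at most one right
      // value per key ("distinct on the right").
      def sortedLeftJoinDistinct[K, V, V2](left: Iterator[(K, V)],
        right: Iterator[(K, V2)])(implicit ord: Ordering[K]): Iterator[(K, (V, Option[V2]))] = {
        val bother = right.buffered
        left.map { case (k, v) =>
          // skip right keys strictly below k
          while (bother.hasNext && ord.lt(bother.head._1, k))
            bother.next()
          if (bother.hasNext && ord.equiv(bother.head._1, k)) {
            val (_, v2) = bother.next()
            // distinct on the right: drop further right entries with key k
            while (bother.hasNext && ord.equiv(bother.head._1, k))
              bother.next()
            (k, (v, Some(v2)))
          } else
            (k, (v, None))
        }
      }

      def main(args: Array[String]): Unit = {
        val left = Iterator(1 -> "a", 2 -> "b", 4 -> "c")
        val right = Iterator(1 -> "x", 2 -> "y", 2 -> "z", 3 -> "w")
        println(sortedLeftJoinDistinct(left, right).toList)
        // List((1,(a,Some(x))), (2,(b,Some(y))), (4,(c,None)))
      }
    }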