Commit c2801ec
Merge pull request #4 from tpoterba/improve-random-generation
Fixed the test, finally
danking authored Jul 21, 2017
2 parents f70738d + 1e02edd commit c2801ec
Showing 2 changed files with 21 additions and 24 deletions.
src/main/scala/is/hail/sparkextras/OrderedRDD.scala (33 changes: 12 additions & 21 deletions)
@@ -19,8 +19,7 @@ object OrderedRDD {
   final val ORDERED_PARTITIONER: CoercionMethod = 0
   final val AS_IS: CoercionMethod = 1
   final val LOCAL_SORT: CoercionMethod = 2
-  final val ARRAY_SORT: CoercionMethod = 3
-  final val SHUFFLE: CoercionMethod = 4
+  final val SHUFFLE: CoercionMethod = 3
 
   def empty[PK, K, V](sc: SparkContext)(implicit kOk: OrderedKey[PK, K], vct: ClassTag[V]): OrderedRDD[PK, K, V] =
     new OrderedRDD[PK, K, V](sc.emptyRDD[(K, V)], OrderedPartitioner.empty)
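
A note on the renumbering above: the CoercionMethod constants form an ordered cost scale, and the test suite (second file below) bounds coercion work with comparisons such as status <= OrderedRDD.SHUFFLE. A minimal standalone sketch of that contract, using the constant values from the new code; the object name and the helper cheaperThanShuffle are illustrative, not part of the codebase:

object CoercionMethods {
  type CoercionMethod = Int
  // Values as in the new code above: smaller means a cheaper coercion.
  final val ORDERED_PARTITIONER: CoercionMethod = 0 // already an OrderedRDD
  final val AS_IS: CoercionMethod = 1               // usable without any reordering
  final val LOCAL_SORT: CoercionMethod = 2          // sorted within each partition
  final val SHUFFLE: CoercionMethod = 3             // full network shuffle required

  // A property test can bound how much work a coercion was allowed to do:
  def cheaperThanShuffle(status: CoercionMethod): Boolean = status < SHUFFLE
}
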
@@ -64,7 +63,7 @@ object OrderedRDD {
         return (ORDERED_PARTITIONER, ordd.asInstanceOf[OrderedRDD[PK, K, V]])
       case _ =>
     }
 
     val keys = fastKeys.getOrElse(rdd.map(_._1))
 
     val keyInfo = keys.mapPartitionsWithIndex { case (i, it) =>
@@ -87,8 +86,8 @@
       r
     }
 
-    if (partitionsSorted) {
-      val sortedness = sortedKeyInfo.map(_.sortedness).min
+    val sortedness = sortedKeyInfo.map(_.sortedness).min
+    if (partitionsSorted && sortedness >= PartitionKeyInfo.TSORTED) {
       val (adjustedPartitions, rangeBounds, adjSortedness) = rangesAndAdjustments[PK, K, V](sortedKeyInfo, sortedness)
       val partitioner = OrderedPartitioner[PK, K](rangeBounds, adjustedPartitions.length)
       val reorderedPartitionsRDD = rdd.reorderPartitions(sortedKeyInfo.map(_.partIndex))
@@ -103,12 +102,6 @@
           (LOCAL_SORT, OrderedRDD(adjustedRDD.mapPartitions { it =>
             localKeySort(it)
           }, partitioner))
-
-        case PartitionKeyInfo.UNSORTED =>
-          info("Coerced unsorted dataset")
-          (ARRAY_SORT, OrderedRDD(adjustedRDD.mapPartitions { it =>
-            it.toArray.sortBy(_._1).iterator
-          }, partitioner))
       }
     } else {
       info("Ordering unsorted dataset with network shuffle")
@@ -160,19 +153,17 @@
     }
 
     val adjustments = indicesBuilder.result().zipWithIndex.map { case (partitionIndex, index) =>
+      assert(sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED)
       val f: (Iterator[(K, V)]) => Iterator[(K, V)] =
+        // In the first partition, drop elements that should go in the last if necessary
         if (index == 0)
           if (adjustmentsBuffer.nonEmpty && min == sortedKeyInfo(adjustmentsBuffer.last.head.index).max)
-            if (sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) // PK sorted, so can use dropWhile
-              _.dropWhile(kv => kOk.project(kv._1) == min)
-            else
-              _.filter(kv => kOk.project(kv._1) != min)
+            _.dropWhile(kv => kOk.project(kv._1) == min)
           else
             identity
-        else if (sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) // PK sorted, so can use takeWhile
-          _.takeWhile(kv => kOk.project(kv._1) == max)
         else
-          _.filter(kv => kOk.project(kv._1) == max)
+          // In every subsequent partition, only take elements that are the max of the last
+          _.takeWhile(kv => kOk.project(kv._1) == max)
       Adjustment(partitionIndex, f)
     }

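The rewritten adjustment logic relies on a guarantee the new assert makes explicit: every partition reaching this point is sorted by partition key (at least TSORTED), so the single-pass dropWhile/takeWhile forms always apply and the filter fallbacks for unsorted partitions can be deleted. A toy illustration on a plain sorted list, with an identity key projection; the names are illustrative:

// Toy model: the partition key 3 straddles a partition boundary.
val boundaryPartition = List((3, "d"), (3, "e"), (4, "f"))

// Because the partition is sorted, one forward pass splits off the run of
// duplicated boundary keys, as the adjustments above do:
val movedToNeighbor = boundaryPartition.takeWhile(_._1 == 3) // List((3,"d"), (3,"e"))
val keptHere        = boundaryPartition.dropWhile(_._1 == 3) // List((4,"f"))

// Without the sortedness guarantee, only a full scan would be safe, which is
// what the removed fallback did: boundaryPartition.filter(_._1 != 3)
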
@@ -190,12 +181,12 @@
   def apply[PK, K, V](rdd: RDD[(K, V)],
     orderedPartitioner: OrderedPartitioner[PK, K])
     (implicit kOk: OrderedKey[PK, K], vct: ClassTag[V]): OrderedRDD[PK, K, V] = {
-    assert(rdd.partitions.length == orderedPartitioner.numPartitions, s"${rdd.partitions.length} != ${orderedPartitioner.numPartitions}")
+    assert(rdd.partitions.length == orderedPartitioner.numPartitions, s"${ rdd.partitions.length } != ${ orderedPartitioner.numPartitions }")
 
     import kOk._
 
     import Ordering.Implicits._
 
     val rangeBoundsBc = rdd.sparkContext.broadcast(orderedPartitioner.rangeBounds)
     new OrderedRDD(
       rdd.mapPartitionsWithIndex { case (i, it) =>
@@ -217,7 +208,7 @@
           if (first)
             first = false
           else
-            assert(prevK <= r._1, s"$prevK is greater than ${r._1}")
+            assert(prevK <= r._1, s"$prevK is greater than ${ r._1 }")
 
           prevK = r._1
           r
src/test/scala/is/hail/utils/OrderedRDDSuite.scala (12 changes: 9 additions & 3 deletions)
@@ -35,7 +35,7 @@ class OrderedRDDSuite extends SparkSuite {
   val locusSorted = for ((n, v) <- g;
     locusSorted <- Gen.const(v.sortBy(_._1.locus))) yield sc.parallelize(locusSorted, n)
 
-  val scrambledInPartition = locusSorted.map(_.mapPartitions { it => Gen.shuffle(it.toIndexedSeq).sample().iterator })
+  val scrambledInPartition = locusSorted.map(_.mapPartitions { it => it.toArray.reverseIterator })
 
   val sorted = for ((n, v) <- g;
     sorted <- Gen.const(v.sortBy(_._1))) yield sc.parallelize(sorted, n)
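
Swapping Gen.shuffle for a plain reversal makes the scrambled generator deterministic, which is presumably the "Fixed the test, finally" part of this commit: a reversed partition is still reliably out of order once it holds two distinct keys, but a failing case now reproduces exactly instead of depending on a fresh permutation from Gen.shuffle(...).sample() each run. A small sketch of the new scrambling:

// Reversing a sorted partition always yields the same descending sequence,
// so a property failure can be replayed. With the ARRAY_SORT path removed,
// coercing such data requires the shuffle, hence the new bound
// status <= OrderedRDD.SHUFFLE in the property below.
val sortedPartition = Seq((1, "a"), (2, "b"), (3, "c"))
val scrambled = sortedPartition.toArray.reverseIterator.toSeq
assert(scrambled == Seq((3, "c"), (2, "b"), (1, "a")))
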
@@ -63,7 +63,13 @@
         .foldLeft((true, first)) { case ((b, last), (start, end)) =>
           (b && start > last, end)
         }._1
-        sortedWithin && partitionedCorrectly && sortedBetween
+        val p = sortedWithin && partitionedCorrectly && sortedBetween
+        if (!p) {
+          println(s"sortedWithin: $sortedWithin")
+          println(s"sortedBetween: $sortedBetween")
+          println(s"partitionedCorrectly: $partitionedCorrectly")
+        }
+        p
       case None => true
     }

@@ -150,7 +156,7 @@
 
   property("scrambledInPartition") = Prop.forAll(scrambledInPartition) { rdd =>
     val (status, ordered) = OrderedRDD.coerce(rdd)
-    check(ordered, rdd) && status <= OrderedRDD.ARRAY_SORT
+    check(ordered, rdd) && status <= OrderedRDD.SHUFFLE
   }
 
   property("locusSorted") = Prop.forAll(locusSorted) { rdd =>
