Fixed the test, finally #4

Merged
src/main/scala/is/hail/sparkextras/OrderedRDD.scala (33 changes: 12 additions & 21 deletions)
@@ -19,8 +19,7 @@ object OrderedRDD {
final val ORDERED_PARTITIONER: CoercionMethod = 0
final val AS_IS: CoercionMethod = 1
final val LOCAL_SORT: CoercionMethod = 2
- final val ARRAY_SORT: CoercionMethod = 3
- final val SHUFFLE: CoercionMethod = 4
+ final val SHUFFLE: CoercionMethod = 3

def empty[PK, K, V](sc: SparkContext)(implicit kOk: OrderedKey[PK, K], vct: ClassTag[V]): OrderedRDD[PK, K, V] =
new OrderedRDD[PK, K, V](sc.emptyRDD[(K, V)], OrderedPartitioner.empty)
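With `ARRAY_SORT` gone, the coercion methods form a strict ladder of increasing cost, which is what lets the tests below bound coercion work with `<=`. A minimal sketch of the idea (the constant values mirror the diff; the surrounding object and helper are illustrative, not Hail's API):

```scala
// Sketch: an ordered "cost of coercion" status. Higher value = more work.
object Coercion {
  final val ORDERED_PARTITIONER = 0 // input was already an OrderedRDD
  final val AS_IS = 1               // partitions already sorted and disjoint
  final val LOCAL_SORT = 2          // a per-partition sort sufficed
  final val SHUFFLE = 3             // a full network shuffle was required
}

// A property test can then assert an upper bound on the work performed:
def stayedLocal(status: Int): Boolean = status < Coercion.SHUFFLE
```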
@@ -64,7 +63,7 @@ object OrderedRDD {
return (ORDERED_PARTITIONER, ordd.asInstanceOf[OrderedRDD[PK, K, V]])
case _ =>
}

val keys = fastKeys.getOrElse(rdd.map(_._1))

val keyInfo = keys.mapPartitionsWithIndex { case (i, it) =>
@@ -87,8 +86,8 @@
r
}

- if (partitionsSorted) {
- val sortedness = sortedKeyInfo.map(_.sortedness).min
+ val sortedness = sortedKeyInfo.map(_.sortedness).min
+ if (partitionsSorted && sortedness >= PartitionKeyInfo.TSORTED) {
val (adjustedPartitions, rangeBounds, adjSortedness) = rangesAndAdjustments[PK, K, V](sortedKeyInfo, sortedness)
val partitioner = OrderedPartitioner[PK, K](rangeBounds, adjustedPartitions.length)
val reorderedPartitionsRDD = rdd.reorderPartitions(sortedKeyInfo.map(_.partIndex))
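The no-shuffle path now additionally requires the minimum per-partition sortedness to be at least `PartitionKeyInfo.TSORTED` (each partition sorted by the projected partition key, though not necessarily by the full key). A hedged sketch of how such a per-partition classification could be computed; the three levels and the projection `pk` are assumptions modeled on the names in the diff:

```scala
// Illustrative, not Hail's PartitionKeyInfo: classify one partition's keys as
// KSORTED (sorted by K), TSORTED (sorted only by the projected PK), or UNSORTED.
def sortedness[PK: Ordering, K: Ordering](it: Iterator[K], pk: K => PK): Int = {
  val UNSORTED = 0; val TSORTED = 1; val KSORTED = 2
  if (!it.hasNext) return KSORTED
  var level = KSORTED
  var prev = it.next()
  while (it.hasNext && level > UNSORTED) {
    val cur = it.next()
    if (Ordering[K].gt(prev, cur)) level = TSORTED           // full-key order broken
    if (Ordering[PK].gt(pk(prev), pk(cur))) level = UNSORTED // even PK order broken
    prev = cur
  }
  level
}
```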
@@ -103,12 +102,6 @@
(LOCAL_SORT, OrderedRDD(adjustedRDD.mapPartitions { it =>
localKeySort(it)
}, partitioner))

- case PartitionKeyInfo.UNSORTED =>
- info("Coerced unsorted dataset")
- (ARRAY_SORT, OrderedRDD(adjustedRDD.mapPartitions { it =>
- it.toArray.sortBy(_._1).iterator
- }, partitioner))
}
} else {
info("Ordering unsorted dataset with network shuffle")
@@ -160,19 +153,17 @@
}

val adjustments = indicesBuilder.result().zipWithIndex.map { case (partitionIndex, index) =>
+ assert(sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED)
val f: (Iterator[(K, V)]) => Iterator[(K, V)] =
+ // In the first partition, drop elements that should go in the last if necessary
if (index == 0)
if (adjustmentsBuffer.nonEmpty && min == sortedKeyInfo(adjustmentsBuffer.last.head.index).max)
- if (sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) // PK sorted, so can use dropWhile
- _.dropWhile(kv => kOk.project(kv._1) == min)
- else
- _.filter(kv => kOk.project(kv._1) != min)
+ _.dropWhile(kv => kOk.project(kv._1) == min)
else
identity
- else if (sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) // PK sorted, so can use takeWhile
- _.takeWhile(kv => kOk.project(kv._1) == max)
else
- _.filter(kv => kOk.project(kv._1) == max)
+ // In every subsequent partition, only take elements that are the max of the last
+ _.takeWhile(kv => kOk.project(kv._1) == max)
Adjustment(partitionIndex, f)
}
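The new `assert` makes the contract explicit: every partition reaching `rangesAndAdjustments` is at least `TSORTED`, so duplicate partition keys straddling a boundary form a contiguous run and the cheap `dropWhile`/`takeWhile` always apply (previously a full-partition `filter` handled unsorted partitions). A toy example on plain collections, not Spark, of the boundary fix-up:

```scala
// Toy illustration: rows whose key equals the previous partition's max
// are peeled off this partition's head and reassigned to that partition.
val prevPart = List(1, 2, 3, 3)
val curPart = List(3, 3, 4, 5) // TSORTED, so the duplicate run is a prefix
val boundary = prevPart.last
val moved = curPart.takeWhile(_ == boundary)     // claimed by the earlier partition
val remaining = curPart.dropWhile(_ == boundary) // stays in this partition
assert(moved == List(3, 3) && remaining == List(4, 5))
```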

@@ -190,12 +181,12 @@
def apply[PK, K, V](rdd: RDD[(K, V)],
orderedPartitioner: OrderedPartitioner[PK, K])
(implicit kOk: OrderedKey[PK, K], vct: ClassTag[V]): OrderedRDD[PK, K, V] = {
- assert(rdd.partitions.length == orderedPartitioner.numPartitions, s"${rdd.partitions.length} != ${orderedPartitioner.numPartitions}")
+ assert(rdd.partitions.length == orderedPartitioner.numPartitions, s"${ rdd.partitions.length } != ${ orderedPartitioner.numPartitions }")

import kOk._

import Ordering.Implicits._

val rangeBoundsBc = rdd.sparkContext.broadcast(orderedPartitioner.rangeBounds)
new OrderedRDD(
rdd.mapPartitionsWithIndex { case (i, it) =>
@@ -217,7 +208,7 @@
if (first)
first = false
else
- assert(prevK <= r._1, s"$prevK is greater than ${r._1}")
+ assert(prevK <= r._1, s"$prevK is greater than ${ r._1 }")

prevK = r._1
r
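The asserts in this `mapPartitionsWithIndex` pass check two invariants per partition: keys never decrease, and each projected key lands inside its partition's range. A sketch of the range-membership test, assuming Spark-style upper-bound-inclusive range bounds (the helper name is illustrative):

```scala
import scala.math.Ordering.Implicits._

// Partition i should only hold projected keys in (bounds(i - 1), bounds(i)],
// with the first and last partitions open-ended on one side.
def inRange[PK: Ordering](pk: PK, i: Int, bounds: IndexedSeq[PK]): Boolean =
  (i == 0 || pk > bounds(i - 1)) && (i == bounds.length || pk <= bounds(i))
```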
src/test/scala/is/hail/utils/OrderedRDDSuite.scala (12 changes: 9 additions & 3 deletions)
@@ -35,7 +35,7 @@ class OrderedRDDSuite extends SparkSuite {
val locusSorted = for ((n, v) <- g;
locusSorted <- Gen.const(v.sortBy(_._1.locus))) yield sc.parallelize(locusSorted, n)

- val scrambledInPartition = locusSorted.map(_.mapPartitions { it => Gen.shuffle(it.toIndexedSeq).sample().iterator })
+ val scrambledInPartition = locusSorted.map(_.mapPartitions { it => it.toArray.reverseIterator })

val sorted = for ((n, v) <- g;
sorted <- Gen.const(v.sortBy(_._1))) yield sc.parallelize(sorted, n)
@@ -63,7 +63,13 @@
.foldLeft((true, first)) { case ((b, last), (start, end)) =>
(b && start > last, end)
}._1
- sortedWithin && partitionedCorrectly && sortedBetween
+ val p = sortedWithin && partitionedCorrectly && sortedBetween
+ if (!p) {
+ println(s"sortedWithin: $sortedWithin")
+ println(s"sortedBetween: $sortedBetween")
+ println(s"partitionedCorrectly: $partitionedCorrectly")
+ }
+ p
case None => true
}
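The added printouts split the conjunction so a failing run reports which invariant broke. `sortedBetween` is the subtle one: it folds over each partition's (min, max) and demands every min strictly exceed the previous max. A standalone sketch of just that invariant (plain integer ranges, illustrative helper):

```scala
// Partition key ranges must be strictly increasing and non-overlapping.
def sortedBetween(ranges: Seq[(Int, Int)]): Boolean =
  ranges.sliding(2).forall {
    case Seq((_, prevMax), (curMin, _)) => curMin > prevMax
    case _ => true // zero or one partition is trivially ordered
  }

assert(sortedBetween(Seq((1, 3), (4, 7), (8, 9))))
assert(!sortedBetween(Seq((1, 3), (3, 7)))) // shared boundary key means overlap
```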

@@ -150,7 +156,7 @@

property("scrambledInPartition") = Prop.forAll(scrambledInPartition) { rdd =>
val (status, ordered) = OrderedRDD.coerce(rdd)
- check(ordered, rdd) && status <= OrderedRDD.ARRAY_SORT
+ check(ordered, rdd) && status <= OrderedRDD.SHUFFLE
}
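The bound loosens from `ARRAY_SORT` to `SHUFFLE` because the constant no longer exists, and the new deterministic scrambling (reversing each partition, replacing `Gen.shuffle`, which can occasionally return an already-sorted permutation and presumably caused the flakiness the title refers to) breaks partition-key order outright, leaving a shuffle as the only remaining coercion. A small illustration:

```scala
// A reversed run of keys is no longer sorted by partition key,
// so none of the no-shuffle coercion paths can apply.
val part = Seq(1, 2, 3).reverse // Seq(3, 2, 1)
val tSorted = part.sliding(2).forall {
  case Seq(a, b) => a <= b
  case _ => true
}
assert(!tSorted) // hence status == OrderedRDD.SHUFFLE is the expected outcome
```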

property("locusSorted") = Prop.forAll(locusSorted) { rdd =>