-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Table.join use ordered joins on OrderedRVIterator #3159
Make Table.join use ordered joins on OrderedRVIterator #3159
Conversation
cc @cseed (say that three times fast!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some initial comments, mostly stylistic.
Something that makes me uncomfortable is letting the struct ExtendedOrdering compare values of different types (different lengths). It feels like we should be doing this by using physical types to produce the correct key for free, but we can't do this right now. What are your thoughts?
@@ -5,6 +5,15 @@ import is.hail.utils._ | |||
|
|||
case class OrderedRVIterator(t: OrderedRVDType, iterator: Iterator[RegionValue]) { | |||
|
|||
def restrictToPKInterval(interval: Interval): Iterator[RegionValue] = { | |||
val ur = new UnsafeRow(t.rowType, null, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use the constructor that just takes a type here
case "left" => _.leftJoin(_) | ||
case "right" => _.rightJoin(_) | ||
case "outer" => _.outerJoin(_) | ||
case _ => fatal( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be removed, it's checked in Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just make the match (@unchecked)
joinType match { | ||
case "inner" => _.innerJoinDistinct(_) | ||
case "left" => _.leftJoinDistinct(_) | ||
case _ => fatal(s"Unknown join type `$joinType'. Choose from `inner' or `left'.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can also be unchecked, I think.
def restrictToPKInterval(interval: Interval): Iterator[RegionValue] = { | ||
val ur = new UnsafeRow(t.rowType, null, 0) | ||
val pk = new KeyedRow(ur, t.kRowFieldIdx) | ||
iterator.filter( rv => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer curly braces around the entire thing instead of parens and curly braces together
@@ -301,3 +301,14 @@ class UnsafeRow(var t: TBaseStruct, | |||
} | |||
} | |||
} | |||
|
|||
class KeyedRow(var row: Row, keyFields: Array[Int]) extends Row { | |||
def this(row: Row) = this(row, Array.range(0, row.size)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this constructor doesn't seem used, can we delete it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yes. That was from when I didn't have KeyedRow
extending Row
.
* needed. No assumption should need to be made about partition keys, but currently | ||
* assumes old partition key type is a prefix of the new partition key type. | ||
*/ | ||
class RepartitionedOrderedRDD2( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RepartitionedOrderedRVD?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an RDD[RegionValue]
, not an RVD
.
|
||
case class RepartitionedOrderedRDD2Partition( | ||
index: Int, | ||
parents: Seq[Partition], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be an array? It's probably a List as implemented here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was copying from older code, and wasn't sure why some places used Array
and some Seq
. Array
should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably because Spark uses Seq everywhere :)
case "outer" => rddLeft.fullOuterJoin(rddRight).map { case (k, (l, r)) => merger(k, l.orNull, r.orNull) } | ||
case _ => fatal("Invalid join type specified. Choose one of `left', `right', `inner', `outer'") | ||
val left = this.rvd match { | ||
case ordered: OrderedRVD => ordered |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about the case that it's ordered but keyed by the wrong thing? I'm having a bit of trouble reasoning about that case right now. Could it happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, keyBy does a horrible thing right now. so maybe it can't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add an assertion, at least.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to decide what keys mean in different places. One thing I think would be reasonable is
- Keys on
Table
, both in Python and in Scala, are purely metadata used to determine how to join. - Keys on
OrderedRVD
are a way of recording the ordering invariant satisfied by the underlyingRDD
.
If this is the case, then I need to rewrite this. When I wrote it, I think I was assuming the Table
keys and the OrderedRVD
keys had to be the same. Now I understand Table
s are never ordered currently. We should probably make a reasonable choice of what keys mean and document it.
joinType, | ||
rvMerger, | ||
new OrderedRVDType(left.typ.partitionKey, left.typ.key, newSignature)) | ||
copy2(rvd = joinedRVD, signature = newSignature, key = key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need the key=key I think
} | ||
|
||
// @Test def testGetPartitionPKWithSmallerKeys() { | ||
// assert(partitioner.getPartitionPK(Row(2)) == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meant to be uncommented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is a test that I think should be made to pass, but currently doesn't. This is where I need the general comparison of arbitrary length tuples. I could remove this, or add a comment saying to enable the test when we can.
We should discuss the struct ordering in person. I think there are orderings that can be defined on the space of all tuples (since the names don't matter) of arbitrary lengths, which are very helpful in working with changing keys and partition keys. In principle, it should be easy to repartition an OrderedRVD with a longer partition key to a partitioner with a shorter partition key, but currently that doesn't look simple to do. I tried to lay the groundwork here to make that trivial. |
Speaking of not understanding what keys mean, I found what looks to me like a bug, but I'm not sure. I want to make join keys parameters of |
8fe3b57
to
c3df4a8
Compare
I addressed most of your comments. I also fixed the |
I moved table join to the IR, and it was such a tiny change I just added it to this PR. I can separate it back out if you'd prefer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. A few discussion points to address, but feel free to push back on any or all.
import org.apache.spark.rdd.RDD | ||
import is.hail.utils.fatal | ||
|
||
class KeyedOrderedRVD(val rvd: OrderedRVD, val key: Array[String]) extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be serializable?
|
||
require(ordType.rowType == typ.rowType) | ||
require(ordType.kType isPrefixOf typ.kType) | ||
// require(newPartitioner.kType isIsomorphicTo ordType.kType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's frustrating because this should be an invariant satisfied by all OrderedRVD
s. But I didn't see any simple way to make it pass, and I'm planning on a refactoring that removes kType
from OrderedRVDPartitioner
, getting rid of the redundancy. I'll just delete this.
} | ||
} | ||
|
||
OrderedRVD.shuffle(ordType, newPartitioner, filtered) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we might want to consider using the coerce strategy with newPartitioner
as a hint partitioner. Maybe not, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
coerce
felt too high level for this, but I'm open to discussion. This method says "give me back an OrderedRVD
with exactly this partitioner, dropping any data that fall outside the given partitions." So the partitioner argument is more than a hint.
The other factor is that coerce
does a pass over the data collecting statistics, and detecting if it was already ordered. My feeling was that this method should be an explicit "just do a shuffle". Probably collecting statistics and making choices based on the results should be explicit in the IR, not built into the methods the IR calls. Maybe @cseed has thoughts?
Works with arbitrary partition keys.
ae2272b
to
72fd435
Compare
* wip * wip * RepartionedOrderedRDD2 works * Improve joins code on OrderedRVD * wip on Table.join * Add RVD.constrainToOrderedPartitioner * Add KeyedRow * Make compile and pass tests * Generalize RepartitionedOrderedRDD2 Works with arbitrary partition keys. * Table.join works using ordered joins * Cleanup * Start writing OrderedRVDPartitionerSuite * Cleanup * Address comments * Make KeyedOrderedRVD * fix * Move Table.join to IR * whoops * Address comments
* wip * wip * RepartionedOrderedRDD2 works * Improve joins code on OrderedRVD * wip on Table.join * Add RVD.constrainToOrderedPartitioner * Add KeyedRow * Make compile and pass tests * Generalize RepartitionedOrderedRDD2 Works with arbitrary partition keys. * Table.join works using ordered joins * Cleanup * Start writing OrderedRVDPartitionerSuite * Cleanup * Address comments * Make KeyedOrderedRVD * fix * Move Table.join to IR * whoops * Address comments
* wip * wip * RepartionedOrderedRDD2 works * Improve joins code on OrderedRVD * wip on Table.join * Add RVD.constrainToOrderedPartitioner * Add KeyedRow * Make compile and pass tests * Generalize RepartitionedOrderedRDD2 Works with arbitrary partition keys. * Table.join works using ordered joins * Cleanup * Start writing OrderedRVDPartitionerSuite * Cleanup * Address comments * Make KeyedOrderedRVD * fix * Move Table.join to IR * whoops * Address comments
This PR is to get Table.join to stop using Spark's join using Annotations, instead using ordered joins directly with region values.
Putting the new join into the IR will be a future PR.
I've given significant thought to how to make joins work making zero assumptions about partition keys. I'm convinced this is the right thing to do, and that with the right foundations it would just work without much extra code. I started to build those foundations in this PR, but some more work is needed to remove all partition key preconditions.