Make Table.join use ordered joins on OrderedRVIterator #3159

Merged
merged 19 commits into from
Mar 21, 2018

Conversation

patrick-schultz
Collaborator

This PR makes Table.join stop using Spark's join on Annotations and instead use ordered joins directly on region values.

Putting the new join into the IR will be a future PR.

I've given significant thought to how to make joins work without any assumptions about partition keys. I'm convinced this is the right thing to do, and that with the right foundations it would just work without much extra code. I started to build those foundations in this PR, but more work is needed to remove all partition key preconditions.
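For readers unfamiliar with the idea, an ordered join can be sketched as a single forward pass over two inputs that are already sorted by key. The following is a minimal illustration only: `OrderedJoinSketch` and `innerJoin` are hypothetical names, and it works on in-memory tuples rather than on Hail's `RegionValue` iterators.

```scala
object OrderedJoinSketch {
  // Inner join of two inputs that are ALREADY sorted by key.
  // Hypothetical helper for illustration; not Hail's OrderedRVIterator API.
  def innerJoin[K, A, B](left: IndexedSeq[(K, A)], right: IndexedSeq[(K, B)])(
    implicit ord: Ordering[K]): Vector[(K, A, B)] = {
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1        // left key is smaller: advance left
      else if (c > 0) j += 1   // right key is smaller: advance right
      else {
        val k = left(i)._1
        // Find the end of the run of right rows sharing key k.
        var jEnd = j
        while (jEnd < right.length && ord.equiv(right(jEnd)._1, k)) jEnd += 1
        // Pair every left row with key k against that run.
        while (i < left.length && ord.equiv(left(i)._1, k)) {
          var jj = j
          while (jj < jEnd) {
            out += ((k, left(i)._2, right(jj)._2))
            jj += 1
          }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Because both sides are consumed in key order, no hashing or shuffling of individual rows is needed once the partitions line up.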

@patrick-schultz
Collaborator Author

cc @cseed (say that three times fast!)

Contributor

@tpoterba tpoterba left a comment

Some initial comments, mostly stylistic.

Something that makes me uncomfortable is letting the struct ExtendedOrdering compare values of different types (different lengths). It feels like we should be doing this by using physical types to produce the correct key for free, but we can't do this right now. What are your thoughts?

@@ -5,6 +5,15 @@ import is.hail.utils._

case class OrderedRVIterator(t: OrderedRVDType, iterator: Iterator[RegionValue]) {

  def restrictToPKInterval(interval: Interval): Iterator[RegionValue] = {
    val ur = new UnsafeRow(t.rowType, null, 0)
Contributor

you can use the constructor that just takes a type here

case "left" => _.leftJoin(_)
case "right" => _.rightJoin(_)
case "outer" => _.outerJoin(_)
case _ => fatal(
Contributor

I think this can be removed, it's checked in Python.

Contributor

can just make the match (@unchecked)
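As a rough illustration of that suggestion: annotating the scrutinee with `@unchecked` tells the Scala compiler not to warn about a non-exhaustive match. It does not add a default case, so an unexpected value still fails at runtime with `scala.MatchError`. The object and method names below are hypothetical, and the returned strings stand in for the real join methods.

```scala
object UncheckedMatchSketch {
  // Hypothetical stand-in: maps a join type to the name of the method it selects.
  // Assumes the caller (for example, the Python layer) has already validated joinType.
  def joinName(joinType: String): String =
    (joinType: @unchecked) match {
      case "inner" => "innerJoin"
      case "left" => "leftJoin"
      case "right" => "rightJoin"
      case "outer" => "outerJoin"
    }
}
```

The trade-off is that an invalid value surfaces as a generic `MatchError` instead of the friendlier `fatal(...)` message.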

joinType match {
  case "inner" => _.innerJoinDistinct(_)
  case "left" => _.leftJoinDistinct(_)
  case _ => fatal(s"Unknown join type `$joinType'. Choose from `inner' or `left'.")
Contributor

can also be unchecked, I think.

def restrictToPKInterval(interval: Interval): Iterator[RegionValue] = {
  val ur = new UnsafeRow(t.rowType, null, 0)
  val pk = new KeyedRow(ur, t.kRowFieldIdx)
  iterator.filter( rv => {
Contributor

prefer curly braces around the entire thing instead of parens and curly braces together
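The two styles being contrasted look roughly like this. It is a small self-contained sketch on an `Iterator[Int]`, with hypothetical names, not the code under review.

```scala
object FilterStyleSketch {
  // Parentheses wrapping a braced lambda: legal, but noisier.
  def evensParens(it: Iterator[Int]): Iterator[Int] =
    it.filter( x => {
      x % 2 == 0
    })

  // Braces around the whole function literal: the style the reviewer prefers.
  def evensBraces(it: Iterator[Int]): Iterator[Int] =
    it.filter { x =>
      x % 2 == 0
    }
}
```

Both forms compile to the same thing; the difference is purely readability.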

@@ -301,3 +301,14 @@ class UnsafeRow(var t: TBaseStruct,
    }
  }
}

class KeyedRow(var row: Row, keyFields: Array[Int]) extends Row {
  def this(row: Row) = this(row, Array.range(0, row.size))
Contributor

this constructor doesn't seem used, can we delete it?

Collaborator Author

Oh, yes. That was from when I didn't have KeyedRow extending Row.
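For context, the idea behind `KeyedRow` appears to be a reusable view that exposes only the key fields of an underlying row. The sketch below is a simplification under stated assumptions: `KeyedView` is a hypothetical name, and it wraps a plain `IndexedSeq[Any]` instead of extending Spark's `Row`.

```scala
object KeyedRowSketch {
  // Hypothetical simplification of a key-projecting row view.
  // keyFields(i) is the index in the underlying row of the i-th key field.
  class KeyedView(var row: IndexedSeq[Any], keyFields: Array[Int]) {
    def length: Int = keyFields.length

    def get(i: Int): Any = row(keyFields(i))

    // Re-point the view at another row without allocating a new wrapper.
    def setRow(newRow: IndexedSeq[Any]): KeyedView = {
      row = newRow
      this
    }
  }
}
```

A mutable view like this can be created once per iterator and re-pointed for every record, which avoids copying key fields while filtering.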

* needed. No assumption should need to be made about partition keys, but currently
* assumes old partition key type is a prefix of the new partition key type.
*/
class RepartitionedOrderedRDD2(
Contributor

RepartitionedOrderedRVD?

Collaborator Author

This is an RDD[RegionValue], not an RVD.


case class RepartitionedOrderedRDD2Partition(
  index: Int,
  parents: Seq[Partition],
Contributor

can this be an array? It's probably a List as implemented here.

Collaborator Author

I was copying from older code, and wasn't sure why some places used Array and some Seq. Array should be fine.

Contributor

probably because Spark uses Seq everywhere :)

case "outer" => rddLeft.fullOuterJoin(rddRight).map { case (k, (l, r)) => merger(k, l.orNull, r.orNull) }
case _ => fatal("Invalid join type specified. Choose one of `left', `right', `inner', `outer'")
val left = this.rvd match {
case ordered: OrderedRVD => ordered
Contributor

what about the case that it's ordered but keyed by the wrong thing? I'm having a bit of trouble reasoning about that case right now. Could it happen?

Contributor

ah, keyBy does a horrible thing right now. so maybe it can't?

Contributor

let's add an assertion, at least.

Collaborator Author

I think we need to decide what keys mean in different places. One thing I think would be reasonable is

  • Keys on Table, both in Python and in Scala, are purely metadata used to determine how to join.
  • Keys on OrderedRVD are a way of recording the ordering invariant satisfied by the underlying RDD.

If this is the case, then I need to rewrite this. When I wrote it, I think I was assuming the Table keys and the OrderedRVD keys had to be the same. Now I understand Tables are never ordered currently. We should probably make a reasonable choice of what keys mean and document it.

joinType,
rvMerger,
new OrderedRVDType(left.typ.partitionKey, left.typ.key, newSignature))
copy2(rvd = joinedRVD, signature = newSignature, key = key)
Contributor

don't need the key=key I think

}

// @Test def testGetPartitionPKWithSmallerKeys() {
// assert(partitioner.getPartitionPK(Row(2)) == 0)
Contributor

meant to be uncommented?

Collaborator Author

No, this is a test that I think should be made to pass, but currently doesn't. This is where I need the general comparison of arbitrary length tuples. I could remove this, or add a comment saying to enable the test when we can.

@patrick-schultz
Collaborator Author

We should discuss the struct ordering in person. I think there are orderings that can be defined on the space of all tuples (since the names don't matter) of arbitrary lengths, which are very helpful in working with changing keys and partition keys. In principle, it should be easy to repartition an OrderedRVD with a longer partition key to a partitioner with a shorter partition key, but currently that doesn't look simple to do. I tried to lay the groundwork here to make that trivial.
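One possible way to compare tuples of different lengths, sketched here only to make the discussion concrete, is to compare on the shared prefix and treat a key as equal to any longer key that extends it. This is an assumption for illustration, not necessarily the ordering Hail's `ExtendedOrdering` implements; `comparePrefix` is a hypothetical name and the keys are simplified to `Seq[Int]`.

```scala
object PrefixOrderingSketch {
  // Compare two keys on their shared prefix only.
  // A key that is a prefix of the other compares as equal (result 0),
  // so this is a preorder, not a strict total order.
  def comparePrefix(a: Seq[Int], b: Seq[Int]): Int =
    a.zip(b).iterator
      .map { case (x, y) => Integer.compare(x, y) }
      .find(_ != 0)
      .getOrElse(0)
}
```

With a comparison of this shape, a partitioner whose bounds use a one-field key could still locate a row carrying a two-field key, which is the kind of lookup the disabled `testGetPartitionPKWithSmallerKeys` test exercises.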

@patrick-schultz
Collaborator Author

Speaking of not understanding what keys mean, I found what looks to me like a bug, but I'm not sure. OrderedRVD.downcastToPK creates an OrderedRVD for which typ.kType is different from partitioner.kType. It's triggering the assert I made in RepartitionedOrderedRDD2 that says the new key must be a prefix of the old, to ensure that no sorting needs to be done.

I want to make join keys parameters of OrderedRVD.join, allowing them to be different from the partitioner keys. I was putting that off for a later PR, but now I think I might need to do that to fix this.
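The reasoning behind the prefix assertion can be checked on a toy example: rows sorted lexicographically by a full key are also sorted by any prefix of that key, so narrowing the key never requires a sort. The sketch below uses hypothetical names and plain `Vector[Int]` keys; it is not the check in `RepartitionedOrderedRDD2`.

```scala
object KeyPrefixSketch {
  // True if the field names in `shorter` are the leading field names of `longer`.
  def isPrefixOf(shorter: Seq[String], longer: Seq[String]): Boolean =
    longer.startsWith(shorter)

  // Lexicographic comparison; a shorter key sorts before its extensions.
  def lexCompare(a: Seq[Int], b: Seq[Int]): Int =
    a.zip(b).iterator
      .map { case (x, y) => Integer.compare(x, y) }
      .find(_ != 0)
      .getOrElse(Integer.compare(a.length, b.length))

  // True if the rows are non-decreasing when only the first n key fields are compared.
  def isSortedOnPrefix(rows: Seq[Vector[Int]], n: Int): Boolean =
    rows.map(_.take(n)).sliding(2).forall { w =>
      w.length < 2 || lexCompare(w(0), w(1)) <= 0
    }
}
```

The converse does not hold: rows sorted by a short key are generally not sorted by a longer one, which is why widening a key can force a shuffle or a local sort.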

@patrick-schultz
Collaborator Author

I addressed most of your comments. I also fixed the downcastToPK problem by getting rid of it, instead adding a KeyedOrderedRVD which has a join key in addition to an ordering key.

@patrick-schultz
Collaborator Author

I moved table join to the IR, and it was such a tiny change I just added it to this PR. I can separate it back out if you'd prefer.

Contributor

@tpoterba tpoterba left a comment

This looks great. A few discussion points to address, but feel free to push back on any or all.

import org.apache.spark.rdd.RDD
import is.hail.utils.fatal

class KeyedOrderedRVD(val rvd: OrderedRVD, val key: Array[String]) extends Serializable {
Contributor

does this need to be serializable?


require(ordType.rowType == typ.rowType)
require(ordType.kType isPrefixOf typ.kType)
// require(newPartitioner.kType isIsomorphicTo ordType.kType)
Contributor

delete

Collaborator Author

It's frustrating because this should be an invariant satisfied by all OrderedRVDs. But I didn't see any simple way to make it pass, and I'm planning on a refactoring that removes kType from OrderedRVDPartitioner, getting rid of the redundancy. I'll just delete this.

}
}

OrderedRVD.shuffle(ordType, newPartitioner, filtered)
Contributor

we might want to consider using the coerce strategy with newPartitioner as a hint partitioner. Maybe not, though.

Collaborator Author

coerce felt too high level for this, but I'm open to discussion. This method says "give me back an OrderedRVD with exactly this partitioner, dropping any data that fall outside the given partitions." So the partitioner argument is more than a hint.

The other factor is that coerce does a pass over the data collecting statistics, and detecting if it was already ordered. My feeling was that this method should be an explicit "just do a shuffle". Probably collecting statistics and making choices based on the results should be explicit in the IR, not built into the methods the IR calls. Maybe @cseed has thoughts?
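The contract described above ("exactly this partitioner, dropping data outside it") can be sketched on plain integers. Everything here is hypothetical: `RangePartitioner`, its half-open `[bounds(i), bounds(i + 1))` convention, and `constrain` are illustrative stand-ins, and the real method performs a Spark shuffle instead of filtering an in-memory sequence.

```scala
object ConstrainSketch {
  // Hypothetical partitioner: partition i covers the half-open key range
  // [bounds(i), bounds(i + 1)). Keys outside [bounds.head, bounds.last) have no partition.
  final case class RangePartitioner(bounds: Vector[Int]) {
    require(bounds.length >= 2 && bounds == bounds.sorted, "bounds must be sorted")

    def numPartitions: Int = bounds.length - 1

    def partitionOf(key: Int): Option[Int] =
      if (key < bounds.head || key >= bounds.last) None
      else Some(bounds.lastIndexWhere(_ <= key))
  }

  // Place each key in the partition that covers it, sort within partitions,
  // and silently drop keys that no partition covers.
  // Quadratic in the worst case; written for clarity, not performance.
  def constrain(keys: Seq[Int], p: RangePartitioner): Vector[Vector[Int]] =
    (0 until p.numPartitions).map { i =>
      keys.filter(k => p.partitionOf(k).contains(i)).sorted.toVector
    }.toVector
}
```

Note that the partitioner fully determines the output layout here; nothing is inferred from the data, which matches the "more than a hint" point.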

@tpoterba tpoterba merged commit c1cc9ed into hail-is:master Mar 21, 2018
jbloom22 pushed a commit to jbloom22/hail that referenced this pull request Mar 22, 2018
* wip

* wip

* RepartionedOrderedRDD2 works

* Improve joins code on OrderedRVD

* wip on Table.join

* Add RVD.constrainToOrderedPartitioner

* Add KeyedRow

* Make compile and pass tests

* Generalize RepartitionedOrderedRDD2

Works with arbitrary partition keys.

* Table.join works using ordered joins

* Cleanup

* Start writing OrderedRVDPartitionerSuite

* Cleanup

* Address comments

* Make KeyedOrderedRVD

* fix

* Move Table.join to IR

* whoops

* Address comments
@patrick-schultz patrick-schultz deleted the ordered_repartition branch June 4, 2018 14:07
konradjk pushed a commit to konradjk/hail that referenced this pull request Jun 12, 2018
jackgoldsmith4 pushed a commit to jackgoldsmith4/hail that referenced this pull request Jun 25, 2018
2 participants