Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate some but not all uses of RVD.rdd #3186

Merged
merged 26 commits into from
Mar 23, 2018

Conversation

danking
Copy link
Contributor

@danking danking commented Mar 19, 2018

This change anticipates the ContextRDD change wherein RVD.rdd will not
be an RDD. Moreover, enforcing an abstraction barrier at the level of
RVD will ease changes to the implementation of RVD.

There are two remaining types of calls that I cannot eliminate:

  • uses in BlockMatrix and OrderedRDD2: these two classes are building
    new RDDs based on the RVD's rdd, these classes should be considered
    within the implementation of the RVD abstraction. Because these two
    classes are outside of is.hail.rvd, I cannot enforce an access
    modifier on RVD.rdd.

  • uses by methods:

    • LDPrune: it seems we need a "GeneralRVD"

    • Skat: it seems like some of this could be moved to python actually;
      but there is some matrix math that cannot be moved until the expr
      lang has efficient small-matrix ops

    • MatrixTable.same: I could probably move this if I re-implemented
      forall in terms of RVD.aggregate?

    • MatrixTable.annotateRowsIntervalTable: really not sure about this
      one, this seems like a performance optimization that purposely
      reaches through the abstraction to do Smart Things

cseed
cseed previously requested changes Mar 20, 2018
hn
}
// FIXME: DK added this assert when refactoring, but should this just
// always be true? Is this assert necessary?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't an assert there to verify something you think should always be true is actually always true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the wording of my FIXME is confusing. I was more confused to see typ.orvdType at all, it seems like it should always be rowsRVD.typ. Should this assert actually be checked immediately after reading? I guess the question is: when is this invariant true? I think maybe it should also be asserted in the constructor of MatrixValue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, always true. I would support an assert there, too. (Although I think MatrixValue shouldn't have a typ, but that's a minor thing that I'm not pushing now.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree as well, but also below my needs-change-noise threshold at this point.

@@ -38,6 +38,9 @@ class OrderedRVD private(
OrderedRVD(newType, partitioner, rdd)
}

def unsafeChangeType(newTyp: OrderedRVDType): OrderedRVD =
OrderedRVD(newTyp, partitioner, rdd)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's unsafe about this?

You should assert the typ.rowType is equal to newTyp.rowType.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's no less unsafe than mapPreservesPartitioning in that you could provide a type incompatible with the contents of the resulting OrderedRVD. What do you think of the name updateType?

I added this to enable renameFields in which, I thought, the whole point was to change the rowType (albeit only the names of the fields).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateType is a fine name. I'm not objecting to the operation. Ah, rename fields. Hmm. We don't have something that asserts that the types have the same physical rep. If TStruct was a complex type with representation TTuple, we could just compare the fundamental types. I guess that's a mark against my proposal to have TBaseStruct. Anyway, don't sweat the assert.

OrderedRVD(
typ,
partitioner,
this.rdd.zipPartitions(that, preservesPartitioning)(zipper))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we rely on the RDD partitioner in the RVD code (notice RVD.mapPreservesPartitioning just calls map). Although we copied some Spark language, this partitioner here refers to the ordered-ness and partitioning of the OrderedRVD. So we should have:

RVD.zip(newTyp: Type)(that: RVD): RVD
RVD.zipPreservesPartitioning(newTyp: Type)(that: RVD): RVD

with a default implementation overridden by:

OrderedRVD.zipPreservesPartitioning(newTyp: Type)(that: RVD): OrderedRVD

with the RDD[T] variants as needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the type should be given by this.rowTyp ++ that.rowTyp, not provided, right? If there exist conflicting fields the user can use updateType to resolve that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your zipPartitions functions take a f: (Iterator[RV], Iterator[RV]) => Iterator[RV]. The type won't be the concat, but will depend on f. If you want to concat instead of taking f, then this.rowTyp ++ that.rowTyp seems reasonable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how not taking a zipper: (Iterator[RV], Iterator[RV]) => Iterator[RV] is an option. Then you have to choose how to combine iterators, e.g. some kind of join, and then it's no longer a zipPartitions method. But then, as Cotton says, they need to take a type for the region values produced by zipper.

I used a similar pattern in #3159 to make OrderedRVD.join return an OrderedRVD. And when this goes in, I should refactor the join methods to use your OrderedRVD.zipPartitions.

def empty(sc: SparkContext, typ: OrderedRVDType): OrderedRVD = {
OrderedRVD(typ,
OrderedRVDPartitioner.empty(typ),
sc.emptyRDD[RegionValue])
}

// FIXME: delete this, it's just a wrapper around coerce
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe rename coerce => apply?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a point of contention, but I found it confusing that both "trust me this is ordered give me an OrderedRVD" and "please do whatever necessary to order this dataset" were both named apply.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definitely confused me.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I retract my comment.

OrderedRVD(typ, partitioner, rdd)
}

def toUnsafeRows: RDD[Row] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just say toRows if the interface is a Row. The unsafeness is a implementation detail.

OrderedRVD.shuffle(typ, partitioner, rdd)
}

def assertOrdered(typ: OrderedRVDType, partitioner: OrderedRVDPartitioner): OrderedRVD = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other way to write this is a constructor OrderedRVD(RVD, ...).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of prefer this because it's more scalable in RVD subclasses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by it's more scalable?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, it seems bad to have methods in the base class for each derived class.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #3159 I added RVD.constrainToOrderedPartitioner, which has some overlap with this. It is implemented in UnpartitionedRVD by shuffle, and in OrderedRVD by a narrow-dependency repartition I wrote. It's slightly lower level than this, because it is meant to be used in a case where you want an OrderedRVD with exactly the given partitioner, dropping any data that falls outside the new partitions.

@cseed
Copy link
Collaborator

cseed commented Mar 20, 2018

  1. Move the classes into is.hail.rvd. They aren't used anywhere else, are they?

  2. @maccum is working over LD prune, the GeneralRDD stuff shouldn't be necessary anymore. I think she's just working on optimization, so I feel like her current code is an improvement and should go in before the performance improvements are ready.

  3. I agree, but, yeah, SKAT can't go until we get expr ndarray.

  4. MatrixTable.same just uses zip, you can rewrite it in terms of zip based on my above comments. You might also need a version of zip that returns a RDD[T]. Compare RVD.map and RVD.mapPartitions.

  5. I think the right thing here is an OrderedRVD.orderedIntervalJoin. Compare @patrick-schultz recent OrderedRVD.orderedJoin (pending): https://github.com/hail-is/hail/pull/3159/files#diff-b173fb9bd584d50afcfa6724954ef3b5R127

@cseed
Copy link
Collaborator

cseed commented Mar 20, 2018

Don't feel you need to do all of these now, just whatever is necessary to minimize the complexity of the ContextRDD and offset-to-pointer changes.

@danking danking dismissed cseed’s stale review March 21, 2018 20:11

@cseed, I addressed the concerns you raised in-line, but haven't done any of the five points. I think I'll punt on those for now. If they block my contextrdd changes I'll address them then.

@danking
Copy link
Contributor Author

danking commented Mar 21, 2018

Actually I forgot to clean up zipPartitions.

@danking
Copy link
Contributor Author

danking commented Mar 21, 2018

@cseed Ok, I think this should go in if you like the way I addressed your comments. I'll deal with 1-5 as I come across them. There's no current need for RVD.zip so I'll leave that off until I see I need.

@danking
Copy link
Contributor Author

danking commented Mar 22, 2018

@cseed ping, stacked PRs have proven unsuccessful in the past, so I'm keeping the rest of my work gated, but I'd like to start moving things into master. Can you take a look at the latest changes and confirm if you're cool punting on points 1-5 until later?

cseed
cseed previously requested changes Mar 22, 2018
Copy link
Collaborator

@cseed cseed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern in FilterAlleles otherwise some minor stylistic stuff.

filter(
vsm.rvd,
removeLeftAligned = true,
removeMoving = false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused how these can be written symmetrically. One needs a shuffle and the other doesn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely a mistake, I didn't preserve the shuffle. Will fix.

What's the difference between coerce and shuffle? Is shuffle just a fast-path (skip checking the partition keys) to the slowest (but most complete) type of coercion?

OrderedRVD(newTyp,
def mapPartitionsPreservesPartitioning(rowTyp: TStruct)
(f: (Iterator[RegionValue]) => Iterator[RegionValue])
: OrderedRVD =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a thing we're doing now, putting the colon on a line by itself? I thought we liked "Patrick's" style:

  def (
    a1: T1,
    ...
    an: Tn
  ): RT

I'm not saying no, I'm just asking if we're changing/endorsing a new style

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a formatting flub

def zipPartitionsPreservesPartitioning[T: ClassTag](
newTyp: OrderedRVDType, that: RDD[T]
)(zipper: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue]
) : OrderedRVD =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a space before the colon here but not elsewhere.

@danking danking dismissed cseed’s stale review March 22, 2018 19:57

@cseed all addressed. Raised a question for my understanding though: "What's the difference between coerce and shuffle? Is shuffle just a fast-path (skip checking the partition keys) to the slowest (but most complete) type of coercion?"

Daniel King added 17 commits March 22, 2018 18:23
This change anticipates the ContextRDD change wherein `RVD.rdd` will not
be an RDD. Moreover, enforcing an abstraction barrier at the level of
`RVD` will ease changes to the implementation of `RVD`.

There are two remaining types of calls that I cannot eliminate:

 - uses in BlockMatrix and OrderedRDD2: these two classes are building
   new RDDs based on the RVD's rdd, these classes should be considered
   within the implementation of the RVD abstraction. Because these two
   classes are outside of `is.hail.rvd`, I cannot enforce an access
   modifier on `RVD.rdd`.

 - uses by methods:

   - LDPrune: it seems we need a "GeneralRVD"

   - Skat: it seems like some of this could be moved to python actually;
     but there is some matrix math that cannot be moved until the expr
     lang has efficient small-matrix ops

   - MatrixTable.same: I could probably move this if I re-implemented
     forall in terms of RVD.aggregate?

   - MatrixTable.annotateRowsIntervalTable: really not sure about this
     one, this seems like a performance optimization that purposely
     reaches through the abstraction to do Smart Things
cseed
cseed previously requested changes Mar 22, 2018
Copy link
Collaborator

@cseed cseed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few more instances of formatting.

type CoercionMethod = Int
def zipPartitionsPreservesPartitioning[T: ClassTag](
newTyp: OrderedRVDType, that: RDD[T]
)(zipper: (Iterator[RegionValue], Iterator[T]) => Iterator[RegionValue]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should zipper be on its own line?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No opinion just asking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, the full signature is >80 characters, so I went for the @patrick-schultz style function definition (with each arg on it's own line), but had to improvise a bit since there are two argument lists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Yes, I think that should also have its own line.

final val SHUFFLE: CoercionMethod = 3
def zipPartitionsPreservesPartitioning(newTyp: OrderedRVDType, that: RVD)
(zipper: (Iterator[RegionValue], Iterator[RegionValue]) => Iterator[RegionValue])
: OrderedRVD =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another dangling colon.

this.rdd.zipPartitions(that.rdd, preservesPartitioning = true)(zipper))

def writeRowsSplit(path: String, t: MatrixType, codecSpec: CodecSpec)
: Array[Long] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And another.

partitionKey: Array[String] = partitionKey,
key: Array[String] = key,
rowType: TStruct = rowType
) : OrderedRVDType = new OrderedRVDType(partitionKey, key, rowType)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space before colon.

Copy link
Collaborator

@cseed cseed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor code improvement you can leave or take.

@@ -23,6 +23,24 @@ class OrderedRVD(
self =>
def rowType: TStruct = typ.rowType

// should be totally generic, permitting any number of keys, but that requires more work
def downcastToPK(): OrderedRVD = {
val newType = new OrderedRVDType(partitionKey = typ.partitionKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use OrderedRVDType.copy here.

}

def upcast(castKeys: Array[String]): OrderedRVD = {
val newType = new OrderedRVDType(partitionKey = typ.partitionKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.


def rowsTable(): Table = {
val tableType = TableType(rowType, rowKey, globalType)
new Table(hc, TableLiteral(TableValue(tableType, globals, rowFieldsRVD)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tempted to ask for a MatrixRowsTable IR node, but I'll hold off for now.

@patrick-schultz
Copy link
Collaborator

Why did you need to add back downcastToPK and upcast? I tried to make those unnecessary with KeyedOrderedRVD.

@danking
Copy link
Contributor Author

danking commented Mar 23, 2018

@patrick-schultz rebase failure, I'll remove.

@danking
Copy link
Contributor Author

danking commented Mar 23, 2018

Removed those, which also addressed cotton's comments. When tests pass I'll merge and then push the next PR up!

@cseed cseed merged commit 96bf3bc into hail-is:master Mar 23, 2018
@danking danking deleted the limit-uses-of-rvd-rdd branch March 24, 2018 00:30
konradjk pushed a commit to konradjk/hail that referenced this pull request Jun 12, 2018
* Eliminate some but not all uses of RVD.rdd

This change anticipates the ContextRDD change wherein `RVD.rdd` will not
be an RDD. Moreover, enforcing an abstraction barrier at the level of
`RVD` will ease changes to the implementation of `RVD`.

There are two remaining types of calls that I cannot eliminate:

 - uses in BlockMatrix and OrderedRDD2: these two classes are building
   new RDDs based on the RVD's rdd, these classes should be considered
   within the implementation of the RVD abstraction. Because these two
   classes are outside of `is.hail.rvd`, I cannot enforce an access
   modifier on `RVD.rdd`.

 - uses by methods:

   - LDPrune: it seems we need a "GeneralRVD"

   - Skat: it seems like some of this could be moved to python actually;
     but there is some matrix math that cannot be moved until the expr
     lang has efficient small-matrix ops

   - MatrixTable.same: I could probably move this if I re-implemented
     forall in terms of RVD.aggregate?

   - MatrixTable.annotateRowsIntervalTable: really not sure about this
     one, this seems like a performance optimization that purposely
     reaches through the abstraction to do Smart Things

* clean up

* formatting

* more formatting

* use assertOrdered instead of old apply

* fixes

* improve use of assertions

* rename toUnsafeRows to toRows

* rename unsafeChangeType to updateType

* wip zip not sure what to do

* finish renames

* fix invalid assertions

* remove coerceOrdered, remove OrderedRVD.apply

* fixes and eliminate coerceOrdered

* actually remove coerceOrdered

* fix

* clean up zipPartitions definitions and uses

* name error

* fix name

* Update OrderedRVD.scala

* Update OrderedRVD.scala

* fix filteralleles shuffle and friends

* formatting

* rebase errors

* harmonize formatting

* remove rebase cruft
jackgoldsmith4 pushed a commit to jackgoldsmith4/hail that referenced this pull request Jun 25, 2018
* Eliminate some but not all uses of RVD.rdd

This change anticipates the ContextRDD change wherein `RVD.rdd` will not
be an RDD. Moreover, enforcing an abstraction barrier at the level of
`RVD` will ease changes to the implementation of `RVD`.

There are two remaining types of calls that I cannot eliminate:

 - uses in BlockMatrix and OrderedRDD2: these two classes are building
   new RDDs based on the RVD's rdd, these classes should be considered
   within the implementation of the RVD abstraction. Because these two
   classes are outside of `is.hail.rvd`, I cannot enforce an access
   modifier on `RVD.rdd`.

 - uses by methods:

   - LDPrune: it seems we need a "GeneralRVD"

   - Skat: it seems like some of this could be moved to python actually;
     but there is some matrix math that cannot be moved until the expr
     lang has efficient small-matrix ops

   - MatrixTable.same: I could probably move this if I re-implemented
     forall in terms of RVD.aggregate?

   - MatrixTable.annotateRowsIntervalTable: really not sure about this
     one, this seems like a performance optimization that purposely
     reaches through the abstraction to do Smart Things

* clean up

* formatting

* more formatting

* use assertOrdered instead of old apply

* fixes

* improve use of assertions

* rename toUnsafeRows to toRows

* rename unsafeChangeType to updateType

* wip zip not sure what to do

* finish renames

* fix invalid assertions

* remove coerceOrdered, remove OrderedRVD.apply

* fixes and eliminate coerceOrdered

* actually remove coerceOrdered

* fix

* clean up zipPartitions definitions and uses

* name error

* fix name

* Update OrderedRVD.scala

* Update OrderedRVD.scala

* fix filteralleles shuffle and friends

* formatting

* rebase errors

* harmonize formatting

* remove rebase cruft
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants