[compiler] Reorganize the compiler control flow to prepare for partition planning #12587
Great change!
@@ -1563,7 +1563,6 @@ object IRParser {
       table_ir(env.onlyRelational)(it).map { child =>
         TableKeyBy(child, keys, isSorted)
       }
-    case "TableDistinct" => table_ir(env.onlyRelational)(it).map(TableDistinct)
Could you explain this deletion?
This is explained by an overeager finger on the delete key when I backed out the TableStrictify node :D
    globals.typ.asInstanceOf[TStruct]
  )

  override def pathsUsed: Seq[String] = Seq()
Should this be the files in `orderedOutputPartitions`?
Short answer: maybe.
Longer answer: I don't think this really matters. pathsUsed is used to try to catch cases where users read from and write to the same table file. Anything generated in the compiler that's not user-exposed doesn't actually need to define anything here. I think the most correct answer for pathsUsed would be the root directory of all the output partitions (rather than each file individually). We should probably rename pathsUsed to userPathsUsed.
// change 1 - contexts is now not known until the partitioner is passed down
//
This looks like wip notes?
def strictifyGeneric(allowedOverlap: Int): RVDPartitioner = {
  if (satisfiesAllowedOverlap(allowedOverlap))
    this
  else
    coarsen(allowedOverlap+1)
      .strictify
      .extendKey(kType)
}
def strictify: RVDPartitioner = extendKey(kType)
You can simplify this into a single function:
def strictify(allowedOverlap: Int = kType.size - 1): RVDPartitioner = {
  if (satisfiesAllowedOverlap(allowedOverlap))
    this
  else
    coarsen(allowedOverlap + 1).extendKey(kType)
}
Also note, re. my `colocalizedKey` comment, that this does ensure that `strictify(kType.size)` is always a no-op.
@@ -1475,18 +1404,11 @@ object LowerTableIR {
           s"isSorted=${isSorted}, nPresFields=${nPreservedFields}, newKey=${newKey}, " +
           s"originalKey = ${loweredChild.kType.fieldNames.toSeq}, child key=${child.typ.keyType}")

-        if (nPreservedFields == newKey.length || isSorted)
+        require(nPreservedFields == newKey.length || isSorted)
Should probably just use definitelyDoesNotShuffle
@@ -3213,6 +3223,8 @@ object TableOrderBy {
 }

 case class TableOrderBy(child: TableIR, sortFields: IndexedSeq[SortField]) extends TableIR {
+
+  lazy val definitelyDoesNotShuffle: Boolean = sortFields.forall(_.sortOrder == Ascending) && child.typ.key.startsWith(sortFields.map(_.field))
Could use `TableOrderBy.isAlreadyOrdered` to avoid duplicating logic.
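For readers following along, the condition being deduplicated can be sketched in isolation. This is a minimal standalone model, not Hail's actual code: `SortOrder`, `SortField`, and the `isAlreadyOrdered` signature below are simplified stand-ins assumed for illustration, mirroring the condition in the diff (every sort field is ascending and the sort field names form a prefix of the key).

```scala
object AlreadyOrderedSketch {
  // Simplified stand-ins for the IR types (assumptions, not Hail's definitions).
  sealed trait SortOrder
  case object Ascending extends SortOrder
  case object Descending extends SortOrder

  final case class SortField(field: String, sortOrder: SortOrder)

  // True when sorting by `sortFields` cannot change the order of a table
  // already keyed by `key`: all fields ascending, and they are a key prefix.
  def isAlreadyOrdered(sortFields: IndexedSeq[SortField], key: IndexedSeq[String]): Boolean =
    sortFields.forall(_.sortOrder == Ascending) &&
      key.startsWith(sortFields.map(_.field))
}
```

With a single helper like this, a `definitelyDoesNotShuffle`-style member could delegate to it instead of repeating the predicate.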
  ctx.backend.lowerDistributedSort(
    ctx, loweredChild, sortFields, relationalLetsAbove, rowRType)
}
require(TableOrderBy.isAlreadyOrdered(sortFields, loweredChild.partitioner.kType.fieldNames))
Likewise should just use definitelyDoesNotShuffle
case TableMapPartitions(child, globalName, partitionStreamName, body, colocalizedKey) =>
  val loweredChild = {
    val lc = lower(child)
    colocalizedKey match {
      case Some(k) => lc.strictify(k)
      case None => lc
    }
  }
If `colocalizedKey` is used as the `allowedOverlap`, then I think the name is misleading, and should be something like `allowedKeyOverlap`. Also, it doesn't need to be optional; the `None` case is the same as `allowedKeyOverlap = keyType.length`.
Force-pushed from d2a6e4f to 3bfb13f.
Made almost all your suggested changes. I left the allowedOverlap on TMP as an option, because I think allowedOverlap == kType.size actually does indicate that the consumer requires a keyed (sorted) input, and the None case doesn't.
It doesn't. I think we rarely if ever need to treat unkeyed tables as a special case. Unkeyed tables are precisely the case where
Here's a case I'm worried about -- I have
The TableAggregate doesn't care about keyed input (commutative aggregator or something).
My TMP partition function requires its input to be sorted by locus, but allows overlap between partitions.
How do I use a single non-optional allowedOverlap to express this versus a TMP function that doesn't care about keying/sorting at all?
Ah, I see. I think the answer is: you can't. You would need another integer parameter to say "I actually depend on this prefix of the key being sorted". It seems like allowedOverlap and requiredSortedPrefix are completely independent. In the single key case (for simplicity), you may or may not care if keys are localized in one partition, and you may or may not care if they're sorted. I don't see any connection.
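The independence claim in the single-key case can be illustrated with a toy model. This sketch is an assumption-laden illustration, not Hail code: partitions are plain vectors of integer keys, and `keysColocalized` and `globallySorted` are hypothetical helper names standing in for "no key spans two partitions" and "rows are sorted".

```scala
object OverlapVsSortednessSketch {
  // Toy model: a partition is just the sequence of its (single-field) keys.
  type Partition = Vector[Int]

  // True when no key value appears in more than one partition.
  def keysColocalized(parts: Seq[Partition]): Boolean = {
    val owners = for ((p, i) <- parts.zipWithIndex; k <- p.distinct) yield (k, i)
    owners.groupBy(_._1).values.forall(_.size == 1)
  }

  // True when rows are in non-decreasing key order within and across partitions.
  def globallySorted(parts: Seq[Partition]): Boolean = {
    val flat = parts.flatten
    flat.zip(flat.drop(1)).forall { case (a, b) => a <= b }
  }

  // All four combinations of the two properties are realizable.
  val sortedColocalized   = Seq(Vector(1, 1, 2), Vector(3, 4))
  val sortedOverlapping   = Seq(Vector(1, 2, 2), Vector(2, 3))
  val unsortedColocalized = Seq(Vector(2, 1), Vector(4, 3))
  val unsortedOverlapping = Seq(Vector(2, 1), Vector(1, 3))
}
```

Since each combination exists, one integer cannot encode both requirements, which is the point made above about needing a separate parameter.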
Force-pushed from d13e753 to 07b2564.
Force-pushed from d394535 to 61922ba.
Force-pushed from c90e9de to 6f06ac2.
hail/python/hail/table.py
@@ -3834,7 +3834,7 @@ def _map_partitions(self, f):
             raise ValueError('Table._map_partitions must preserve key fields')

         body_ir = ir.Let('global', ir.Ref(globals_uid, self._global_type), body._ir)
-        return Table(ir.TableMapPartitions(self._tir, globals_uid, rows_uid, body_ir))
+        return Table(ir.TableMapPartitions(self._tir, globals_uid, rows_uid, body_ir, 0, len(self.key)))
If I remember the semantics of `requested_key` right, doesn't this need to assume the body might depend on the entire key?
Yes, that's right. I think that's the safe assumption. We can add these as params to the method in the future.
import is.hail.expr.ir.BaseIR
import is.hail.expr.ir.{BaseIR, LoweringAnalyses, TableIR}
import is.hail.expr.ir.functions.IRFunctionRegistry
import is.hail.expr.ir.lowering.{CanLowerEfficiently, DArrayLowering, LowerTableIR, TableStage}
I assume these are unused, since there are no other changes?
  }
}
bindIR(invoke("extend", TArray(TInt32), ToArray(mapIR(rangeIR(nPartitionsAdj)) { partIdx =>
  invoke("ceil", TFloat64, partIdx.toD * numRowsRef.toD / nPartitionsAdj.toDouble).toI
Was there a reason this can't be `(partIdx * numRowsRef) floorDiv nPartitionsAdj`?
I think that works; I didn't take the final step of simplifying after rewriting. 👍
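As a rough illustration of the integer-only version, here is a standalone sketch using plain Scala values rather than IR nodes. `boundaries` is a hypothetical helper, and appending `numRows` as the final end offset is an assumption about what the elided `extend` argument supplies. Note that flooring and ceiling can pick different offsets when the division is inexact; in both cases partition sizes differ by at most one row.

```scala
object PartitionBoundsSketch {
  // Start offset of each partition, followed by numRows as the final end offset.
  // Uses only integer arithmetic, avoiding the round trip through Double.
  def boundaries(numRows: Long, nPartitions: Int): IndexedSeq[Long] = {
    require(nPartitions > 0 && numRows >= 0)
    (0 until nPartitions).map(i => Math.floorDiv(i * numRows, nPartitions.toLong)) :+ numRows
  }

  // Row count of each partition, derived from consecutive boundaries.
  def sizes(numRows: Long, nPartitions: Int): IndexedSeq[Long] = {
    val b = boundaries(numRows, nPartitions)
    b.zip(b.drop(1)).map { case (lo, hi) => hi - lo }
  }
}
```

For example, `PartitionBoundsSketch.boundaries(10L, 3)` yields offsets 0, 3, 6, 10, whereas the `ceil` form in the diff would give 0, 4, 7 for the starts.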
@@ -69,7 +69,7 @@ class RVDPartitioner(
       Some(Interval(rangeBounds.head.left, rangeBounds.last.right))

   def satisfiesAllowedOverlap(testAllowedOverlap: Int): Boolean =
-    RVDPartitioner.isValid(sm, kType, rangeBounds, testAllowedOverlap)
+    (testAllowedOverlap >= kType.size) || RVDPartitioner.isValid(sm, kType, rangeBounds, testAllowedOverlap)
Could we move the guard into `RVDPartitioner.isValid`?
yeah, good change.
Force-pushed from d9f1d99 to a880a90.
…ion planning
Evaluation of relational lets is an explicit pass.
Executing and rewriting shuffles is an explicit pass.
* lowerDistributedSort executes the shuffle and produces a TableReader
Higher-level passes that recursively lower and execute are parameterized by the contained pipeline.
fix EvalRelationalLets fix some tests update TMP and some CanLowerEfficiently fixes fix fixes colocalized_key => allowed_overlap address comments TMP change again
Force-pushed from a880a90 to 7118893.
object RTuple {
  def apply(fields: Seq[TypeWithRequiredness]): RTuple =
    RTuple(Array.tabulate(fields.length)(i => RField(i.toString, fields(i), i)))
  def apply(fields: Seq[(Int, TypeWithRequiredness)]): RTuple =
    RTuple(fields.zipWithIndex.map { case ((fdIdx, typ), i) => RField(fdIdx.toString, typ, i) }.toIndexedSeq)
}
Evaluation of relational lets is an explicit pass.
Executing and rewriting shuffles is an explicit pass.
Higher-level passes that recursively lower and execute are parameterized
by the contained pipeline.