[compiler] Reorganize the compiler control flow to prepare for partition planning #12587

Merged
merged 35 commits into from
Mar 15, 2023

tpoterba
Contributor

Evaluation of relational lets is an explicit pass.

Executing and rewriting shuffles is an explicit pass.

  • lowerDistributedSort executes the shuffle and produces a TableReader

Higher-level passes that recursively lower and execute are parameterized
by the contained pipeline.

Collaborator

@patrick-schultz left a comment

Great change!

@@ -1563,7 +1563,6 @@ object IRParser {
table_ir(env.onlyRelational)(it).map { child =>
TableKeyBy(child, keys, isSorted)
}
- case "TableDistinct" => table_ir(env.onlyRelational)(it).map(TableDistinct)
Collaborator

Could you explain this deletion?

Contributor Author

This is explained by an overeager finger on the delete key when I backed out the TableStrictify node :D

globals.typ.asInstanceOf[TStruct]
)

override def pathsUsed: Seq[String] = Seq()
Collaborator

Should this be the files in orderedOutputPartitions?

Contributor Author

Short answer: maybe.

Longer answer: I don't think this really matters. pathsUsed exists to try to catch cases where users read from and write to the same table file, so anything generated in the compiler that isn't user-exposed doesn't actually need to define it. I think the most correct answer for pathsUsed would be the root directory of all the output partitions (rather than each file individually). We should probably rename pathsUsed to userPathsUsed.

Comment on lines 76 to 77
// change 1 - contexts is now not known until the partitioner is passed down
//
Collaborator

These look like WIP notes?

Comment on lines 126 to 134
def strictifyGeneric(allowedOverlap: Int): RVDPartitioner = {
  if (satisfiesAllowedOverlap(allowedOverlap))
    this
  else
    coarsen(allowedOverlap+1)
      .strictify
      .extendKey(kType)
}
def strictify: RVDPartitioner = extendKey(kType)
Collaborator

You can simplify this into a single function

  def strictify(allowedOverlap: Int = kType.size - 1): RVDPartitioner = {
    if (satisfiesAllowedOverlap(allowedOverlap))
      this
    else
      coarsen(allowedOverlap+1).extendKey(kType)
  }

Also note, re. my colocalizedKey comment, that this does ensure that strictify(kType.size) is always a no-op.
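
As a rough illustration of the shape of this suggestion (one function, a default argument, and an early return when the partitioning already satisfies the constraint), here is a toy Python model. It is a sketch under stated assumptions, not Hail's implementation: partitions are modelled as non-empty lists of globally sorted key tuples rather than interval bounds, and the regrouping loop merely stands in for coarsen(...).extendKey(...). The Python names mirror the Scala ones, but their signatures are hypothetical.

```python
from typing import List, Optional, Tuple

Key = Tuple[int, ...]
Partition = List[Key]


def satisfies_allowed_overlap(parts: List[Partition], key_size: int,
                              allowed_overlap: int) -> bool:
    """True if adjacent partitions never share an (allowed_overlap + 1)-field prefix.

    Toy assumption: every partition is non-empty and rows are globally sorted.
    """
    if allowed_overlap >= key_size:
        return True  # overlap on the full key is allowed, so nothing to check
    n = allowed_overlap + 1
    return all(left[-1][:n] < right[0][:n] for left, right in zip(parts, parts[1:]))


def strictify(parts: List[Partition], key_size: int,
              allowed_overlap: Optional[int] = None) -> List[Partition]:
    if allowed_overlap is None:
        allowed_overlap = key_size - 1  # default: group all equal keys together
    if satisfies_allowed_overlap(parts, key_size, allowed_overlap):
        return parts  # already strict enough: no-op
    n = allowed_overlap + 1
    result: List[Partition] = []
    for part in parts:
        current: Partition = []
        for key in part:
            # Leading rows that share a prefix with the previous output
            # partition's last row are moved into that partition.
            if not current and result and result[-1][-1][:n] == key[:n]:
                result[-1].append(key)
            else:
                current.append(key)
        if current:
            result.append(current)
    return result


parts = [[(1, 1), (1, 2)], [(1, 3), (2, 1)], [(2, 2), (3, 1)]]
assert strictify(parts, key_size=2) is parts                     # overlap 1 already holds
assert strictify(parts, key_size=2, allowed_overlap=2) is parts  # overlap == key size: no-op
print(strictify(parts, key_size=2, allowed_overlap=0))
# [[(1, 1), (1, 2), (1, 3)], [(2, 1), (2, 2)], [(3, 1)]]
```

In this model the call with allowed_overlap equal to the key size returns its input untouched, which is the no-op property noted above.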

@@ -1475,18 +1404,11 @@ object LowerTableIR {
s"isSorted=${isSorted}, nPresFields=${nPreservedFields}, newKey=${newKey}, " +
s"originalKey = ${loweredChild.kType.fieldNames.toSeq}, child key=${child.typ.keyType}")

- if (nPreservedFields == newKey.length || isSorted)
+ require(nPreservedFields == newKey.length || isSorted)
Collaborator

Should probably just use definitelyDoesNotShuffle

@@ -3213,6 +3223,8 @@ object TableOrderBy {
}

case class TableOrderBy(child: TableIR, sortFields: IndexedSeq[SortField]) extends TableIR {

lazy val definitelyDoesNotShuffle: Boolean = sortFields.forall(_.sortOrder == Ascending) && child.typ.key.startsWith(sortFields.map(_.field))
Collaborator

Could use TableOrderBy.isAlreadyOrdered to avoid duplicating logic.
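
For readers following along, the condition in definitelyDoesNotShuffle can be restated as a small predicate: every sort field is ascending, and the sort field names form a prefix of the child's key. The Python below is an illustrative sketch only; SortField, ASCENDING and is_already_ordered are hypothetical stand-ins for the Scala definitions, not Hail's Python API.

```python
from dataclasses import dataclass
from typing import List, Sequence

ASCENDING = "ascending"
DESCENDING = "descending"


@dataclass(frozen=True)
class SortField:
    field: str
    sort_order: str


def is_already_ordered(sort_fields: Sequence[SortField], key: Sequence[str]) -> bool:
    """True if sorting by sort_fields cannot change the order of a table keyed by `key`."""
    names: List[str] = [sf.field for sf in sort_fields]
    all_ascending = all(sf.sort_order == ASCENDING for sf in sort_fields)
    # Equivalent of Scala's key.startsWith(names): names must be a prefix of key.
    key_starts_with_names = list(key[:len(names)]) == names
    return all_ascending and key_starts_with_names


key = ["locus", "alleles"]
assert is_already_ordered([SortField("locus", ASCENDING)], key)
assert not is_already_ordered([SortField("alleles", ASCENDING)], key)   # not a key prefix
assert not is_already_ordered([SortField("locus", DESCENDING)], key)    # wrong direction
```

Keeping the predicate in one place, as suggested, means the IR node and the lowering code cannot drift apart on what "already ordered" means.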

ctx.backend.lowerDistributedSort(
ctx, loweredChild, sortFields, relationalLetsAbove, rowRType)
}
require(TableOrderBy.isAlreadyOrdered(sortFields, loweredChild.partitioner.kType.fieldNames))
Collaborator

Likewise should just use definitelyDoesNotShuffle

Comment on lines 1614 to 1633
case TableMapPartitions(child, globalName, partitionStreamName, body, colocalizedKey) =>
  val loweredChild = {
    val lc = lower(child)
    colocalizedKey match {
      case Some(k) => lc.strictify(k)
      case None => lc
    }
  }
Collaborator

If colocalizedKey is used as the allowedOverlap, then I think the name is misleading, and should be something like allowedKeyOverlap. Also, it doesn't need to be optional, the None case is the same as allowedKeyOverlap = keyType.length.

@tpoterba
Contributor Author

Made almost all your suggested changes. I left the allowedOverlap on TMP as an option, because I think allowedOverlap == ktype.size actually does indicate that the consumer requires a keyed (sorted) input, and the None case doesn't.

@patrick-schultz
Collaborator

> I think allowedOverlap == ktype.size actually does indicate that the consumer requires a keyed (sorted) input

It doesn't. I think we rarely if ever need to treat unkeyed tables as a special case.

Unkeyed tables are precisely the case where ktype.size == allowedOverlap == 0. E.g. see RVDPartitioner.unkeyed. So for TableMapPartitions on unkeyed tables, allowedOverlap = 0 means normal map partitions, and allowedOverlap = -1 means it needs to process all rows in one partition, which is consistent with the general case where allowedOverlap = ktype.size means normal map partitions and allowedOverlap = ktype.size - 1 means group all equal keys together. (We don't actually allow allowedOverlap = -1, but if we did it would always mean use a single partition.)
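
One way to see this consistency is to compute, for a concrete partitioning, the smallest allowedOverlap it would satisfy. The sketch below is a toy model, not Hail code: partitions are non-empty lists of key tuples in global key order, an unkeyed table is modelled with empty tuples as keys, and min_allowed_overlap and common_prefix_len are hypothetical helpers. Returning -1 for a single partition follows the hypothetical reading in the parenthetical above; as noted there, -1 is not actually allowed.

```python
from typing import List, Tuple

Key = Tuple[int, ...]
Partition = List[Key]


def common_prefix_len(a: Key, b: Key) -> int:
    """Number of leading key fields on which a and b agree."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


def min_allowed_overlap(parts: List[Partition]) -> int:
    """Smallest allowed_overlap this partitioning satisfies in the toy model.

    At each boundary, the last row of the left partition and the first row of
    the right partition agree on some number of leading key fields; the
    partitioning tolerates exactly that much overlap.
    """
    return max(
        (common_prefix_len(left[-1], right[0]) for left, right in zip(parts, parts[1:])),
        default=-1,  # no boundaries at all: everything is in one partition
    )


# Keyed table, key size 2.
assert min_allowed_overlap([[(1, 1), (1, 2)], [(1, 2), (2, 1)]]) == 2  # a full key spans partitions
assert min_allowed_overlap([[(1, 1), (1, 2)], [(1, 3), (2, 1)]]) == 1  # equal keys are grouped
# Unkeyed table, key size 0: every boundary gives 0 == key size.
assert min_allowed_overlap([[(), ()], [()]]) == 0
# A single partition is the only layout that would satisfy -1.
assert min_allowed_overlap([[(), (), ()]]) == -1
```

In this model the unkeyed case needs no special handling: the result never exceeds the key size, so allowedOverlap equal to the key size is always satisfied, keyed or not.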

@tpoterba
Contributor Author

Here's a case I'm worried about --

I have TableAggregate(TableMapPartitions(child)), where the key of child is locus.

The TableAggregate doesn't care about keyed input (commutative aggregator or something)

My TMP partition function requires its input to be sorted by locus, but allows overlap between partitions.

How do I use a single non-optional allowedOverlap to express this versus a TMP function that doesn't care about keying/sorting at all?

@patrick-schultz
Collaborator

> How do I use a single non-optional allowedOverlap to express this versus a TMP function that doesn't care about keying/sorting at all?

Ah, I see. I think the answer is: you can't. You would need another integer parameter to say "I actually depend on this prefix of the key being sorted". It seems like allowedOverlap and requiredSortedPrefix are completely independent. In the single key case (for simplicity), you may or may not care if keys are localized in one partition, and you may or may not care if they're sorted. I don't see any connection.
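
To make the independence of the two parameters concrete, the following toy Python model checks them separately. It is a sketch under assumptions, not the design adopted in the PR: PartitionRequirements and the helper functions are hypothetical, keys are tuples, "sorted" is checked within each partition only, and co-location is checked directly on rows rather than on partitioner bounds.

```python
from dataclasses import dataclass
from typing import List, Tuple

Key = Tuple[int, ...]
Partition = List[Key]


@dataclass(frozen=True)
class PartitionRequirements:
    allowed_overlap: int         # key prefix length that may span partitions
    required_sorted_prefix: int  # key prefix length each partition must be sorted by


def prefixes_colocated(parts: List[Partition], n: int) -> bool:
    """True if no n-field key prefix occurs in more than one partition."""
    seen = set()
    for part in parts:
        here = {row[:n] for row in part}
        if seen & here:
            return False
        seen |= here
    return True


def sorted_on_prefix(part: Partition, n: int) -> bool:
    prefixes = [row[:n] for row in part]
    return all(a <= b for a, b in zip(prefixes, prefixes[1:]))


def satisfies(parts: List[Partition], key_size: int, req: PartitionRequirements) -> bool:
    overlap_ok = (req.allowed_overlap >= key_size
                  or prefixes_colocated(parts, req.allowed_overlap + 1))
    sorted_ok = all(sorted_on_prefix(p, req.required_sorted_prefix) for p in parts)
    return overlap_ok and sorted_ok


# Single-field key (think: locus).
sorted_overlapping = [[(1,), (2,)], [(2,), (3,)]]  # sorted, but key 2 spans partitions
unsorted_colocated = [[(2,), (1,)], [(3,)]]        # keys grouped, but not sorted

sorted_only = PartitionRequirements(allowed_overlap=1, required_sorted_prefix=1)
colocated_only = PartitionRequirements(allowed_overlap=0, required_sorted_prefix=0)
no_requirements = PartitionRequirements(allowed_overlap=1, required_sorted_prefix=0)

assert satisfies(sorted_overlapping, 1, sorted_only)
assert not satisfies(sorted_overlapping, 1, colocated_only)
assert satisfies(unsorted_colocated, 1, colocated_only)
assert not satisfies(unsorted_colocated, 1, sorted_only)
assert satisfies(sorted_overlapping, 1, no_requirements)
assert satisfies(unsorted_colocated, 1, no_requirements)
```

Here sorted_only corresponds to the TMP body that needs locus-sorted input but tolerates overlap, and no_requirements to the body that does not care about keying at all; a single integer cannot distinguish the two.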

@tpoterba tpoterba force-pushed the compiler-reorg-1 branch 2 times, most recently from d13e753 to 07b2564 Compare February 7, 2023 18:54
@tpoterba tpoterba force-pushed the compiler-reorg-1 branch 4 times, most recently from d394535 to 61922ba Compare February 17, 2023 22:06
@tpoterba tpoterba force-pushed the compiler-reorg-1 branch 2 times, most recently from c90e9de to 6f06ac2 Compare February 27, 2023 15:43
@@ -3834,7 +3834,7 @@ def _map_partitions(self, f):
raise ValueError('Table._map_partitions must preserve key fields')

body_ir = ir.Let('global', ir.Ref(globals_uid, self._global_type), body._ir)
- return Table(ir.TableMapPartitions(self._tir, globals_uid, rows_uid, body_ir))
+ return Table(ir.TableMapPartitions(self._tir, globals_uid, rows_uid, body_ir, 0, len(self.key)))
Collaborator

If I remember the semantics of requested_key right, doesn't this need to assume the body might depend on the entire key?

Contributor Author

Yes, that's right. I think that's the safe assumption. We can add these as params to the method in the future.

Comment on lines 7 to 9
- import is.hail.expr.ir.BaseIR
+ import is.hail.expr.ir.{BaseIR, LoweringAnalyses, TableIR}
import is.hail.expr.ir.functions.IRFunctionRegistry
import is.hail.expr.ir.lowering.{CanLowerEfficiently, DArrayLowering, LowerTableIR, TableStage}
Collaborator

I assume these are unused, since there are no other changes?

}
}
bindIR(invoke("extend", TArray(TInt32), ToArray(mapIR(rangeIR(nPartitionsAdj)) { partIdx =>
invoke("ceil", TFloat64, partIdx.toD * numRowsRef.toD / nPartitionsAdj.toDouble).toI
Collaborator

Was there a reason this can't be (partIdx * numRowsRef) floorDiv nPartitionsAdj?

Contributor Author

I think that works; I didn't take the final step of simplifying after rewriting. 👍
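
A quick way to compare the two formulas is to compute the partition start offsets both ways. The Python sketch below is illustrative only: the function names are hypothetical, and it assumes the array is extended with the total row count as the final boundary, which the truncated snippet above does not show. The two variants do not produce identical boundaries, but in both cases partition sizes differ by at most one row.

```python
import math
from typing import List


def boundaries_ceil(num_rows: int, n_partitions: int) -> List[int]:
    """Start offsets via floating-point division and ceil, plus the final end offset."""
    starts = [math.ceil(i * num_rows / n_partitions) for i in range(n_partitions)]
    return starts + [num_rows]


def boundaries_floordiv(num_rows: int, n_partitions: int) -> List[int]:
    """Start offsets via integer floor division only, plus the final end offset."""
    starts = [(i * num_rows) // n_partitions for i in range(n_partitions)]
    return starts + [num_rows]


def sizes(bounds: List[int]) -> List[int]:
    return [b - a for a, b in zip(bounds, bounds[1:])]


assert boundaries_ceil(10, 3) == [0, 4, 7, 10]      # sizes 4, 3, 3
assert boundaries_floordiv(10, 3) == [0, 3, 6, 10]  # sizes 3, 3, 4
assert sorted(sizes(boundaries_ceil(10, 3))) == sorted(sizes(boundaries_floordiv(10, 3)))
```

The integer version avoids the detour through floating point. One caveat when porting this to fixed-width integers: Python integers do not overflow, whereas the product of a partition index and a row count may need a wider type in an IR that uses 32-bit integers.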

@@ -69,7 +69,7 @@ class RVDPartitioner(
Some(Interval(rangeBounds.head.left, rangeBounds.last.right))

def satisfiesAllowedOverlap(testAllowedOverlap: Int): Boolean =
- RVDPartitioner.isValid(sm, kType, rangeBounds, testAllowedOverlap)
+ (testAllowedOverlap >= kType.size) || RVDPartitioner.isValid(sm, kType, rangeBounds, testAllowedOverlap)
Collaborator

Could we move the guard into RVDPartitioner.isValid?

Contributor Author

yeah, good change.

@tpoterba tpoterba force-pushed the compiler-reorg-1 branch 2 times, most recently from d9f1d99 to a880a90 Compare March 14, 2023 16:21
…ion planning

fix EvalRelationalLets

fix some tests

update TMP and some CanLowerEfficiently fixes

fix

fixes

colocalized_key => allowed_overlap

address comments

TMP change again
@danking danking merged commit 87d3acd into hail-is:main Mar 15, 2023
Comment on lines 457 to 460
object RTuple {
def apply(fields: Seq[TypeWithRequiredness]): RTuple =
RTuple(Array.tabulate(fields.length)(i => RField(i.toString, fields(i), i)))
def apply(fields: Seq[(Int, TypeWithRequiredness)]): RTuple =
RTuple(fields.zipWithIndex.map { case ((fdIdx, typ), i) => RField(fdIdx.toString, typ, i) }.toIndexedSeq)
}
Member

4 participants