Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Dec 11, 2024
1 parent 25a31e9 commit 8ab38ea
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ def reduce(
metadatas = []
stage_id = next(stage_id_counter)

# Process all steps in the fanout plan
# Dispatch all fanouts.
for step in fanout_plan:
# Check any completed materializations, collect their partitions and metadatas, and add to reduce inputs
newly_completed = [(i, m) for i, m in pending_materializations.items() if m.done()]
Expand All @@ -1534,7 +1534,8 @@ def reduce(
else:
yield step

# After all fanouts are launched, wait for remaining materializations
# All fanouts dispatched. Wait for all of them to materialize
# (since we need all of them to emit even a single reduce).
while pending_materializations:
newly_completed = [(i, m) for i, m in pending_materializations.items() if m.done()]
for i, completed in newly_completed:
Expand Down

0 comments on commit 8ab38ea

Please sign in to comment.