Skip to content

Commit

Permalink
[FrontEnd] Make merge_async_iterators is_cancelled arg optional (v…
Browse files Browse the repository at this point in the history
  • Loading branch information
njhill authored and fialhocoelho committed Aug 22, 2024
1 parent 5af0dfb commit d3d01b1
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions vllm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,29 +405,30 @@ async def iterate_with_cancellation(

async def merge_async_iterators(
*iterators: AsyncGenerator[T, None],
is_cancelled: Callable[[], Awaitable[bool]],
is_cancelled: Optional[Callable[[], Awaitable[bool]]] = None,
) -> AsyncGenerator[Tuple[int, T], None]:
"""Merge multiple asynchronous iterators into a single iterator.
This method handle the case where some iterators finish before others.
When it yields, it yields a tuple (i, item) where i is the index of the
iterator that yields the item.
It also polls the provided function at least once per second to check
for client cancellation.
It also optionally polls a provided function at least once per second
to check for client cancellation.
"""

# Can use anext() in python >= 3.10
awaits = {
ensure_future(pair[1].__anext__()): pair
for pair in enumerate(iterators)
}
timeout = None if is_cancelled is None else 1
try:
while awaits:
done, pending = await asyncio.wait(awaits.keys(),
return_when=FIRST_COMPLETED,
timeout=1)
if await is_cancelled():
timeout=timeout)
if is_cancelled is not None and await is_cancelled():
raise asyncio.CancelledError("client cancelled")
for d in done:
pair = awaits.pop(d)
Expand Down

0 comments on commit d3d01b1

Please sign in to comment.