Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
njhill committed Aug 3, 2024
1 parent 105e3c3 commit a8d7855
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
9 changes: 5 additions & 4 deletions tests/async_engine/api_server_async_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""vllm.entrypoints.api_server with some extra logging for testing."""
from typing import Any, Dict
from typing import Any, Dict, Iterable

import uvicorn
from fastapi.responses import JSONResponse, Response
Expand All @@ -18,9 +18,10 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._num_aborts = 0

async def abort(self, request_id: str) -> None:
await super().abort(request_id)
self._num_aborts += 1
async def _engine_abort(self, request_ids: Iterable[str]):
ids = list(request_ids)
self._num_aborts += len(ids)
await super()._engine_abort(ids)

def testing_stats(self) -> Dict[str, Any]:
return {"num_aborted_requests": self._num_aborts}
Expand Down
26 changes: 13 additions & 13 deletions vllm/engine/async_llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ def put(self, item: Union[RequestOutput, EmbeddingRequestOutput,
self._queue.put_nowait(item)

def finish(self, cancelled: bool = False) -> None:
self._finished = True
self._queue.put_nowait(
asyncio.CancelledError if cancelled else STOP_ITERATION)
if not self._finished:
self._finished = True
self._queue.put_nowait(
asyncio.CancelledError if cancelled else STOP_ITERATION)

@property
def finished(self) -> bool:
Expand Down Expand Up @@ -150,7 +151,7 @@ def process_request_output(self,
if request_output.finished:
if verbose:
logger.info("Finished request %s.", request_id)
self.abort_request(request_id)
stream.finish()

def process_exception(self,
request_id: str,
Expand Down Expand Up @@ -198,12 +199,9 @@ def abort_request(self,

self._finished_requests.put_nowait(request_id)

if request_id not in self._request_streams or self._request_streams[
request_id].finished:
# The request has already finished or been aborted.
return

self._request_streams[request_id].finish(cancelled=cancelled)
stream = self._request_streams.get(request_id, None)
if stream is not None:
stream.finish(cancelled=cancelled)

def get_new_and_finished_requests(self) -> Tuple[List[Dict], Set[str]]:
"""Get the new requests and finished requests to be
Expand Down Expand Up @@ -682,7 +680,9 @@ async def run_engine_loop(self):
raise
await asyncio.sleep(0)

def add_request(
# This method does not need to be async, but kept that way
# for backwards compatibility.
async def add_request(
self,
request_id: str,
inputs: PromptInputs,
Expand Down Expand Up @@ -787,7 +787,7 @@ async def generate(
>>> # Process and return the final output
>>> ...
"""
async for output in self.add_request(
async for output in await self.add_request(
request_id,
inputs,
sampling_params,
Expand Down Expand Up @@ -865,7 +865,7 @@ async def encode(
>>> # Process and return the final output
>>> ...
"""
async for output in self.add_request(
async for output in await self.add_request(
request_id,
inputs,
pooling_params,
Expand Down

0 comments on commit a8d7855

Please sign in to comment.