From c7ea323256798feecffaff337129df32c82561c9 Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 16:31:24 -0300
Subject: [PATCH 1/6] Fix frontend multiprocessing hang

If the server dies, the frontend keeps waiting forever
for it to come up.

Signed-off-by: Max de Bayser
---
 vllm/entrypoints/openai/api_server.py |  9 ++++++++-
 vllm/entrypoints/openai/rpc/client.py | 15 ++++++++++++---
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 88f0bd4ee4db..9d69002edd1a 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -116,7 +116,14 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
 
     # Build RPCClient, which conforms to AsyncEngineClient Protocol.
     async_engine_client = AsyncEngineRPCClient(port)
-    await async_engine_client.setup()
+
+    while True:
+        try:
+            await async_engine_client.setup()
+            break
+        except TimeoutError as e:
+            if not rpc_server_process.is_alive():
+                raise RuntimeError("Server crashed") from e
 
     try:
         yield async_engine_client
diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py
index 45bf88b5bf57..49f55608766c 100644
--- a/vllm/entrypoints/openai/rpc/client.py
+++ b/vllm/entrypoints/openai/rpc/client.py
@@ -18,6 +18,9 @@
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 
+# Time to wait before checking if the server process is alive.
+SERVER_START_TIMEOUT = 1000
+
 
 class AsyncEngineRPCClient:
 
@@ -85,14 +88,19 @@ async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,
 
         return data
 
-    async def _send_one_way_rpc_request(self, request: RPC_REQUEST_TYPE,
-                                        error_message: str):
+    async def _send_one_way_rpc_request(self,
+                                        request: RPC_REQUEST_TYPE,
+                                        error_message: str,
+                                        timeout: int = 0):
         """Send one-way RPC request to trigger an action."""
         with self.socket() as socket:
             # Ping RPC Server with request.
             await socket.send(cloudpickle.dumps(request))
 
             # Await acknowledgement from RPCServer.
+            if timeout > 0 and await socket.poll(timeout) == 0:
+                raise TimeoutError(f"server didn't reply within {timeout} ms")
+
             response = cloudpickle.loads(await socket.recv())
 
             if not isinstance(response, str) or response != VLLM_RPC_SUCCESS_STR:
@@ -117,7 +125,8 @@ async def wait_for_server(self):
 
         await self._send_one_way_rpc_request(
             request=RPCUtilityRequest.IS_SERVER_READY,
-            error_message="Unable to start RPC Server.")
+            error_message="Unable to start RPC Server.",
+            timeout=SERVER_START_TIMEOUT)
 
     async def _get_model_config_rpc(self) -> ModelConfig:
         """Get the ModelConfig object from the RPC Server"""
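
The interaction between the two halves of PATCH 1/6 is easiest to see outside
of vLLM: the client raises TimeoutError when a poll window expires with no
reply, and the caller treats that as fatal only if the server process is gone,
otherwise it simply polls again. A minimal self-contained sketch of the same
pattern (the address, payload, and function name below are invented for
illustration; they are not vLLM APIs):

    import multiprocessing
    import pickle

    import zmq
    import zmq.asyncio

    POLL_TIMEOUT_MS = 1000  # plays the same role as SERVER_START_TIMEOUT


    async def wait_until_ready(addr: str, server: multiprocessing.Process):
        """Poll a readiness endpoint, failing fast if the server dies."""
        ctx = zmq.asyncio.Context()
        while True:
            sock = ctx.socket(zmq.DEALER)
            try:
                sock.connect(addr)
                await sock.send(pickle.dumps("is_ready"))
                # poll() returns 0 if no reply arrived within the window
                if await sock.poll(timeout=POLL_TIMEOUT_MS) == 0:
                    if not server.is_alive():
                        raise RuntimeError("server died before becoming ready")
                    continue  # alive but still loading: poll again
                return pickle.loads(await sock.recv())
            finally:
                # linger=0 so close() cannot block on undelivered messages
                sock.close(linger=0)

Without the poll timeout, recv() would block indefinitely even after the
server process has died, which is exactly the hang this patch removes.
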
From c120770323d1516c6bb152891f662171429fcfd3 Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 21:21:29 -0300
Subject: [PATCH 2/6] address review comments

Signed-off-by: Max de Bayser
---
 vllm/entrypoints/openai/api_server.py | 16 ++++++++--------
 vllm/entrypoints/openai/rpc/client.py |  8 +++++---
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 9d69002edd1a..d6cead9c7a30 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -117,15 +117,15 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
     # Build RPCClient, which conforms to AsyncEngineClient Protocol.
     async_engine_client = AsyncEngineRPCClient(port)
 
-    while True:
-        try:
-            await async_engine_client.setup()
-            break
-        except TimeoutError as e:
-            if not rpc_server_process.is_alive():
-                raise RuntimeError("Server crashed") from e
-
     try:
+        while True:
+            try:
+                await async_engine_client.setup()
+                break
+            except TimeoutError as e:
+                if not rpc_server_process.is_alive():
+                    raise RuntimeError("Server crashed") from e
+
         yield async_engine_client
     finally:
         # Ensure rpc server process was terminated
diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py
index 49f55608766c..cb935898f691 100644
--- a/vllm/entrypoints/openai/rpc/client.py
+++ b/vllm/entrypoints/openai/rpc/client.py
@@ -64,7 +64,9 @@ def socket(self):
             socket.connect(self.path)
             yield socket
         finally:
-            socket.close()
+            # linger == 0 means discard unsent messages
+            # when the socket is closed
+            socket.close(linger=0)
 
     async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,
                                          expected_type: Any,
@@ -91,14 +93,14 @@ async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,
     async def _send_one_way_rpc_request(self,
                                         request: RPC_REQUEST_TYPE,
                                         error_message: str,
-                                        timeout: int = 0):
+                                        timeout: Optional[int] = None):
         """Send one-way RPC request to trigger an action."""
         with self.socket() as socket:
             # Ping RPC Server with request.
             await socket.send(cloudpickle.dumps(request))
 
             # Await acknowledgement from RPCServer.
-            if timeout > 0 and await socket.poll(timeout) == 0:
+            if timeout is not None and await socket.poll(timeout=timeout) == 0:
                 raise TimeoutError(f"server didn't reply within {timeout} ms")
 
             response = cloudpickle.loads(await socket.recv())

From 33e503b9c56180263bdbf1a92bec4faa7a561067 Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 22:00:52 -0300
Subject: [PATCH 3/6] improve exception message

Signed-off-by: Max de Bayser
---
 vllm/entrypoints/openai/api_server.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index d6cead9c7a30..93960a96ba16 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -124,7 +124,9 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
                 break
             except TimeoutError as e:
                 if not rpc_server_process.is_alive():
-                    raise RuntimeError("Server crashed") from e
+                    raise RuntimeError(
+                        "The server process died before "
+                        "responding to the readiness probe") from e
 
         yield async_engine_client
     finally:
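
The linger=0 introduced in PATCH 2/6 matters more than it looks: by default a
ZeroMQ context refuses to terminate while any socket still holds undelivered
messages, so a request queued for a crashed server would block shutdown. A
small self-contained demonstration of that behavior (the TCP port is
arbitrary and deliberately has no listener):

    import zmq

    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.connect("tcp://127.0.0.1:5555")  # nobody is listening here
    sock.send(b"ping")  # queued locally, will never be delivered

    # With the default linger, ctx.term() would hang waiting for the
    # queued message to be flushed to a peer that no longer exists.
    # linger=0 discards it, so shutdown completes immediately.
    sock.close(linger=0)
    ctx.term()

PATCH 5/6 below expands the in-code comment with this same rationale and a
reference to the zmq documentation.
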
From fbefbe0a250c70650916d7867d6fb893ab79862d Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 22:05:53 -0300
Subject: [PATCH 4/6] add unit to constant name

Signed-off-by: Max de Bayser
---
 vllm/entrypoints/openai/rpc/client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py
index cb935898f691..529163922884 100644
--- a/vllm/entrypoints/openai/rpc/client.py
+++ b/vllm/entrypoints/openai/rpc/client.py
@@ -19,7 +19,7 @@
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 
 # Time to wait before checking if the server process is alive.
-SERVER_START_TIMEOUT = 1000
+SERVER_START_TIMEOUT_MS = 1000
 
 
 class AsyncEngineRPCClient:
@@ -128,7 +128,7 @@ async def wait_for_server(self):
         await self._send_one_way_rpc_request(
             request=RPCUtilityRequest.IS_SERVER_READY,
             error_message="Unable to start RPC Server.",
-            timeout=SERVER_START_TIMEOUT)
+            timeout=SERVER_START_TIMEOUT_MS)
 
     async def _get_model_config_rpc(self) -> ModelConfig:
         """Get the ModelConfig object from the RPC Server"""

From 72c4fb769ad1c024c28f5bb14d5564ae8eabcf38 Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 22:11:15 -0300
Subject: [PATCH 5/6] Expand comment about zmq socket options

Signed-off-by: Max de Bayser
---
 vllm/entrypoints/openai/rpc/client.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py
index 529163922884..4d3610191b36 100644
--- a/vllm/entrypoints/openai/rpc/client.py
+++ b/vllm/entrypoints/openai/rpc/client.py
@@ -65,7 +65,14 @@ def socket(self):
             yield socket
         finally:
             # linger == 0 means discard unsent messages
-            # when the socket is closed
+            # when the socket is closed. This is necessary
+            # because otherwise self.context.destroy() will
+            # wait for 30 seconds until unsent messages are
+            # received, which is impossible if the server
+            # crashed. In the absence of a server crash we
+            # always expect a response before closing the
+            # socket anyway.
+            # Reference: http://api.zeromq.org/4-2:zmq-setsockopt#toc24
             socket.close(linger=0)
 
     async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,

From 6283c6642b09edec20f06da27be8dc03907ff79e Mon Sep 17 00:00:00 2001
From: Max de Bayser
Date: Tue, 6 Aug 2024 23:02:37 -0300
Subject: [PATCH 6/6] add unit test

Signed-off-by: Max de Bayser
---
 tests/entrypoints/openai/test_mp_crash.py | 35 +++++++++++++++++++++++
 1 file changed, 35 insertions(+)
 create mode 100644 tests/entrypoints/openai/test_mp_crash.py

diff --git a/tests/entrypoints/openai/test_mp_crash.py b/tests/entrypoints/openai/test_mp_crash.py
new file mode 100644
index 000000000000..7dc595a7be35
--- /dev/null
+++ b/tests/entrypoints/openai/test_mp_crash.py
@@ -0,0 +1,35 @@
+from typing import Any
+
+import pytest
+
+from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.entrypoints.openai.api_server import build_async_engine_client
+from vllm.entrypoints.openai.cli_args import make_arg_parser
+from vllm.utils import FlexibleArgumentParser
+
+
+def crashing_from_engine_args(
+    cls,
+    engine_args: Any = None,
+    start_engine_loop: Any = None,
+    usage_context: Any = None,
+    stat_loggers: Any = None,
+) -> "AsyncLLMEngine":
+    raise Exception("foo")
+
+
+@pytest.mark.asyncio
+async def test_mp_crash_detection(monkeypatch):
+
+    with pytest.raises(RuntimeError) as excinfo, monkeypatch.context() as m:
+        m.setattr(AsyncLLMEngine, "from_engine_args",
+                  crashing_from_engine_args)
+        parser = FlexibleArgumentParser(
+            description="vLLM's remote OpenAI server.")
+        parser = make_arg_parser(parser)
+        args = parser.parse_args([])
+
+        async with build_async_engine_client(args):
+            pass
+    assert "The server process died before responding to the readiness probe"\
+        in str(excinfo.value)
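
A note on the test in PATCH 6/6: crashing_from_engine_args takes cls as an
explicit first parameter because monkeypatch.setattr installs it as a plain
function attribute on AsyncLLMEngine, so the first positional argument of
what used to be a classmethod call lands in that slot. A stripped-down
illustration, with made-up names standing in for the vLLM classes:

    class Engine:

        @classmethod
        def from_engine_args(cls, engine_args=None):
            return cls()


    def fake_from_engine_args(cls, engine_args=None):
        raise RuntimeError("boom")


    # Installing a plain function where a classmethod used to be means
    # the first positional argument of the call fills the `cls` slot.
    Engine.from_engine_args = fake_from_engine_args

    try:
        Engine.from_engine_args("engine-args")
    except RuntimeError as e:
        print(e)  # prints: boom

The test also relies on the pytest-asyncio plugin for @pytest.mark.asyncio
to take effect, and on a fork-based multiprocessing start method so the
patched attribute is inherited by the spawned RPC server process.
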