✨ health check round 2 #392

Merged
3 changes: 3 additions & 0 deletions vllm/engine/protocol.py
@@ -79,3 +79,6 @@ async def do_log_stats(
        model_output: Optional[List[SamplerOutput]] = None,
    ) -> None:
        pass

    async def check_health(self) -> None:
        """Raise if unhealthy"""
2 changes: 2 additions & 0 deletions vllm/entrypoints/openai/rpc/__init__.py
@@ -8,6 +8,7 @@
from vllm.sampling_params import SamplingParams

VLLM_RPC_SUCCESS_STR = "SUCCESS"
VLLM_RPC_HEALTHY_STR = "HEALTHY"


@dataclass
@@ -29,6 +30,7 @@ class RPCUtilityRequest(Enum):
    IS_SERVER_READY = 1
    GET_MODEL_CONFIG = 2
    DO_LOG_STATS = 3
    CHECK_HEALTH = 4


RPC_REQUEST_TYPE = Union[RPCGenerateRequest, RPCAbortRequest,
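Requests cross the ZMQ socket as bare pickled objects, so CHECK_HEALTH needs no payload beyond the enum member itself. A quick round-trip sketch of that wire format (illustrative only):

    import pickle

    from vllm.entrypoints.openai.rpc import RPCUtilityRequest

    # The client pickles the bare enum member; the server unpickles it
    # and dispatches on identity, so no request body is needed.
    wire_bytes = pickle.dumps(RPCUtilityRequest.CHECK_HEALTH)
    assert pickle.loads(wire_bytes) == RPCUtilityRequest.CHECK_HEALTH
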
24 changes: 24 additions & 0 deletions vllm/entrypoints/openai/rpc/client.py
@@ -6,6 +6,7 @@

from vllm.config import DecodingConfig, ModelConfig
from vllm.entrypoints.openai.rpc import (RPC_REQUEST_TYPE,
                                         VLLM_RPC_HEALTHY_STR,
                                         VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
                                         RPCGenerateRequest, RPCUtilityRequest)
from vllm.inputs import PromptInputs
@@ -153,3 +154,26 @@ async def generate(

        yield request_output
        socket.close()

    async def check_health(self) -> None:
        """Raise if unhealthy"""

        # Connect to socket.
        socket = self.context.socket(zmq.constants.DEALER)
        socket.connect(self.path)

        # Ping RPCServer with CHECK_HEALTH request.
        await socket.send(pickle.dumps(RPCUtilityRequest.CHECK_HEALTH))

        # Await the reply from the server.
        # TODO: do we need an internal timeout here, or do we rely on
        # the external health probe to time out while this call blocks?
        health_message = pickle.loads(await socket.recv())
        socket.close()

        if isinstance(health_message, Exception):
            raise health_message

        if health_message != VLLM_RPC_HEALTHY_STR:
            raise ValueError("Expected healthy response from backend but got "
                             f"{health_message}")
16 changes: 15 additions & 1 deletion vllm/entrypoints/openai/rpc/server.py
@@ -8,7 +8,8 @@
from typing_extensions import Never

from vllm import AsyncEngineArgs, AsyncLLMEngine
from vllm.entrypoints.openai.rpc import (VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
from vllm.entrypoints.openai.rpc import (VLLM_RPC_HEALTHY_STR,
                                         VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
                                         RPCGenerateRequest, RPCUtilityRequest)
from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext
@@ -96,6 +97,17 @@ async def generate(self, identity, generate_request: RPCGenerateRequest):
            self.socket.send_multipart(
                [identity, pickle.dumps(e, pickle.HIGHEST_PROTOCOL)])

    async def check_health(self, identity):
        try:
            await self.engine.check_health()
            await self.socket.send_multipart([
                identity,
                pickle.dumps(VLLM_RPC_HEALTHY_STR, pickle.HIGHEST_PROTOCOL)
            ])
        except Exception as e:
            await self.socket.send_multipart(
                [identity, pickle.dumps(e, pickle.HIGHEST_PROTOCOL)])
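
The failure path mirrors generate(): the exception object itself is pickled rather than an error string, which is what lets the client's check_health re-raise the original failure with its type and message intact. A minimal standalone sketch of that round-trip (the error message is illustrative):

    import pickle

    # Exceptions survive pickling well enough to be re-raised client-side
    # with their original type and message.
    err = RuntimeError("engine loop has died")
    restored = pickle.loads(pickle.dumps(err, pickle.HIGHEST_PROTOCOL))
    assert isinstance(restored, RuntimeError)
    assert str(restored) == "engine loop has died"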

    def _make_handler_coro(self, identity,
                           message) -> Coroutine[Any, Any, Never]:
        """Route the zmq message to the handler coroutine."""
@@ -115,6 +127,8 @@ def _make_handler_coro(self, identity,
                return self.do_log_stats(identity)
            elif request == RPCUtilityRequest.IS_SERVER_READY:
                return self.is_server_ready(identity)
            elif request == RPCUtilityRequest.CHECK_HEALTH:
                return self.check_health(identity)
            else:
                raise ValueError(f"Unknown RPCUtilityRequest type: {request}")
