[ BugFix ] Move zmq frontend to IPC instead of TCP #7222

Merged (9 commits) on Aug 7, 2024
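Background for this change: ZMQ picks its transport from the scheme of the endpoint string, so moving the frontend-to-engine RPC link from TCP to IPC mostly means binding/connecting to an `ipc://` path (a Unix domain socket named by a filesystem path) instead of a `tcp://host:port` address, which removes the need to allocate a free port. A minimal sketch of the two endpoint styles (hypothetical path and port, not taken from this PR):

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)

# TCP transport: needs a free port and goes through the loopback stack.
# sock.bind("tcp://127.0.0.1:5570")

# IPC transport: a Unix domain socket named by a filesystem path (Linux/macOS).
sock.bind("ipc:///tmp/example_rpc_path")

sock.close()
ctx.term()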
8 changes: 4 additions & 4 deletions vllm/entrypoints/openai/api_server.py
@@ -33,6 +33,7 @@
     EmbeddingRequest, ErrorResponse,
     TokenizeRequest,
     TokenizeResponse)
+from vllm.entrypoints.openai.rpc import VLLM_RPC_PATH
 from vllm.entrypoints.openai.rpc.client import AsyncEngineRPCClient
 from vllm.entrypoints.openai.rpc.server import run_rpc_server
 # yapf: enable
@@ -43,7 +44,7 @@
     OpenAIServingTokenization)
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
-from vllm.utils import FlexibleArgumentParser, get_open_port
+from vllm.utils import FlexibleArgumentParser
 from vllm.version import __version__ as VLLM_VERSION

 TIMEOUT_KEEP_ALIVE = 5  # seconds
@@ -107,15 +108,14 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
     # Otherwise, use the multiprocessing AsyncLLMEngine.
     else:
         # Start RPCServer in separate process (holds the AsyncLLMEngine).
-        port = get_open_port(envs.VLLM_RPC_PORT)
         rpc_server_process = Process(target=run_rpc_server,
                                      args=(engine_args,
                                            UsageContext.OPENAI_API_SERVER,
-                                           port))
+                                           VLLM_RPC_PATH))
Review comment (Member): We should probably add some kind of unique/random suffix here in case more than one instance of vLLM is run at a time.
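One possible way to address that comment (an illustration only, not something this PR implements) is to derive the IPC path with a per-instance random suffix so two servers on the same host cannot collide on /tmp/vllm_rpc_path:

import uuid

def get_rpc_path() -> str:  # hypothetical helper, not part of the PR
    # Each server instance gets its own IPC endpoint.
    return f"ipc:///tmp/vllm_rpc_path_{uuid.uuid4().hex}"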

         rpc_server_process.start()

         # Build RPCClient, which conforms to AsyncEngineClient Protocol.
-        async_engine_client = AsyncEngineRPCClient(port)
+        async_engine_client = AsyncEngineRPCClient(VLLM_RPC_PATH)
         await async_engine_client.setup()

         try:
1 change: 1 addition & 0 deletions vllm/entrypoints/openai/rpc/__init__.py
@@ -9,6 +9,7 @@

 VLLM_RPC_SUCCESS_STR = "SUCCESS"
 VLLM_RPC_HEALTHY_STR = "HEALTHY"
+VLLM_RPC_PATH = "ipc:///tmp/vllm_rpc_path"


 @dataclass
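Since `ipc://` endpoints are Unix domain sockets, the part of `VLLM_RPC_PATH` after the scheme is a real filesystem path; binding creates a socket file there, and that file is how the client and server rendezvous. A quick illustrative check (demo path, not the actual constant):

import os
import zmq

demo_path = "ipc:///tmp/vllm_rpc_path_demo"  # demo endpoint only

ctx = zmq.Context()
sock = ctx.socket(zmq.ROUTER)
sock.bind(demo_path)
# The bound endpoint shows up as a socket file on disk.
print(os.path.exists(demo_path.removeprefix("ipc://")))  # True
sock.close()
ctx.term()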
6 changes: 3 additions & 3 deletions vllm/entrypoints/openai/rpc/client.py
@@ -21,9 +21,9 @@

 class AsyncEngineRPCClient:

-    def __init__(self, port: int):
+    def __init__(self, rpc_path: str):
         self.context = zmq.asyncio.Context()
-        self.path = f"tcp://localhost:{port}"
+        self.rpc_path = rpc_path

     async def setup(self):
         """Setup the client before it starts sending server requests."""
@@ -58,7 +58,7 @@ def socket(self):
         # to enable streaming.
         socket = self.context.socket(zmq.constants.DEALER)
         try:
-            socket.connect(self.path)
+            socket.connect(self.rpc_path)
             yield socket
         finally:
             socket.close()
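The client side of the change reduces to this: a DEALER socket connects to whatever endpoint string it is given, so the same code path handles `tcp://` and `ipc://` addresses. A stripped-down sketch of the per-request socket pattern (illustrative; the real `AsyncEngineRPCClient` does much more):

from contextlib import contextmanager

import zmq
import zmq.asyncio


class MiniRPCClient:  # illustrative stand-in, not the real class
    def __init__(self, rpc_path: str):
        self.context = zmq.asyncio.Context()
        self.rpc_path = rpc_path  # e.g. "ipc:///tmp/vllm_rpc_path"

    @contextmanager
    def socket(self):
        # One DEALER socket per request; connect() accepts any transport
        # scheme, so nothing else changes when tcp:// becomes ipc://.
        sock = self.context.socket(zmq.constants.DEALER)
        try:
            sock.connect(self.rpc_path)
            yield sock
        finally:
            sock.close()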
10 changes: 4 additions & 6 deletions vllm/entrypoints/openai/rpc/server.py
@@ -20,7 +20,7 @@
 class AsyncEngineRPCServer:

     def __init__(self, async_engine_args: AsyncEngineArgs,
-                 usage_context: UsageContext, port: int):
+                 usage_context: UsageContext, rpc_path: str):
         # Initialize engine first.
         self.engine = AsyncLLMEngine.from_engine_args(async_engine_args,
                                                       usage_context)
@@ -30,9 +30,7 @@ def __init__(self, async_engine_args: AsyncEngineArgs,

         # Init socket for readiness state.
         self.socket = self.context.socket(zmq.constants.ROUTER)
-        # Note numeric form of localhost should be used for zmq bind(),
-        # see https://stackoverflow.com/a/8958414
-        self.socket.bind(f"tcp://127.0.0.1:{port}")
+        self.socket.bind(rpc_path)

     def cleanup(self):
         """Cleanup all resources."""
@@ -213,6 +211,6 @@ def signal_handler() -> None:


 def run_rpc_server(async_engine_args: AsyncEngineArgs,
-                   usage_context: UsageContext, port: int):
-    server = AsyncEngineRPCServer(async_engine_args, usage_context, port)
+                   usage_context: UsageContext, rpc_path: str):
+    server = AsyncEngineRPCServer(async_engine_args, usage_context, rpc_path)
     asyncio.run(run_server(server))
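On the server side, the ROUTER socket now binds directly to the IPC path; the old numeric-localhost workaround was a TCP-specific quirk of zmq `bind()`, so it is no longer needed. A minimal sketch of the new bind, assuming the same `ipc://` path:

import zmq
import zmq.asyncio

rpc_path = "ipc:///tmp/vllm_rpc_path"  # the VLLM_RPC_PATH value added above

context = zmq.asyncio.Context()
socket = context.socket(zmq.constants.ROUTER)
socket.bind(rpc_path)  # no port allocation; the endpoint is a filesystem path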
6 changes: 0 additions & 6 deletions vllm/envs.py
@@ -4,7 +4,6 @@
 if TYPE_CHECKING:
     VLLM_HOST_IP: str = ""
     VLLM_PORT: Optional[int] = None
-    VLLM_RPC_PORT: int = 5570
     VLLM_USE_MODELSCOPE: bool = False
     VLLM_RINGBUFFER_WARNING_INTERVAL: int = 60
     VLLM_INSTANCE_ID: Optional[str] = None
@@ -142,11 +141,6 @@ def get_default_config_root():
     lambda: int(os.getenv('VLLM_PORT', '0'))
     if 'VLLM_PORT' in os.environ else None,

-    # used when the frontend api server is running in multi-processing mode,
-    # to communicate with the backend engine process over ZMQ.
-    'VLLM_RPC_PORT':
-    lambda: int(os.getenv('VLLM_RPC_PORT', '5570')),
-
     # If true, will load models from ModelScope instead of Hugging Face Hub.
     # note that the value is true or false, not numbers
     "VLLM_USE_MODELSCOPE":
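For readers unfamiliar with `vllm/envs.py`: each entry maps a variable name to a zero-argument lambda that reads the environment lazily, so deleting the `VLLM_RPC_PORT` entry removes the setting entirely. A simplified sketch of that lookup pattern, using a module-level `__getattr__` (PEP 562) as one common way to resolve such attributes on demand (an assumption for illustration, not the exact module):

import os

# Simplified sketch: name -> lazily evaluated reader.
environment_variables = {
    "VLLM_PORT": lambda: int(os.environ["VLLM_PORT"])
    if "VLLM_PORT" in os.environ else None,
}


def __getattr__(name: str):
    # Env-backed attributes are read from the environment on first access.
    if name in environment_variables:
        return environment_variables[name]()
    raise AttributeError(f"module has no attribute {name!r}")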
6 changes: 2 additions & 4 deletions vllm/utils.py
@@ -388,10 +388,8 @@ def get_distributed_init_method(ip: str, port: int) -> str:
     return f"tcp://[{ip}]:{port}" if ":" in ip else f"tcp://{ip}:{port}"


-def get_open_port(port: Optional[int] = None) -> int:
-    if port is None:
-        # Default behavior here is to return a port for multi-gpu communication
-        port = envs.VLLM_PORT
+def get_open_port() -> int:
+    port = envs.VLLM_PORT
     if port is not None:
         while True:
             try:
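With the RPC endpoint no longer needing a port, `get_open_port()` drops its parameter and returns to its original role. The rest of the function is collapsed in the diff; the usual way such a helper finds a free port (a sketch, not necessarily vLLM's exact code) is to bind to port 0 and read back the port the OS assigned:

import socket


def find_open_port() -> int:  # hypothetical helper for illustration
    """Ask the OS for a currently free TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))  # port 0 lets the OS pick an unused port
        return s.getsockname()[1]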