From 1c37a2c6fd5b314c352a202a63a3ffc53aedf538 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 20 Sep 2024 05:12:55 +0000 Subject: [PATCH] [Misc] ray queue --- .github/workflows/bench_test.yml | 3 -- .github/workflows/e2e_test.yml | 3 -- .github/workflows/migration_test.yml | 3 -- .github/workflows/offline_inference.yml | 3 -- .github/workflows/pylint.yml | 3 -- .github/workflows/unit_test.yml | 3 -- .github/workflows/{whl.yml => whl_build.yml} | 3 -- configs/base.yml | 5 ++- llumnix/backends/utils.py | 7 +-- llumnix/backends/vllm/llm_engine.py | 21 ++++++--- llumnix/backends/vllm/simulator.py | 9 +++- llumnix/config/default.py | 2 + llumnix/entrypoints/llumnix_utils.py | 27 ++++-------- llumnix/entrypoints/vllm/api_server.py | 16 ++++--- llumnix/llm_engine_manager.py | 6 +-- llumnix/llumlet/llumlet.py | 6 ++- .../rpc => llumnix/output_queue}/__init__.py | 0 .../output_queue/output_queue_client_base.py | 22 ++++++++++ .../output_queue/output_queue_server_base.py | 27 ++++++++++++ llumnix/output_queue/ray_queue_client.py | 23 ++++++++++ llumnix/output_queue/ray_queue_server.py | 44 +++++++++++++++++++ llumnix/output_queue/utils.py | 26 +++++++++++ .../zmq_client.py} | 9 ++-- .../zmq_server.py} | 4 +- .../utils.py => output_queue/zmq_utils.py} | 1 - llumnix/server_info.py | 6 +++ .../backends/vllm/test_llm_engine.py | 9 ++-- tests/unit_test/output_queue/__init__.py | 12 +++++ .../test_zmq.py} | 14 +++--- 29 files changed, 238 insertions(+), 79 deletions(-) rename .github/workflows/{whl.yml => whl_build.yml} (93%) rename {tests/unit_test/rpc => llumnix/output_queue}/__init__.py (100%) create mode 100644 llumnix/output_queue/output_queue_client_base.py create mode 100644 llumnix/output_queue/output_queue_server_base.py create mode 100644 llumnix/output_queue/ray_queue_client.py create mode 100644 llumnix/output_queue/ray_queue_server.py create mode 100644 llumnix/output_queue/utils.py rename llumnix/{rpc/queue_client.py => output_queue/zmq_client.py} (94%) rename llumnix/{rpc/queue_server.py => output_queue/zmq_server.py} (97%) rename llumnix/{rpc/utils.py => output_queue/zmq_utils.py} (99%) create mode 100644 tests/unit_test/output_queue/__init__.py rename tests/unit_test/{rpc/test_queue.py => output_queue/test_zmq.py} (91%) diff --git a/.github/workflows/bench_test.yml b/.github/workflows/bench_test.yml index b5ec057..6cef48c 100644 --- a/.github/workflows/bench_test.yml +++ b/.github/workflows/bench_test.yml @@ -1,9 +1,6 @@ name: bench_test on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/e2e_test.yml b/.github/workflows/e2e_test.yml index 9ef0599..4370d4e 100644 --- a/.github/workflows/e2e_test.yml +++ b/.github/workflows/e2e_test.yml @@ -1,9 +1,6 @@ name: e2e_test on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/migration_test.yml b/.github/workflows/migration_test.yml index 92e03af..bb6cd3c 100644 --- a/.github/workflows/migration_test.yml +++ b/.github/workflows/migration_test.yml @@ -1,9 +1,6 @@ name: migration_test on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/offline_inference.yml b/.github/workflows/offline_inference.yml index 8b3c62e..2d2501b 100644 --- a/.github/workflows/offline_inference.yml +++ b/.github/workflows/offline_inference.yml @@ -1,9 +1,6 @@ name: offline_inference on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index a3f123a..e81bdf1 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -1,9 +1,6 @@ name: pylint on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/unit_test.yml b/.github/workflows/unit_test.yml index e554038..864989c 100644 --- a/.github/workflows/unit_test.yml +++ b/.github/workflows/unit_test.yml @@ -1,9 +1,6 @@ name: unit_test on: - push: - branches: - - main pull_request: branches: - main diff --git a/.github/workflows/whl.yml b/.github/workflows/whl_build.yml similarity index 93% rename from .github/workflows/whl.yml rename to .github/workflows/whl_build.yml index f4cba19..ced47c3 100644 --- a/.github/workflows/whl.yml +++ b/.github/workflows/whl_build.yml @@ -1,9 +1,6 @@ name: whl_build on: - push: - branches: - - main pull_request: branches: - main diff --git a/configs/base.yml b/configs/base.yml index b06ce79..7685e5d 100644 --- a/configs/base.yml +++ b/configs/base.yml @@ -1,14 +1,15 @@ SERVER: HOST: '127.0.0.1' PORT: 37000 + QUEUE_TYPE: "rayqueue" RAY: RAY_CLUSTER_PORT: 30037 LAUNCH_RAY_CLUSTER: True MANAGER: - DISABLE_FIXED_NODE_INIT_INSTANCE: False - DISABLE_INIT_INSTANCE_BY_MANAGER: False + DISABLE_FIXED_NODE_INIT_INSTANCE: True + DISABLE_INIT_INSTANCE_BY_MANAGER: True INITIAL_INSTANCES: 1 diff --git a/llumnix/backends/utils.py b/llumnix/backends/utils.py index 1e110f4..48e3142 100644 --- a/llumnix/backends/utils.py +++ b/llumnix/backends/utils.py @@ -19,15 +19,16 @@ from llumnix.backends.backend_interface import BackendInterface, BackendType -def init_backend_engine(instance_id: str, backend_type: BackendType, *args, **kwargs) -> BackendInterface: +def init_backend_engine(instance_id: str, output_queue_type: str, + backend_type: BackendType, *args, **kwargs) -> BackendInterface: if backend_type == BackendType.VLLM: # pylint: disable=import-outside-toplevel from llumnix.backends.vllm.llm_engine import BackendVLLM - backend_engine = BackendVLLM(instance_id, *args, **kwargs) + backend_engine = BackendVLLM(instance_id, output_queue_type, *args, **kwargs) elif backend_type == BackendType.SIM_VLLM: # pylint: disable=import-outside-toplevel from llumnix.backends.vllm.simulator import BackendSimVLLM - backend_engine = BackendSimVLLM(instance_id, *args, **kwargs) + backend_engine = BackendSimVLLM(instance_id, output_queue_type, *args, **kwargs) else: raise ValueError(f'Unsupported backend: {backend_type}') return backend_engine diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py index d343f02..dca5cd5 100644 --- a/llumnix/backends/vllm/llm_engine.py +++ b/llumnix/backends/vllm/llm_engine.py @@ -35,16 +35,23 @@ from llumnix.backends.profiling import LatencyMemData from llumnix.server_info import ServerInfo from llumnix.internal_config import MigrationConfig -from llumnix.rpc.queue_client import QueueClient +from llumnix.output_queue.output_queue_client_base import OutputQueueClientBase +from llumnix.output_queue.ray_queue_client import RayQueueClient +from llumnix.output_queue.zmq_client import ZmqClient logger = init_logger(__name__) class AsyncPutQueueThread(threading.Thread): - def __init__(self, instance_id): + def __init__(self, instance_id, output_queue_type): super().__init__() self.instance_id = instance_id - self.request_output_queue_client = QueueClient() + + self.request_output_queue_client: OutputQueueClientBase = None + if output_queue_type == "rayqueue": + self.request_output_queue_client = RayQueueClient() + else: + self.request_output_queue_client = ZmqClient() self.engine_actor_handle = None self.loop = asyncio.new_event_loop() self.daemon = True @@ -82,13 +89,13 @@ def put_nowait_batch_to_servers(self, class LLMEngineLlumnix(LLMEngine): - def __init__(self, instance_id: str, *arg, **kwargs) -> None: + def __init__(self, instance_id: str, output_queue_type: str, *arg, **kwargs) -> None: super().__init__(*arg, **kwargs) self.instance_id = instance_id self.step_counter = Counter() self.instance_info = None # TODO(s5u13b): Reduce the overhead. - self.async_put_queue_thread = AsyncPutQueueThread(instance_id) + self.async_put_queue_thread = AsyncPutQueueThread(instance_id, output_queue_type) self.async_put_queue_thread.start() # pylint: disable=W0221 @@ -96,6 +103,7 @@ def __init__(self, instance_id: str, *arg, **kwargs) -> None: def from_engine_args( cls, engine_args: EngineArgs, + output_queue_type: str, migration_config: MigrationConfig, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, instance_id: str = None, @@ -124,6 +132,7 @@ def from_engine_args( # Create the LLM engine. engine = cls( instance_id=instance_id, + output_queue_type=output_queue_type, **engine_config.to_dict(), executor_class=executor_class, log_stats=not engine_args.disable_log_stats, @@ -215,12 +224,14 @@ class BackendVLLM(BackendInterface): def __init__( self, instance_id: str, + output_queue_type: str, migration_config: MigrationConfig, engine_args: EngineArgs, placement_group: PlacementGroup = None, node_id: str = None ) -> None: self.engine: LLMEngineLlumnix = LLMEngineLlumnix.from_engine_args(engine_args=engine_args, + output_queue_type=output_queue_type, migration_config=migration_config, instance_id=instance_id, placement_group=placement_group, diff --git a/llumnix/backends/vllm/simulator.py b/llumnix/backends/vllm/simulator.py index 061c517..c0cf873 100644 --- a/llumnix/backends/vllm/simulator.py +++ b/llumnix/backends/vllm/simulator.py @@ -13,6 +13,7 @@ import os from typing import List +import ray.actor from vllm.utils import Counter from vllm.engine.arg_utils import EngineArgs @@ -31,6 +32,7 @@ class BackendSimVLLM(BackendVLLM): def __init__( self, instance_id: int, + output_queue_type: str, migration_config: MigrationConfig, profiling_result_file_path: str, gpu_type: str, @@ -54,12 +56,15 @@ def __init__( latency_mem: LatencyMemData = profiling_result.para_dict[sim_parallel_config] self.engine: LLMEngineLlumnix = LLMEngineLlumnix.from_engine_args(migration_config=migration_config, - latency_mem=latency_mem, engine_args=engine_args) + output_queue_type=output_queue_type, + latency_mem=latency_mem, + engine_args=engine_args) self.engine.scheduler = SchedulerLlumnix(self.engine.scheduler_config, self.engine.cache_config, self.engine.lora_config) self.engine.output_processor.scheduler = self.engine.scheduler self.migration_config = migration_config self.instance_id = instance_id self.step_counter = Counter() - def send_blocks(self, dst_ray_actor: "ray.actor.ActorHandle", src_blocks: List[int], dst_blocks: List[int]) -> None: + # pylint: disable=unused-argument + def send_blocks(self, dst_ray_actor: ray.actor.ActorHandle, src_blocks: List[int], dst_blocks: List[int]) -> None: self.engine.model_executor.send_blocks(len(src_blocks)) diff --git a/llumnix/config/default.py b/llumnix/config/default.py index 9b1d531..dfa853f 100644 --- a/llumnix/config/default.py +++ b/llumnix/config/default.py @@ -26,6 +26,8 @@ _C.SERVER.HOST = "localhost" # Port number for the server _C.SERVER.PORT = 8000 +# Queue type for request output queue +_C.SERVER.QUEUE_TYPE = "rayqueue" # Port number for the request output queue _C.SERVER.REQUEST_OUTPUT_QUEUE_PORT = 1234 # Path to SSL key file for secure connections diff --git a/llumnix/entrypoints/llumnix_utils.py b/llumnix/entrypoints/llumnix_utils.py index 52d2d10..33dd02c 100644 --- a/llumnix/entrypoints/llumnix_utils.py +++ b/llumnix/entrypoints/llumnix_utils.py @@ -26,10 +26,6 @@ from llumnix.logger import init_logger from llumnix.utils import random_uuid from llumnix.arg_utils import EngineManagerArgs -from llumnix.rpc.utils import get_open_zmq_ipc_path -from llumnix.server_info import ServerInfo -from llumnix.rpc.queue_server import QueueServer - logger = init_logger(__name__) @@ -131,9 +127,8 @@ def init_manager(engine_manager_args: EngineManagerArgs) -> LLMEngineManager: logger.info("Get existing LLMEngineManager") return engine_manager -def init_llumlets(engine_manager_args: EngineManagerArgs, - engine_args, - node_id: str) -> Tuple[List[str], List[Llumlet]]: +def init_llumlets(engine_manager_args: EngineManagerArgs, engine_args, node_id: str, + output_queue_type: str) -> Tuple[List[str], List[Llumlet]]: engine_config = engine_args.create_engine_config() parallel_config = engine_config.parallel_config instance_ids: List[str] = [] @@ -146,6 +141,7 @@ def init_llumlets(engine_manager_args: EngineManagerArgs, instance_id = instance_ids[idx] if not engine_manager_args.profiling_result_file_path: llumlet = Llumlet.from_args( + output_queue_type, engine_manager_args.disable_fixed_node_init_instance, False, node_id, @@ -157,6 +153,7 @@ def init_llumlets(engine_manager_args: EngineManagerArgs, ) else: llumlet = Llumlet.from_args( + output_queue_type, engine_manager_args.disable_fixed_node_init_instance, False, node_id, @@ -171,22 +168,16 @@ def init_llumlets(engine_manager_args: EngineManagerArgs, llumlets.append(llumlet) return instance_ids, llumlets -def init_request_output_queue(server_info: ServerInfo) -> QueueServer: - rpc_path = get_open_zmq_ipc_path(server_info.request_output_queue_ip, server_info.request_output_queue_port) - request_output_queue = QueueServer(rpc_path) - return request_output_queue - def init_llumnix_components(engine_manager_args: EngineManagerArgs, engine_args, node_id: str, - server_info: ServerInfo) -> Tuple[LLMEngineManager, List[Llumlet], QueueServer]: - request_output_queue = init_request_output_queue(server_info) - + output_queue_type: str): engine_manager = init_manager(engine_manager_args) if engine_manager_args.disable_init_instance_by_manager: - instance_ids, llumlets = init_llumlets(engine_manager_args, engine_args, node_id) + instance_ids, llumlets = init_llumlets(engine_manager_args, engine_args, node_id, output_queue_type) else: - instance_ids, llumlets = retry_manager_method_sync(engine_manager.init_llumlets.remote, 'init_llumlets', engine_args, node_id) + instance_ids, llumlets = retry_manager_method_sync( + engine_manager.init_llumlets.remote, 'init_llumlets', engine_args, node_id, output_queue_type) available_instance_ids = [] dead_instance_ids = [] @@ -211,4 +202,4 @@ def init_llumnix_components(engine_manager_args: EngineManagerArgs, logger.info("Init Llumnix components done, {} instances are ready, instance_ids: {}." .format(len(available_instance_ids), available_instance_ids)) - return engine_manager, available_instance_ids, available_llumlets, request_output_queue + return engine_manager, available_instance_ids, available_llumlets diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py index bee8b0d..61a851c 100644 --- a/llumnix/entrypoints/vllm/api_server.py +++ b/llumnix/entrypoints/vllm/api_server.py @@ -34,7 +34,8 @@ from llumnix.logger import init_logger from llumnix.utils import random_uuid from llumnix.backends.vllm.utils import check_engine_args -from llumnix.rpc.queue_server import QueueServer +from llumnix.output_queue.output_queue_server_base import OutputQueueServerBase +from llumnix.output_queue.utils import get_output_queue from llumnix.config import get_llumnix_config, LlumnixConfig logger = init_logger("llumnix.api_server") @@ -43,7 +44,7 @@ instances = {} instance_num_requests: Dict[str, int] = {} # request_output_queue could be None if initialzed in lifespan. -request_output_queue: QueueServer = None +request_output_queue: OutputQueueServerBase = None server_info = None TIMEOUT_KEEP_ALIVE = 5 # seconds. request_streams: Dict[str, AsyncStream] = {} @@ -250,7 +251,8 @@ def add_argument(self, *args, **kwargs): parser.add_argument('--disable-log-requests-server', action='store_true', help='disable logging requests in server') parser.add_argument("--ray-cluster-port", type=int) parser.add_argument('--launch-ray-cluster', action='store_true', help='if launch ray cluster in api server') - parser.add_argument("--request-output-queue-port", type=int) + parser.add_argument("--queue-type", type=str, choices=['rayqueue', 'zmq'], help='queue type for request output queue') + parser.add_argument("--request-output-queue-port", type=int, help='port for zeromq') parser.add_argument("--config-file", help="path to config file") parser = EngineManagerArgs.add_cli_args(parser) @@ -278,10 +280,12 @@ def add_argument(self, *args, **kwargs): # Launch the Llumnix componets on current node. server_id = random_uuid() ip = get_ip_address() - server_info = ServerInfo(server_id, ip, cfg.SERVER.REQUEST_OUTPUT_QUEUE_PORT) node_id = ray.get_runtime_context().get_node_id() - engine_manager, instance_ids, llumlets, request_output_queue = \ - init_llumnix_components(engine_manager_args, engine_args, node_id, server_info) + engine_manager, instance_ids, llumlets = \ + init_llumnix_components(engine_manager_args, engine_args, node_id, cfg.SERVER.QUEUE_TYPE) + request_output_queue = get_output_queue(ip, cfg.SERVER.REQUEST_OUTPUT_QUEUE_PORT, cfg.SERVER.QUEUE_TYPE) + server_info = ServerInfo(server_id, cfg.SERVER.QUEUE_TYPE, request_output_queue, ip, + cfg.SERVER.REQUEST_OUTPUT_QUEUE_PORT) for idx, ins_id in enumerate(instance_ids): instances[ins_id] = llumlets[idx] diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py index 90a50a2..16b5590 100644 --- a/llumnix/llm_engine_manager.py +++ b/llumnix/llm_engine_manager.py @@ -436,9 +436,7 @@ def from_args(cls, # TODO(s5u13b): Significant duplication with llumlet_utils.init_llumlets. Consider reducing duplicate codes. # TODO(s5u13b): Fix the logger when enabling init instance by manager. - def init_llumlets(self, - engine_args, - node_id: str) -> Tuple[List[str], List[Llumlet]]: + def init_llumlets(self, engine_args, node_id: str, output_queue_type: str) -> Tuple[List[str], List[Llumlet]]: engine_manager_args = self.engine_manager_args engine_config = engine_args.create_engine_config() parallel_config = engine_config.parallel_config @@ -448,6 +446,7 @@ def init_llumlets(self, instance_id = random_uuid() if not engine_manager_args.profiling_result_file_path: llumlet = Llumlet.from_args( + output_queue_type, engine_manager_args.disable_fixed_node_init_instance, True, node_id, @@ -459,6 +458,7 @@ def init_llumlets(self, ) else: llumlet = Llumlet.from_args( + output_queue_type, engine_manager_args.disable_fixed_node_init_instance, True, node_id, diff --git a/llumnix/llumlet/llumlet.py b/llumnix/llumlet/llumlet.py index 5ad3963..64feb5e 100644 --- a/llumnix/llumlet/llumlet.py +++ b/llumnix/llumlet/llumlet.py @@ -30,8 +30,10 @@ class Llumlet: + # TODO(KuilongCui): catch the exception generated in ctor def __init__(self, instance_id: str, + output_queue_type: str, backend_type: BackendType, migration_config: MigrationConfig, *args, @@ -39,6 +41,7 @@ def __init__(self, self.instance_id = instance_id self.actor_name = f"instance_{instance_id}" self.backend_engine: BackendInterface = init_backend_engine(self.instance_id, + output_queue_type, backend_type, migration_config, *args, @@ -52,6 +55,7 @@ def __init__(self, @classmethod def from_args(cls, + output_queue_type: str, disable_fixed_node_init_instance: bool, detached: bool, node_id: str, @@ -95,7 +99,7 @@ def from_args(cls, scheduling_strategy=NodeAffinitySchedulingStrategy( node_id=node_id, soft=False,)) - llumlet = engine_class.remote(instance_id, backend_type, migration_config, *args, **kwargs) + llumlet = engine_class.remote(instance_id, output_queue_type, backend_type, migration_config, *args, **kwargs) return llumlet def migrate_out(self, dst_instance_name: str) -> List[str]: diff --git a/tests/unit_test/rpc/__init__.py b/llumnix/output_queue/__init__.py similarity index 100% rename from tests/unit_test/rpc/__init__.py rename to llumnix/output_queue/__init__.py diff --git a/llumnix/output_queue/output_queue_client_base.py b/llumnix/output_queue/output_queue_client_base.py new file mode 100644 index 0000000..49147de --- /dev/null +++ b/llumnix/output_queue/output_queue_client_base.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from collections.abc import Iterable + +from llumnix.server_info import ServerInfo + +class OutputQueueClientBase(ABC): + @abstractmethod + async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo): + raise NotImplementedError diff --git a/llumnix/output_queue/output_queue_server_base.py b/llumnix/output_queue/output_queue_server_base.py new file mode 100644 index 0000000..343f35c --- /dev/null +++ b/llumnix/output_queue/output_queue_server_base.py @@ -0,0 +1,27 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod + +class OutputQueueServerBase(ABC): + @abstractmethod + async def get(self): + raise NotImplementedError + + @abstractmethod + async def run_server_loop(self): + raise NotImplementedError + + @abstractmethod + def cleanup(self): + raise NotImplementedError diff --git a/llumnix/output_queue/ray_queue_client.py b/llumnix/output_queue/ray_queue_client.py new file mode 100644 index 0000000..974b553 --- /dev/null +++ b/llumnix/output_queue/ray_queue_client.py @@ -0,0 +1,23 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from collections.abc import Iterable + +from llumnix.server_info import ServerInfo +from llumnix.output_queue.output_queue_client_base import OutputQueueClientBase + +class RayQueueClient(OutputQueueClientBase): + async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo): + output_queue = server_info.request_output_queue + return await output_queue.actor.put_nowait_batch.remote(items) diff --git a/llumnix/output_queue/ray_queue_server.py b/llumnix/output_queue/ray_queue_server.py new file mode 100644 index 0000000..461d3db --- /dev/null +++ b/llumnix/output_queue/ray_queue_server.py @@ -0,0 +1,44 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ray +from ray.util.queue import Queue as RayQueue +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + +from llumnix.output_queue.output_queue_server_base import OutputQueueServerBase + +class RayQueueServer(OutputQueueServerBase): + def __init__(self) -> None: + self.queue = RayQueue( + actor_options={ + "scheduling_strategy": + NodeAffinitySchedulingStrategy( + node_id=ray.get_runtime_context().get_node_id(), + soft=False + ) + } + ) + + async def get(self): + return await self.queue.actor.get.remote() + + async def get_nowait_batch(self): + qsize = await self.queue.actor.qsize.remote() + request_outputs = await self.queue.actor.get_nowait_batch.remote(qsize) + return request_outputs + + async def run_server_loop(self): + pass + + def cleanup(self): + pass diff --git a/llumnix/output_queue/utils.py b/llumnix/output_queue/utils.py new file mode 100644 index 0000000..aa0ee3e --- /dev/null +++ b/llumnix/output_queue/utils.py @@ -0,0 +1,26 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from llumnix.output_queue.output_queue_server_base import OutputQueueServerBase +from llumnix.output_queue.zmq_server import ZmqServer +from llumnix.output_queue.ray_queue_server import RayQueueServer + +from llumnix.output_queue.zmq_utils import get_open_zmq_ipc_path + +def get_output_queue(zmq_ip: str, zmq_port: int, queue_type: str) -> OutputQueueServerBase: + if queue_type == "zmq": + rpc_path = get_open_zmq_ipc_path(zmq_ip, zmq_port) + request_output_queue = ZmqServer(rpc_path) + else: + request_output_queue = RayQueueServer() + return request_output_queue diff --git a/llumnix/rpc/queue_client.py b/llumnix/output_queue/zmq_client.py similarity index 94% rename from llumnix/rpc/queue_client.py rename to llumnix/output_queue/zmq_client.py index 0d7be21..f49cd2f 100644 --- a/llumnix/rpc/queue_client.py +++ b/llumnix/output_queue/zmq_client.py @@ -12,6 +12,7 @@ # limitations under the License. from contextlib import contextmanager +from collections.abc import Iterable import zmq import zmq.asyncio @@ -20,14 +21,14 @@ from llumnix.logger import init_logger from llumnix.server_info import ServerInfo -from llumnix.rpc.utils import (RPC_GET_DATA_TIMEOUT_MS, RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM, RPC_SUCCESS_STR, +from llumnix.output_queue.zmq_utils import (RPC_GET_DATA_TIMEOUT_MS, RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM, RPC_SUCCESS_STR, RPCClientClosedError, RPC_REQUEST_TYPE, RPCUtilityRequest, RPCPutNoWaitBatchQueueRequest, get_open_zmq_ipc_path) logger = init_logger(__name__) -class QueueClient: +class ZmqClient: def __init__(self): self.context = zmq.asyncio.Context() self._data_timeout = RPC_GET_DATA_TIMEOUT_MS @@ -103,9 +104,7 @@ async def wait_for_server_rpc(self, rpc_path=rpc_path, error_message="Unable to start RPC Server") - async def put_nowait_batch(self, - items, - server_info: ServerInfo): + async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo): rpc_path = get_open_zmq_ipc_path(server_info.request_output_queue_ip, server_info.request_output_queue_port) await self._send_one_way_rpc_request( request=RPCPutNoWaitBatchQueueRequest(items=items), diff --git a/llumnix/rpc/queue_server.py b/llumnix/output_queue/zmq_server.py similarity index 97% rename from llumnix/rpc/queue_server.py rename to llumnix/output_queue/zmq_server.py index 043ae40..4dbcf38 100644 --- a/llumnix/rpc/queue_server.py +++ b/llumnix/output_queue/zmq_server.py @@ -19,7 +19,7 @@ import zmq.asyncio import cloudpickle -from llumnix.rpc.utils import (RPC_ZMQ_HWM, RPC_SUCCESS_STR, RPC_SOCKET_LIMIT_CUTOFF, +from llumnix.output_queue.zmq_utils import (RPC_ZMQ_HWM, RPC_SUCCESS_STR, RPC_SOCKET_LIMIT_CUTOFF, RPCPutNoWaitBatchQueueRequest, RPCUtilityRequest) from llumnix.logger import init_logger @@ -32,7 +32,7 @@ class Full(Exception): pass -class QueueServer: +class ZmqServer: def __init__(self, rpc_path: str, maxsize=0): self.context = zmq.asyncio.Context() diff --git a/llumnix/rpc/utils.py b/llumnix/output_queue/zmq_utils.py similarity index 99% rename from llumnix/rpc/utils.py rename to llumnix/output_queue/zmq_utils.py index 9d1aaaa..bdd76bc 100644 --- a/llumnix/rpc/utils.py +++ b/llumnix/output_queue/zmq_utils.py @@ -15,7 +15,6 @@ from enum import Enum from typing import Union, List, Any - RPC_GET_DATA_TIMEOUT_MS: int = 5000 RPC_SOCKET_LIMIT_CUTOFF = 2000 RPC_ZMQ_HWM = 0 diff --git a/llumnix/server_info.py b/llumnix/server_info.py index 02eaecc..0c0e506 100644 --- a/llumnix/server_info.py +++ b/llumnix/server_info.py @@ -11,11 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ray.util.queue import Queue as RayQueue + class ServerInfo: def __init__(self, server_id: str, + output_queue_type: str, + request_output_queue: RayQueue, request_output_queue_ip: str, request_output_queue_port: int) -> None: self.server_id = server_id + self.output_queue_type = output_queue_type + self.request_output_queue = request_output_queue.queue if hasattr(request_output_queue, "queue") else None self.request_output_queue_ip = request_output_queue_ip self.request_output_queue_port = request_output_queue_port diff --git a/tests/unit_test/backends/vllm/test_llm_engine.py b/tests/unit_test/backends/vllm/test_llm_engine.py index dec4fee..6f2c8ad 100644 --- a/tests/unit_test/backends/vllm/test_llm_engine.py +++ b/tests/unit_test/backends/vllm/test_llm_engine.py @@ -85,16 +85,19 @@ def test_llm_engine_process_model_outputs(): def test_llm_engine_from_engine_args(): engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True) - llm_engine = MockEngine.from_engine_args(engine_args, instance_id="0", migration_config=None) + llm_engine = MockEngine.from_engine_args(engine_args, output_queue_type='rayqueue', + instance_id="0", migration_config=None) assert llm_engine.executor_class == LlumnixRayGPUExecutor latency_data = LatencyMemData({},{},{}) - llm_engine = MockEngine.from_engine_args(engine_args, instance_id="0", migration_config=None, latency_mem=latency_data) + llm_engine = MockEngine.from_engine_args(engine_args, output_queue_type='rayqueue', + instance_id="0", migration_config=None, latency_mem=latency_data) assert llm_engine.executor_class == SimGPUExecutor def test_llm_engine_add_requset(): engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True) - llm_engine = LLMEngineLlumnix.from_engine_args(engine_args, instance_id="0", migration_config=None, latency_mem=MagicMock(sepc=LatencyMemData)) + llm_engine = LLMEngineLlumnix.from_engine_args(engine_args, output_queue_type='rayqueue', instance_id="0", + migration_config=None, latency_mem=MagicMock(sepc=LatencyMemData)) sampling_params = SamplingParams(top_k=1, temperature=0, ignore_eos=True, max_tokens=100) llm_engine.scheduler.scheduler_lock = MagicMock() llm_engine.add_request("0", None, "prompt", sampling_params) diff --git a/tests/unit_test/output_queue/__init__.py b/tests/unit_test/output_queue/__init__.py new file mode 100644 index 0000000..4638bd9 --- /dev/null +++ b/tests/unit_test/output_queue/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/unit_test/rpc/test_queue.py b/tests/unit_test/output_queue/test_zmq.py similarity index 91% rename from tests/unit_test/rpc/test_queue.py rename to tests/unit_test/output_queue/test_zmq.py index 0c8c18d..a3ca268 100644 --- a/tests/unit_test/rpc/test_queue.py +++ b/tests/unit_test/output_queue/test_zmq.py @@ -18,9 +18,9 @@ from vllm.outputs import CompletionOutput, RequestOutput -from llumnix.rpc.queue_server import QueueServer -from llumnix.rpc.queue_client import QueueClient -from llumnix.rpc.utils import get_open_zmq_ipc_path +from llumnix.output_queue.zmq_server import ZmqServer +from llumnix.output_queue.zmq_client import ZmqClient +from llumnix.output_queue.utils import get_open_zmq_ipc_path from llumnix.utils import random_uuid from llumnix.server_info import ServerInfo from llumnix.entrypoints.llumnix_utils import init_request_output_queue @@ -33,7 +33,7 @@ def init_server_info(): server_id = random_uuid() ip = '127.0.0.1' port = 1234 - server_info = ServerInfo(server_id, ip, port) + server_info = ServerInfo(server_id, "zmq", None, ip, port) return server_info @pytest.fixture @@ -46,7 +46,7 @@ def request_output_queue_server(): @ray.remote(num_cpus=1) class Server: def __init__(self, rpc_path): - self.server = QueueServer(rpc_path) + self.server = ZmqServer(rpc_path) asyncio.create_task(self.server.run_server_loop()) request_output_queue = self.server self.stop_signal = asyncio.Event() @@ -94,11 +94,11 @@ def timeout_handler(signum, frame): async def benchmark_queue(qps, ip=None, port=None): rpc_path = get_open_zmq_ipc_path(ip, port) - rpc_client = QueueClient() + rpc_client = ZmqClient() request_output_queue = rpc_client server = Server.remote(rpc_path) server_id = random_uuid() - server_info = ServerInfo(server_id, ip, port) + server_info = ServerInfo(server_id, 'zmq', None, ip, port) await rpc_client.wait_for_server_rpc(server_info) num_request_outputs = 500