[Serve] Faster bulk imperative Serve Application deploys (ray-project…
…#49168)

## Why are these changes needed?

Our pattern of using Ray Serve has us deploying hundreds to thousands
of apps using the imperative API (`serve.run`). This is very slow
because the Controller checkpoints as part of every RPC. It would be
significantly more efficient to batch the deploys so that we checkpoint
fewer times.

This PR adds a new `serve.run_many()` public API, marked as
developer-only, that can submit many applications to the Serve
Controller in one RPC, with just a single checkpoint being saved after
all of those applications are registered. The entire existing code path
(including `serve.run()`) is refactored to be bulk operations under the
hood (`serve.run()` calls `serve.run_many()`).
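
To illustrate why batching helps, here is a minimal sketch of the checkpoint arithmetic. `ToyController`, `deploy_one`, and `deploy_many` are hypothetical stand-ins invented for this example; they are not the Serve Controller or the `serve.run_many()` API, and the only thing modeled is "one checkpoint per RPC".

```python
from typing import Dict


class ToyController:
    """Hypothetical stand-in for a controller that checkpoints once per RPC."""

    def __init__(self) -> None:
        self.apps: Dict[str, str] = {}
        self.checkpoints_written = 0

    def _checkpoint(self) -> None:
        # Stand-in for serializing all state to a KV store (the slow part).
        self.checkpoints_written += 1

    def deploy_one(self, name: str, config: str) -> None:
        # One RPC per app, so one checkpoint per app.
        self.apps[name] = config
        self._checkpoint()

    def deploy_many(self, configs: Dict[str, str]) -> None:
        # One RPC for the whole batch: register everything, checkpoint once.
        self.apps.update(configs)
        self._checkpoint()


configs = {f"app-{i}": "cfg" for i in range(300)}

one_by_one = ToyController()
for name, config in configs.items():
    one_by_one.deploy_one(name, config)

batched = ToyController()
batched.deploy_many(configs)

print(one_by_one.checkpoints_written, batched.checkpoints_written)  # 300 1
```

Both controllers end up with the same registered apps; only the number of checkpoint writes differs.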

To further help with our particular use case, where the applications are
being deployed from a controller that doesn't care about waiting for
e.g. ingress deployment creation, the new code path also has
fine-grained control over which things are waited for.

---

Just introducing a batch API isn't sufficient to actually provide a
meaningful speedup. As mentioned above, the slow part is the
checkpointing, and the checkpointing is currently very granular: the
various stateful components checkpoint themselves at the bottom of the
call stack, so even a single RPC can cause them to checkpoint multiple
times.

Below I've tried to map out all the reasons that the
`ApplicationStateManager` and `DeploymentStateManager` might checkpoint:

```mermaid
graph TD;
    deployment_state_set_target_state[DeploymentState._set_target_state] --> dsm_checkpoint[DeploymentStateManager._save_checkpoint_func]
    deployment_state_deploy[DeploymentState.deploy] --> deployment_state_set_target_state
    deployment_state_manager_deploy[DeploymentStateManager.deploy] --> deployment_state_deploy
    application_state_apply_deployment_info[ApplicationState.apply_deployment_info] --> deployment_state_manager_deploy
    application_state_reconcile_target_deployments[ApplicationState._reconcile_target_deployments] --x application_state_apply_deployment_info
    application_state_update[ApplicationState.update] --> application_state_reconcile_target_deployments
    application_state_manager_update[ApplicationStateManager.update] --x application_state_update
    serve_controller_run_control_loop[ServeController.run_control_loop] --> application_state_manager_update

    deployment_state_set_target_state_deleting[DeploymentState._set_target_state_deleting] --> dsm_checkpoint
    deployment_state_delete[DeploymentState.delete] --> deployment_state_set_target_state_deleting
    deployment_state_manager_delete_deployment[DeploymentStateManager.delete_deployment] --> deployment_state_delete
    application_state_delete_deployment[ApplicationState._delete_deployment] --> deployment_state_manager_delete_deployment
    application_state_reconcile_target_deployments --> application_state_delete_deployment

    deployment_state_autoscale[DeploymentState.autoscale] --> deployment_state_set_target_state
    deployment_state_manager_update[DeploymentStateManager.update] --> deployment_state_autoscale
    serve_controller_run_control_loop --> deployment_state_manager_update

    as_set_target_state[ApplicationState._set_target_state] --> asm_checkpoint[ApplicationStateManager._save_checkpoint_func]
    as_recover_target_state_from_checkpoint[ApplicationState.recover_target_state_from_checkpoint] --> as_set_target_state
    asm_recover_from_checkpoint[ApplicationStateManager._recover_from_checkpoint] --> as_recover_target_state_from_checkpoint
    asm_init[ApplicationStateManager.__init__] --> asm_recover_from_checkpoint
    sc_init[ServeController.__init__] --> asm_init

    as_set_target_state_deleting[ApplicationState._set_target_state_deleting] --> as_set_target_state
    as_delete[ApplicationState.delete] --> as_set_target_state_deleting
    asm_delete_app[ApplicationStateManager.delete_app] --> as_delete
    sc_delete_apps[ServeController.delete_apps] --x asm_delete_app
    RPC --> sc_delete_apps

    as_clear_target_state_and_store_config[ApplicationState._clear_target_state_and_store_config] --> as_set_target_state
    as_apply_app_config[ApplicationState.apply_app_config] --> as_clear_target_state_and_store_config
    asm_apply_app_configs[ApplicationStateManager.apply_app_configs] --x as_apply_app_config
    sc_apply_config[ServeController.apply_config] --> asm_apply_app_configs
    RPC --> sc_apply_config


    as_deploy_app[ApplicationState.deploy_app] --> as_set_target_state
    asm_deploy_app[ApplicationStateManager.deploy_app] --> as_deploy_app
    sc_deploy_application[ServeController.deploy_application] --> asm_deploy_app
    RPC --> sc_deploy_application

    as_apply_app_config --> as_set_target_state
```
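
The graph can also be read mechanically. The sketch below copies a subset of the edges above (the `ApplicationStateManager` side, excluding the `apply_config` path) into a plain list and enumerates every call path that ends at the checkpoint function. The `EDGES` list and the `call_paths_to` helper are written for this illustration only and are not part of the PR.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (caller, callee) pairs copied from a subset of the graph above.
EDGES: List[Tuple[str, str]] = [
    ("ApplicationState._set_target_state", "ApplicationStateManager._save_checkpoint_func"),
    ("ApplicationState.recover_target_state_from_checkpoint", "ApplicationState._set_target_state"),
    ("ApplicationStateManager._recover_from_checkpoint", "ApplicationState.recover_target_state_from_checkpoint"),
    ("ApplicationStateManager.__init__", "ApplicationStateManager._recover_from_checkpoint"),
    ("ServeController.__init__", "ApplicationStateManager.__init__"),
    ("ApplicationState._set_target_state_deleting", "ApplicationState._set_target_state"),
    ("ApplicationState.delete", "ApplicationState._set_target_state_deleting"),
    ("ApplicationStateManager.delete_app", "ApplicationState.delete"),
    ("ServeController.delete_apps", "ApplicationStateManager.delete_app"),
    ("RPC", "ServeController.delete_apps"),
    ("ApplicationState.deploy_app", "ApplicationState._set_target_state"),
    ("ApplicationStateManager.deploy_app", "ApplicationState.deploy_app"),
    ("ServeController.deploy_application", "ApplicationStateManager.deploy_app"),
    ("RPC", "ServeController.deploy_application"),
]

SINK = "ApplicationStateManager._save_checkpoint_func"

# Map each function to the functions that call it.
callers: Dict[str, List[str]] = defaultdict(list)
for caller, callee in EDGES:
    callers[callee].append(caller)


def call_paths_to(node: str) -> List[List[str]]:
    """Return every call path ending at `node`, each starting at an entry point."""
    if node not in callers:
        # Nothing calls this node, so it is an entry point.
        return [[node]]
    return [
        path + [node]
        for caller in callers[node]
        for path in call_paths_to(caller)
    ]


paths = call_paths_to(SINK)
entry_points = sorted({path[0] for path in paths})
print(len(paths), entry_points)  # 3 ['RPC', 'ServeController.__init__']
```

Every one of these paths ends in the same leaf-level checkpoint call, which is what makes a single request able to checkpoint more than once.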

So, in addition to the batch API that the client sees, I've refactored
where these checkpoints are done so that they happen at the *top* of
those call stacks instead of at the bottom.

- We still checkpoint before (now just before) returning an RPC that
mutates state.
- We still checkpoint after making any changes to internal state and
before issuing any commands to the cluster to e.g. start/stop replicas
(just not *immediately* after making the internal state change).
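
The two invariants above can be sketched with a toy event log. `ToyDeploymentManager` and its methods are hypothetical names made up for this example, not the Serve classes; the point is only the ordering of "state change", "checkpoint", and "cluster command".

```python
from typing import Dict, List


class ToyDeploymentManager:
    """Hypothetical manager that checkpoints at the top of each call stack."""

    def __init__(self) -> None:
        self.target_replicas: Dict[str, int] = {}
        self.events: List[str] = []

    def _set_target(self, name: str, replicas: int) -> None:
        # Leaf-level state change: no checkpoint is written here.
        self.target_replicas[name] = replicas
        self.events.append(f"set:{name}")

    def save_checkpoint(self) -> None:
        # Stand-in for persisting all state to a KV store.
        self.events.append("checkpoint")

    def handle_rpc(self, targets: Dict[str, int]) -> None:
        for name, replicas in targets.items():
            self._set_target(name, replicas)
        # Checkpoint once, just before the RPC returns.
        self.save_checkpoint()

    def control_loop_step(self, autoscale_decisions: Dict[str, int]) -> None:
        for name, replicas in autoscale_decisions.items():
            self._set_target(name, replicas)
        # Checkpoint after the state changes and before any cluster command.
        self.save_checkpoint()
        for name in autoscale_decisions:
            self.events.append(f"scale:{name}")


manager = ToyDeploymentManager()
manager.handle_rpc({"a": 1, "b": 1})
manager.control_loop_step({"a": 3})
print(manager.events)
# ['set:a', 'set:b', 'checkpoint', 'set:a', 'checkpoint', 'scale:a']
```

Two state changes in one RPC produce one checkpoint, and the `scale:a` command is only issued after the checkpoint that records its target.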

I did *not* change the `EndpointState`'s checkpointing because it hasn't
shown up in our flamegraphs.

---

Before these changes, deploying 5k Serve apps, each with one deployment,
took >1 hour and would often never finish because the Serve Controller
would become unresponsive and KubeRay would end up restarting the
cluster.

With these changes, deploying 5k Serve apps with a batch size of 100 per
API call only takes about 90 seconds!
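
For reference, 5k apps at a batch size of 100 works out to 50 API calls. A minimal sketch of the client-side chunking, assuming a generic `chunked` helper written for this example (how each batch is then submitted is left out, since that depends on the `serve.run_many()` signature):

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


app_names = [f"app-{i}" for i in range(5000)]
batches = list(chunked(app_names, 100))
print(len(batches))  # 50
```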

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Josh Karpel <josh.karpel@gmail.com>
Co-authored-by: Cindy Zhang <cindyzyx9@gmail.com>
2 people authored and xsuler committed Mar 4, 2025
1 parent c621791 commit 926725c
Showing 13 changed files with 525 additions and 231 deletions.
6 changes: 6 additions & 0 deletions python/ray/serve/__init__.py
@@ -5,7 +5,9 @@
 from ray.serve.api import (
     Application,
     Deployment,
+    RunTarget,
     _run,
+    _run_many,
     delete,
     deployment,
     get_app_handle,
@@ -15,6 +17,7 @@
     ingress,
     multiplexed,
     run,
+    run_many,
     shutdown,
     start,
     status,
@@ -38,6 +41,7 @@

 __all__ = [
     "_run",
+    "_run_many",
     "batch",
     "start",
     "HTTPOptions",
@@ -46,6 +50,8 @@
     "ingress",
     "deployment",
     "run",
+    "run_many",
+    "RunTarget",
     "delete",
     "Application",
     "Deployment",
127 changes: 63 additions & 64 deletions python/ray/serve/_private/application_state.py
@@ -6,7 +6,7 @@
 from copy import deepcopy
 from dataclasses import asdict, dataclass, field
 from enum import Enum
-from typing import Callable, Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple

 import ray
 from ray import cloudpickle
@@ -222,7 +222,6 @@ def __init__(
         name: str,
         deployment_state_manager: DeploymentStateManager,
         endpoint_state: EndpointState,
-        save_checkpoint_func: Callable,
         logging_config: LoggingConfig,
     ):
         """
@@ -231,11 +230,6 @@ def __init__(
             deployment_state_manager: State manager for all deployments
                 in the cluster.
             endpoint_state: State manager for endpoints in the system.
-            save_checkpoint_func: Function that can be called to write
-                a checkpoint of the application state. This should be
-                called in self._set_target_state() before actually
-                setting the target state so that the controller can
-                properly recover application states if it crashes.
         """

         self._name = name
@@ -261,7 +255,6 @@ def __init__(
             deleting=False,
             api_type=APIType.UNKNOWN,
         )
-        self._save_checkpoint_func = save_checkpoint_func
         self._logging_config = logging_config

     @property
@@ -364,11 +357,6 @@ def _set_target_state(
             api_type=api_type,
         )

-        # Checkpoint ahead, so that if the controller crashes before we
-        # write to the target state, the target state will be recovered
-        # after the controller recovers
-        self._save_checkpoint_func(writeahead_checkpoints={self._name: target_state})
-        # Set target state
         self._target_state = target_state

     def _set_target_state_deleting(self):
@@ -377,7 +365,7 @@ def _set_target_state_deleting(self):
         Wipes the target deployment infos, code version, and config.
         """
         self._set_target_state(
-            deployment_infos=dict(),
+            deployment_infos={},
             api_type=self._target_state.api_type,
             code_version=None,
             target_config=None,
@@ -889,7 +877,10 @@ def __init__(
         self._endpoint_state = endpoint_state
         self._kv_store = kv_store
         self._logging_config = logging_config
-        self._application_states: Dict[str, ApplicationState] = dict()
+
+        self._shutting_down = False
+
+        self._application_states: Dict[str, ApplicationState] = {}
         self._recover_from_checkpoint()

     def _recover_from_checkpoint(self):
@@ -902,7 +893,6 @@ def _recover_from_checkpoint(self):
                     app_name,
                     self._deployment_state_manager,
                     self._endpoint_state,
-                    self._save_checkpoint_func,
                     self._logging_config,
                 )
                 app_state.recover_target_state_from_checkpoint(checkpoint_data)
@@ -914,6 +904,53 @@ def delete_app(self, name: str) -> None:
             return
         self._application_states[name].delete()

+    def deploy_apps(self, name_to_deployment_args: Dict[str, List[Dict]]) -> None:
+        live_route_prefixes: Dict[str, str] = {
+            app_state.route_prefix: app_name
+            for app_name, app_state in self._application_states.items()
+            if app_state.route_prefix is not None
+            and not app_state.status == ApplicationStatus.DELETING
+        }
+
+        for name, deployment_args in name_to_deployment_args.items():
+            for deploy_param in deployment_args:
+                # Make sure route_prefix is not being used by other application.
+                deploy_app_prefix = deploy_param.get("route_prefix")
+                if deploy_app_prefix is None:
+                    continue
+
+                existing_app_name = live_route_prefixes.get(deploy_app_prefix)
+                # It's ok to redeploy an app with the same prefix
+                # if it has the same name as the app already using that prefix.
+                if existing_app_name is not None and existing_app_name != name:
+                    raise RayServeException(
+                        f"Prefix {deploy_app_prefix} is being used by application "
+                        f'"{existing_app_name}". Failed to deploy application "{name}".'
+                    )
+
+                # We might be deploying more than one app,
+                # so we need to add this app's prefix to the
+                # set of live route prefixes that we're checking
+                # against during this batch operation.
+                live_route_prefixes[deploy_app_prefix] = name
+
+            if name not in self._application_states:
+                self._application_states[name] = ApplicationState(
+                    name,
+                    self._deployment_state_manager,
+                    self._endpoint_state,
+                    self._logging_config,
+                )
+            ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))
+
+            deployment_infos = {
+                params["deployment_name"]: deploy_args_to_deployment_info(
+                    **params, app_name=name
+                )
+                for params in deployment_args
+            }
+            self._application_states[name].deploy_app(deployment_infos)
+
     def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
         """Deploy the specified app to the list of deployment arguments.
@@ -928,45 +965,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
             RayServeException: If the list of deployments is trying to
                 use a route prefix that is already used by another application
         """
-
-        # Make sure route_prefix is not being used by other application.
-        live_route_prefixes: Dict[str, str] = {
-            app_state.route_prefix: app_name
-            for app_name, app_state in self._application_states.items()
-            if app_state.route_prefix is not None
-            and not app_state.status == ApplicationStatus.DELETING
-            and name != app_name
-        }
-
-        for deploy_param in deployment_args:
-            deploy_app_prefix = deploy_param.get("route_prefix", None)
-            if deploy_app_prefix is None:
-                continue
-
-            app_name = live_route_prefixes.get(deploy_app_prefix)
-            if app_name is not None:
-                raise RayServeException(
-                    f"Prefix {deploy_app_prefix} is being used by application "
-                    f'"{app_name}". Failed to deploy application "{name}".'
-                )
-
-        if name not in self._application_states:
-            self._application_states[name] = ApplicationState(
-                name,
-                self._deployment_state_manager,
-                self._endpoint_state,
-                self._save_checkpoint_func,
-                self._logging_config,
-            )
-        ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))
-
-        deployment_infos = {
-            params["deployment_name"]: deploy_args_to_deployment_info(
-                **params, app_name=name
-            )
-            for params in deployment_args
-        }
-        self._application_states[name].deploy_app(deployment_infos)
+        self.deploy_apps({name: deployment_args})

     def apply_app_configs(
         self,
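
The batch route-prefix check in `deploy_apps` differs from the single-app version in one way: prefixes claimed earlier in the same batch also count as live. Below is a simplified standalone sketch of that logic. `check_route_prefixes` is a hypothetical helper written for this example; it takes one optional prefix per app instead of the per-deployment argument dicts, and it raises `ValueError` rather than `RayServeException`.

```python
from typing import Dict, Optional


def check_route_prefixes(
    live: Dict[str, str], batch: Dict[str, Optional[str]]
) -> Dict[str, str]:
    """Validate a batch of (app name -> route prefix) against live prefixes.

    `live` maps route prefix -> owning app name. Returns the updated mapping.
    """
    seen = dict(live)  # copy so the caller's mapping is not mutated
    for name, prefix in batch.items():
        if prefix is None:
            continue
        owner = seen.get(prefix)
        # Redeploying an app under its own prefix is fine.
        if owner is not None and owner != name:
            raise ValueError(
                f'Prefix {prefix} is being used by application "{owner}". '
                f'Failed to deploy application "{name}".'
            )
        # Later apps in this batch must also see this prefix as taken.
        seen[prefix] = name
    return seen


live = {"/a": "app-a"}
print(check_route_prefixes(live, {"app-a": "/a", "app-b": "/b"}))
# {'/a': 'app-a', '/b': 'app-b'}

try:
    check_route_prefixes(live, {"app-c": "/c", "app-d": "/c"})
except ValueError as exc:
    print(exc)
```

The second call fails even though `/c` is not live beforehand, because `app-c` claims it earlier in the same batch.
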
@@ -990,7 +989,6 @@ def apply_app_configs(
                     app_config.name,
                     self._deployment_state_manager,
                     endpoint_state=self._endpoint_state,
-                    save_checkpoint_func=self._save_checkpoint_func,
                     logging_config=self._logging_config,
                 )

@@ -1084,6 +1082,8 @@ def update(self):
             ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

     def shutdown(self) -> None:
+        self._shutting_down = True
+
         for app_state in self._application_states.values():
             app_state.delete()

@@ -1095,23 +1095,22 @@ def is_ready_for_shutdown(self) -> bool:
         Iterate through all application states and check if all their applications
         are deleted.
         """
-        return all(
+        return self._shutting_down and all(
             app_state.is_deleted() for app_state in self._application_states.values()
         )

-    def _save_checkpoint_func(
-        self, *, writeahead_checkpoints: Optional[Dict[str, ApplicationTargetState]]
-    ) -> None:
+    def save_checkpoint(self) -> None:
         """Write a checkpoint of all application states."""
+        if self._shutting_down:
+            # Once we're told to shut down, stop writing checkpoints.
+            # Calling .shutdown() deletes any existing checkpoint.
+            return

         application_state_info = {
             app_name: app_state.get_checkpoint_data()
             for app_name, app_state in self._application_states.items()
         }

-        if writeahead_checkpoints is not None:
-            application_state_info.update(writeahead_checkpoints)
-
         self._kv_store.put(
             CHECKPOINT_KEY,
             cloudpickle.dumps(application_state_info),
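
The `_shutting_down` flag has two visible effects in this hunk: `is_ready_for_shutdown()` can no longer be true before `shutdown()` is called, and `save_checkpoint()` becomes a no-op afterwards. A toy sketch of that behavior follows. `ToyAppManager`, its dict-backed KV store, and the string statuses are assumptions made for this example, not the real `ApplicationStateManager`.

```python
from typing import Dict


class ToyAppManager:
    """Hypothetical manager showing the shutting-down guard."""

    CHECKPOINT_KEY = "toy-checkpoint"

    def __init__(self, kv_store: Dict[str, Dict[str, str]]) -> None:
        self._kv_store = kv_store
        self._apps: Dict[str, str] = {}
        self._shutting_down = False

    def deploy(self, name: str) -> None:
        self._apps[name] = "RUNNING"

    def save_checkpoint(self) -> None:
        if self._shutting_down:
            # Never re-create a checkpoint that shutdown() already removed.
            return
        self._kv_store[self.CHECKPOINT_KEY] = dict(self._apps)

    def shutdown(self) -> None:
        self._shutting_down = True
        for name in self._apps:
            self._apps[name] = "DELETED"
        self._kv_store.pop(self.CHECKPOINT_KEY, None)

    def is_ready_for_shutdown(self) -> bool:
        # all() over an empty iterable is True, so the flag is what keeps a
        # manager with zero apps from reporting ready before shutdown().
        return self._shutting_down and all(
            status == "DELETED" for status in self._apps.values()
        )


kv: Dict[str, Dict[str, str]] = {}
manager = ToyAppManager(kv)
print(manager.is_ready_for_shutdown())  # False

manager.deploy("app-a")
manager.save_checkpoint()
manager.shutdown()
manager.save_checkpoint()  # ignored after shutdown
print(manager.is_ready_for_shutdown(), kv)  # True {}
```
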
@@ -1254,7 +1253,7 @@ def override_deployment_info(
             ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

         # What to pass to info.update
-        override_options = dict()
+        override_options = {}

         # Merge app-level and deployment-level runtime_envs.
         replica_config = info.replica_config
9 changes: 8 additions & 1 deletion python/ray/serve/_private/build_app.py
@@ -1,12 +1,13 @@
 import logging
 from copy import deepcopy
 from dataclasses import dataclass
-from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar
+from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union

 from ray.dag.py_obj_scanner import _PyObjScanner
 from ray.serve._private.constants import SERVE_LOGGER_NAME
 from ray.serve.deployment import Application, Deployment
 from ray.serve.handle import DeploymentHandle
+from ray.serve.schema import LoggingConfig

 logger = logging.getLogger(SERVE_LOGGER_NAME)

@@ -46,6 +47,8 @@ def __contains__(self, key: object):
 class BuiltApplication:
     # Name of the application.
     name: str
+    route_prefix: Optional[str]
+    logging_config: Optional[LoggingConfig]
     # Name of the application's 'ingress' deployment
     # (the one exposed over gRPC/HTTP/handle).
     ingress_deployment_name: str
@@ -69,6 +72,8 @@ def build_app(
     app: Application,
     *,
     name: str,
+    route_prefix: Optional[str] = None,
+    logging_config: Optional[Union[Dict, LoggingConfig]] = None,
     default_runtime_env: Optional[Dict[str, Any]] = None,
     make_deployment_handle: Optional[
         Callable[[Deployment, str], DeploymentHandle]
@@ -99,6 +104,8 @@ def build_app(
     )
     return BuiltApplication(
         name=name,
+        route_prefix=route_prefix,
+        logging_config=logging_config,
         ingress_deployment_name=app._bound_deployment.name,
         deployments=deployments,
         deployment_handles={