From a54a7d0c6c7c97e8f6ce4fbc83f124b38d930a32 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 31 Dec 2024 16:44:15 +0800 Subject: [PATCH 1/6] support accelerate rllib training on custom resources Signed-off-by: liuxsh9 --- rllib/algorithms/algorithm_config.py | 8 ++++++++ rllib/core/learner/learner_group.py | 2 ++ rllib/core/learner/torch/torch_learner.py | 6 +++++- rllib/env/multi_agent_env_runner.py | 1 + rllib/env/single_agent_env_runner.py | 1 + rllib/utils/framework.py | 8 +++++++- 6 files changed, 24 insertions(+), 2 deletions(-) diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 8ef9d6de5cf28..1f034299fb5df 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -357,6 +357,7 @@ def __init__(self, algo_class: Optional[type] = None): self.num_learners = 0 self.num_gpus_per_learner = 0 self.num_cpus_per_learner = "auto" + self.custom_resources_per_learner = {} self.num_aggregator_actors_per_learner = 0 self.max_requests_in_flight_per_aggregator_actor = 3 self.local_gpu_idx = 0 @@ -2138,6 +2139,7 @@ def learners( num_learners: Optional[int] = NotProvided, num_cpus_per_learner: Optional[Union[str, float, int]] = NotProvided, num_gpus_per_learner: Optional[Union[float, int]] = NotProvided, + custom_resources_per_learner: Optional[Dict[str, Union[float, int]]] = NotProvided, num_aggregator_actors_per_learner: Optional[int] = NotProvided, max_requests_in_flight_per_aggregator_actor: Optional[float] = NotProvided, local_gpu_idx: Optional[int] = NotProvided, @@ -2164,6 +2166,10 @@ def learners( `num_learners=0`, any value greater than 0 runs the training on a single GPU on the main process, while a value of 0 runs the training on main process CPUs. + custom_resources_per_learner: A dict that specify custom resources allocated + per Learner worker. 
Similar to the GPU, if you declare a certain NPU/HPU + (which is already supported in ray train) resource greater than 0, such + as {"NPU": 1}, the training will run on the the corresponding accelerator. num_aggregator_actors_per_learner: The number of aggregator actors per Learner (if num_learners=0, one local learner is created). Must be at least 1. Aggregator actors perform the task of a) converting episodes @@ -2196,6 +2202,8 @@ def learners( self.num_cpus_per_learner = num_cpus_per_learner if num_gpus_per_learner is not NotProvided: self.num_gpus_per_learner = num_gpus_per_learner + if custom_resources_per_learner is not NotProvided: + self.custom_resources_per_learner = custom_resources_per_learner if num_aggregator_actors_per_learner is not NotProvided: self.num_aggregator_actors_per_learner = num_aggregator_actors_per_learner if max_requests_in_flight_per_aggregator_actor is not NotProvided: diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index 1c5613a687bdb..6352645796527 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -145,9 +145,11 @@ def __init__( # TODO (sven): Activate this when Ray has figured out GPU pre-loading. # - (0.01 * self.config.num_aggregator_actors_per_learner), ) + custom_resources_per_learner = self.config.custom_resources_per_learner resources_per_learner = { "CPU": num_cpus_per_learner, "GPU": num_gpus_per_learner, + **custom_resources_per_learner } backend_executor = BackendExecutor( diff --git a/rllib/core/learner/torch/torch_learner.py b/rllib/core/learner/torch/torch_learner.py index 5e43315d133f4..c567bb5baa72e 100644 --- a/rllib/core/learner/torch/torch_learner.py +++ b/rllib/core/learner/torch/torch_learner.py @@ -462,7 +462,11 @@ def build(self) -> None: after setting up all variables because `configure_optimizer_for_module` is called in this `Learner.build()`. 
""" - self._device = get_device(self.config, self.config.num_gpus_per_learner) + self._device = get_device( + self.config, + self.config.num_gpus_per_learner, + self.config.custom_resources_per_learner + ) super().build() diff --git a/rllib/env/multi_agent_env_runner.py b/rllib/env/multi_agent_env_runner.py index 4c84a70e07b67..a8fe6e996cc00 100644 --- a/rllib/env/multi_agent_env_runner.py +++ b/rllib/env/multi_agent_env_runner.py @@ -99,6 +99,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs): self._device = get_device( self.config, 0 if not self.worker_index else self.config.num_gpus_per_env_runner, + self.config.custom_resources_per_env_runner ) # Create the vectorized gymnasium env. diff --git a/rllib/env/single_agent_env_runner.py b/rllib/env/single_agent_env_runner.py index 07f51ffa16578..8c7658b2bebd7 100644 --- a/rllib/env/single_agent_env_runner.py +++ b/rllib/env/single_agent_env_runner.py @@ -92,6 +92,7 @@ def __init__(self, *, config: AlgorithmConfig, **kwargs): self._device = get_device( self.config, 0 if not self.worker_index else self.config.num_gpus_per_env_runner, + self.config.custom_resources_per_env_runner ) # Create the vectorized gymnasium env. diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index c0b9a28fa4726..3c0c5ee4156d7 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -51,7 +51,7 @@ def convert_to_tensor( @PublicAPI -def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1): +def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1, custom_resources_requested={}): """Returns a single device (CPU or some GPU) depending on a config. Args: @@ -59,6 +59,8 @@ def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1): num_gpus_requested: The number of GPUs actually requested. This may be the value of `config.num_gpus_per_env_runner` when for example calling this function from an EnvRunner. 
+ custom_resources_requested: Similar to the GPU, the dictionary contains the number + of accelerators actually requested. Returns: A single device (or name) given `config` and `num_gpus_requested`. @@ -94,6 +96,10 @@ def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1): # `torch.cuda.device_count() = 1` and torch.device(0) maps to that GPU # with ID=1 on the node. return torch.device(config.local_gpu_idx) + elif custom_resources_per_learner: + # The `get_devices()` api in ray.air should handle the custom accelerator + # and return torch.device("cpu") if not accelerator is available. + return get_devices()[0] else: return torch.device("cpu") else: From 468dbd5347fbea68fda2b3de3403d5bb65cb44a4 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 31 Dec 2024 17:29:41 +0800 Subject: [PATCH 2/6] format Signed-off-by: liuxsh9 --- rllib/algorithms/algorithm_config.py | 11 +++++++---- rllib/core/learner/learner_group.py | 2 +- rllib/core/learner/torch/torch_learner.py | 6 +++--- rllib/env/multi_agent_env_runner.py | 2 +- rllib/env/single_agent_env_runner.py | 2 +- rllib/utils/framework.py | 14 +++++++++----- 6 files changed, 22 insertions(+), 15 deletions(-) diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 1f034299fb5df..afd46debfbbd2 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -2139,7 +2139,9 @@ def learners( num_learners: Optional[int] = NotProvided, num_cpus_per_learner: Optional[Union[str, float, int]] = NotProvided, num_gpus_per_learner: Optional[Union[float, int]] = NotProvided, - custom_resources_per_learner: Optional[Dict[str, Union[float, int]]] = NotProvided, + custom_resources_per_learner: Optional[ + Dict[str, Union[float, int]] + ] = NotProvided, num_aggregator_actors_per_learner: Optional[int] = NotProvided, max_requests_in_flight_per_aggregator_actor: Optional[float] = NotProvided, local_gpu_idx: Optional[int] = NotProvided, @@ -2167,9 +2169,10 
@@ def learners( training on a single GPU on the main process, while a value of 0 runs the training on main process CPUs. custom_resources_per_learner: A dict that specify custom resources allocated - per Learner worker. Similar to the GPU, if you declare a certain NPU/HPU - (which is already supported in ray train) resource greater than 0, such - as {"NPU": 1}, the training will run on the the corresponding accelerator. + per Learner worker. Similar to the GPU, if you declare a certain number + for NPU/HPU (which is already supported in ray train) greater than 0, + such as {"NPU": 1}, the training will run on the corresponding + accelerator. num_aggregator_actors_per_learner: The number of aggregator actors per Learner (if num_learners=0, one local learner is created). Must be at least 1. Aggregator actors perform the task of a) converting episodes diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index 6352645796527..6400a3aad8794 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -149,7 +149,7 @@ def __init__( resources_per_learner = { "CPU": num_cpus_per_learner, "GPU": num_gpus_per_learner, - **custom_resources_per_learner + **custom_resources_per_learner, } backend_executor = BackendExecutor( diff --git a/rllib/core/learner/torch/torch_learner.py b/rllib/core/learner/torch/torch_learner.py index c567bb5baa72e..7f38ef08a91d7 100644 --- a/rllib/core/learner/torch/torch_learner.py +++ b/rllib/core/learner/torch/torch_learner.py @@ -463,9 +463,9 @@ def build(self) -> None: called in this `Learner.build()`. 
""" self._device = get_device( - self.config, - self.config.num_gpus_per_learner, - self.config.custom_resources_per_learner + self.config, + self.config.num_gpus_per_learner, + self.config.custom_resources_per_learner, ) super().build() diff --git a/rllib/env/multi_agent_env_runner.py b/rllib/env/multi_agent_env_runner.py index a8fe6e996cc00..da1b753dc2996 100644 --- a/rllib/env/multi_agent_env_runner.py +++ b/rllib/env/multi_agent_env_runner.py @@ -99,7 +99,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs): self._device = get_device( self.config, 0 if not self.worker_index else self.config.num_gpus_per_env_runner, - self.config.custom_resources_per_env_runner + self.config.custom_resources_per_env_runner, ) # Create the vectorized gymnasium env. diff --git a/rllib/env/single_agent_env_runner.py b/rllib/env/single_agent_env_runner.py index 8c7658b2bebd7..df62303da7f6a 100644 --- a/rllib/env/single_agent_env_runner.py +++ b/rllib/env/single_agent_env_runner.py @@ -92,7 +92,7 @@ def __init__(self, *, config: AlgorithmConfig, **kwargs): self._device = get_device( self.config, 0 if not self.worker_index else self.config.num_gpus_per_env_runner, - self.config.custom_resources_per_env_runner + self.config.custom_resources_per_env_runner, ) # Create the vectorized gymnasium env. diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 3c0c5ee4156d7..f739cf52a82b8 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -51,7 +51,11 @@ def convert_to_tensor( @PublicAPI -def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1, custom_resources_requested={}): +def get_device( + config: "AlgorithmConfig", + num_gpus_requested: int = 1, + custom_resources_requested={}, +): """Returns a single device (CPU or some GPU) depending on a config. Args: @@ -59,8 +63,8 @@ def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1, custom_re num_gpus_requested: The number of GPUs actually requested. 
This may be the value of `config.num_gpus_per_env_runner` when for example calling this function from an EnvRunner. - custom_resources_requested: Similar to the GPU, the dictionary contains the number - of accelerators actually requested. + custom_resources_requested: Similar to the GPU, the dictionary contains the + number of accelerators actually requested. Returns: A single device (or name) given `config` and `num_gpus_requested`. @@ -96,8 +100,8 @@ def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1, custom_re # `torch.cuda.device_count() = 1` and torch.device(0) maps to that GPU # with ID=1 on the node. return torch.device(config.local_gpu_idx) - elif custom_resources_per_learner: - # The `get_devices()` api in ray.air should handle the custom accelerator + elif custom_resources_requested: + # The `get_devices()` api in ray.air should handle the custom accelerator # and return torch.device("cpu") if not accelerator is available. return get_devices()[0] else: From 9eec6a4e8809b964949b880cdc6fae3857a58b6a Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 31 Dec 2024 18:10:54 +0800 Subject: [PATCH 3/6] fix import Signed-off-by: liuxsh9 --- rllib/utils/framework.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index f739cf52a82b8..256be808aac6c 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -101,6 +101,7 @@ def get_device( # with ID=1 on the node. return torch.device(config.local_gpu_idx) elif custom_resources_requested: + from ray.air._internal.torch_utils import get_devices # The `get_devices()` api in ray.air should handle the custom accelerator # and return torch.device("cpu") if not accelerator is available. 
return get_devices()[0] From bc9a91512bf7cb95cad9e43f858cd2576da5d539 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 31 Dec 2024 18:26:00 +0800 Subject: [PATCH 4/6] format Signed-off-by: liuxsh9 --- rllib/utils/framework.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 256be808aac6c..9432cccf58409 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -102,6 +102,7 @@ def get_device( return torch.device(config.local_gpu_idx) elif custom_resources_requested: from ray.air._internal.torch_utils import get_devices + # The `get_devices()` api in ray.air should handle the custom accelerator # and return torch.device("cpu") if not accelerator is available. return get_devices()[0] From b940e26c59fb29eb952acf7f0d06ad9243118282 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Thu, 2 Jan 2025 19:53:12 +0800 Subject: [PATCH 5/6] fix lint Signed-off-by: liuxsh9 --- rllib/utils/framework.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 9432cccf58409..b9620d3082eae 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -54,7 +54,7 @@ def convert_to_tensor( def get_device( config: "AlgorithmConfig", num_gpus_requested: int = 1, - custom_resources_requested={}, + custom_resources_requested=None, ): """Returns a single device (CPU or some GPU) depending on a config. 
From 374ea7011dfa246608073ac585dd2c79f2b4e8e3 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Thu, 2 Jan 2025 20:00:55 +0800 Subject: [PATCH 6/6] add annotation Signed-off-by: liuxsh9 --- rllib/utils/framework.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index b9620d3082eae..6e869454d372c 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -54,7 +54,7 @@ def convert_to_tensor( def get_device( config: "AlgorithmConfig", num_gpus_requested: int = 1, - custom_resources_requested=None, + custom_resources_requested: Optional[dict] = None, ): """Returns a single device (CPU or some GPU) depending on a config.