From 830d707c41d69d29ec8e8cb4b45a5b124090e4cd Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Mon, 23 Jan 2023 12:00:28 -0800 Subject: [PATCH] Revert "Revert "[AIR] Deprecations for 2.3 (#31763)" (#31866)" This reverts commit 58386d0d9aac53690ebcafcff78799b99d70d249. --- python/ray/air/checkpoint.py | 38 +-- python/ray/data/preprocessors/batch_mapper.py | 12 +- python/ray/train/__init__.py | 16 -- python/ray/train/tests/test_torch_trainer.py | 10 - python/ray/train/torch/train_loop_utils.py | 15 -- python/ray/train/train_loop_utils.py | 233 ------------------ 6 files changed, 5 insertions(+), 319 deletions(-) delete mode 100644 python/ray/train/train_loop_utils.py diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index c1eaf129697af..2f0e82cbf7d83 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -27,7 +27,7 @@ upload_to_uri, ) from ray.air.constants import PREPROCESSOR_KEY, CHECKPOINT_ID_ATTR -from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI +from ray.util.annotations import DeveloperAPI, PublicAPI if TYPE_CHECKING: from ray.data.preprocessor import Preprocessor @@ -415,42 +415,6 @@ def to_dict(self) -> dict: checkpoint_data[PREPROCESSOR_KEY] = self._override_preprocessor return checkpoint_data - @classmethod - @Deprecated( - message="To restore a checkpoint from a remote object ref, call " - "`ray.get(obj_ref)` instead." - ) - def from_object_ref(cls, obj_ref: ray.ObjectRef) -> "Checkpoint": - """Create checkpoint object from object reference. - - Args: - obj_ref: ObjectRef pointing to checkpoint data. - - Returns: - Checkpoint: checkpoint object. - """ - raise DeprecationWarning( - "`from_object_ref` is deprecated and will be removed in a future Ray " - "version. To restore a Checkpoint from a remote object ref, call " - "`ray.get(obj_ref)` instead.", - ) - - @Deprecated( - message="To store the checkpoint in the Ray object store, call `ray.put(ckpt)` " - "instead of `ckpt.to_object_ref()`." - ) - def to_object_ref(self) -> ray.ObjectRef: - """Return checkpoint data as object reference. - - Returns: - ray.ObjectRef: ObjectRef pointing to checkpoint data. - """ - raise DeprecationWarning( - "`to_object_ref` is deprecated and will be removed in a future Ray " - "version. To store the checkpoint in the Ray object store, call " - "`ray.put(ckpt)` instead of `ckpt.to_object_ref()`.", - ) - @classmethod def from_directory(cls, path: Union[str, os.PathLike]) -> "Checkpoint": """Create checkpoint object from directory. diff --git a/python/ray/data/preprocessors/batch_mapper.py b/python/ray/data/preprocessors/batch_mapper.py index 6584b7853f478..f89ac0af012ce 100644 --- a/python/ray/data/preprocessors/batch_mapper.py +++ b/python/ray/data/preprocessors/batch_mapper.py @@ -80,23 +80,19 @@ def __init__( Union[np.ndarray, Dict[str, np.ndarray]], ], ], - batch_format: Optional[BatchFormat] = None, + batch_format: Optional[BatchFormat], batch_size: Optional[Union[int, Literal["default"]]] = "default", - # TODO: Make batch_format required from user # TODO: Introduce a "zero_copy" format # TODO: We should reach consistency of args between BatchMapper and map_batches. ): - if not batch_format: - raise DeprecationWarning( - "batch_format is a required argument for BatchMapper from Ray 2.1." - "You must specify either 'pandas' or 'numpy' batch format." 
- ) if batch_format not in [ BatchFormat.PANDAS, BatchFormat.NUMPY, ]: - raise ValueError("BatchMapper only supports pandas and numpy batch format.") + raise ValueError( + "BatchMapper only supports 'pandas' or 'numpy' batch format." + ) self.batch_format = batch_format self.batch_size = batch_size diff --git a/python/ray/train/__init__.py b/python/ray/train/__init__.py index 2bcd0cdb38a4a..491f43e0d541a 100644 --- a/python/ray/train/__init__.py +++ b/python/ray/train/__init__.py @@ -1,15 +1,6 @@ from ray._private.usage import usage_lib from ray.train.backend import BackendConfig from ray.train.constants import TRAIN_DATASET_KEY -from ray.train.train_loop_utils import ( - get_dataset_shard, - load_checkpoint, - local_rank, - report, - save_checkpoint, - world_rank, - world_size, -) from ray.train.trainer import TrainingIterator @@ -17,13 +8,6 @@ __all__ = [ "BackendConfig", - "get_dataset_shard", - "load_checkpoint", - "local_rank", - "report", - "save_checkpoint", "TrainingIterator", - "world_rank", - "world_size", "TRAIN_DATASET_KEY", ] diff --git a/python/ray/train/tests/test_torch_trainer.py b/python/ray/train/tests/test_torch_trainer.py index 801078dceceb0..2c4c71c169a09 100644 --- a/python/ray/train/tests/test_torch_trainer.py +++ b/python/ray/train/tests/test_torch_trainer.py @@ -378,16 +378,6 @@ def __getstate__(self): assert results.checkpoint -def test_torch_prepare_model_deprecated(): - model = torch.nn.Linear(1, 1) - - with pytest.raises(DeprecationWarning): - train.torch.prepare_model(model, wrap_ddp=True) - - with pytest.raises(DeprecationWarning): - train.torch.prepare_model(model, ddp_kwargs={"x": "y"}) - - if __name__ == "__main__": import sys diff --git a/python/ray/train/torch/train_loop_utils.py b/python/ray/train/torch/train_loop_utils.py index 2861d0487f125..bc765648069dd 100644 --- a/python/ray/train/torch/train_loop_utils.py +++ b/python/ray/train/torch/train_loop_utils.py @@ -53,9 +53,6 @@ def prepare_model( move_to_device: bool = True, parallel_strategy: Optional[str] = "ddp", parallel_strategy_kwargs: Optional[Dict[str, Any]] = None, - # Deprecated args. - wrap_ddp: bool = False, - ddp_kwargs: Optional[Dict[str, Any]] = None, ) -> torch.nn.Module: """Prepares the model for distributed execution. @@ -76,18 +73,6 @@ def prepare_model( or "fsdp", respectively. """ - if wrap_ddp: - raise DeprecationWarning( - "The `wrap_ddp` argument is deprecated as of Ray 2.1. Use the " - "`parallel_strategy` argument instead." - ) - - if ddp_kwargs: - raise DeprecationWarning( - "The `ddp_kwargs` argument is deprecated as of Ray 2.1. Use the " - "`parallel_strategy_kwargs` arg instead." - ) - if parallel_strategy == "fsdp" and FullyShardedDataParallel is None: raise ImportError( "FullyShardedDataParallel requires torch>=1.11.0. " diff --git a/python/ray/train/train_loop_utils.py b/python/ray/train/train_loop_utils.py deleted file mode 100644 index bc9532c4d9841..0000000000000 --- a/python/ray/train/train_loop_utils.py +++ /dev/null @@ -1,233 +0,0 @@ -from typing import TYPE_CHECKING, Dict, Optional, Union - -from ray.util.annotations import Deprecated - -if TYPE_CHECKING: - from ray.data import Dataset, DatasetPipeline - - -def _get_deprecation_msg(is_docstring: bool, fn_name: Optional[str] = None): - if is_docstring: - session_api_link = ":ref:`ray.air.session `" - else: - session_api_link = ( - "`ray.air.session` ( " - "https://docs.ray.io/en/latest/ray-air/package-ref.html" - "#module-ray.air.session" - ") ." 
- ) - - deprecation_msg = ( - f"The `train.{fn_name}` APIs are deprecated in Ray " - f"2.1, and is replaced by {session_api_link}" - "The `ray.air.session` APIs provide the same functionality, " - "but in a unified manner across Ray Train and Ray Tune." - ) - return deprecation_msg - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def get_dataset_shard( - dataset_name: Optional[str] = None, -) -> Optional[Union["Dataset", "DatasetPipeline"]]: - """Returns the Ray Dataset or DatasetPipeline shard for this worker. - - Call :meth:`~ray.data.Dataset.iter_torch_batches` or - :meth:`~ray.data.Dataset.to_tf` on this shard to convert it to the appropriate - framework-specific data type. - - .. code-block:: python - - import ray - from ray import train - - def train_func(): - model = Net() - for iter in range(100): - data_shard = session.get_dataset_shard("train") - for batch in data_shard.iter_torch_batches(): - # ... - return model - - dataset = ray.data.read_csv("train.csv") - dataset.filter(...).repeat().random_shuffle() - - trainer = Trainer(backend="torch") - trainer.start() - - # Trainer will automatically handle sharding. - train_model = trainer.run(train_func, dataset=dataset) - trainer.shutdown() - - Args: - dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then - specifies which dataset shard to return. - - Returns: - The ``Dataset`` or ``DatasetPipeline`` shard to use for this worker. - If no dataset is passed into Trainer, then return None. - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=get_dataset_shard.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def report(**kwargs) -> None: - """Reports all keyword arguments to Train as intermediate results. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - train.report(hello="world") - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be reported by Train. - If callbacks are provided, they are executed on these - intermediate results. - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=report.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def world_rank() -> int: - """Get the world rank of this worker. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - if train.world_rank() == 0: - print("Worker 0") - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=world_rank.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def local_rank() -> int: - """Get the local rank of this worker (rank of the worker on its node). - - .. code-block:: python - - import time - from ray import train - - def train_func(): - if torch.cuda.is_available(): - torch.cuda.set_device(train.local_rank()) - ... 
- - trainer = Trainer(backend="torch", use_gpu=True) - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=local_rank.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def load_checkpoint() -> Optional[Dict]: - """Loads checkpoint data onto the worker. - - .. code-block:: python - - from ray import train - - def train_func(): - checkpoint = train.load_checkpoint() - for iter in range(checkpoint["epoch"], 5): - print(iter) - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func, checkpoint={"epoch": 3}) - # 3 - # 4 - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be checkpointed by Train. - Returns: - The most recently saved checkpoint if ``train.save_checkpoint()`` - has been called. Otherwise, the checkpoint that the session was - originally initialized with. ``None`` if neither exist. - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=load_checkpoint.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def save_checkpoint(**kwargs) -> None: - """Checkpoints all keyword arguments to Train as restorable state. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - for iter in range(100): - time.sleep(1) - train.save_checkpoint(epoch=iter) - - trainer = Trainer(backend="torch") - trainer.start() - trainer.run(train_func) - trainer.shutdown() - - Args: - **kwargs: Any key value pair to be checkpointed by Train. - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=save_checkpoint.__name__), - ) - - -@Deprecated(message=_get_deprecation_msg(is_docstring=True)) -def world_size() -> int: - """Get the current world size (i.e. total number of workers) for this run. - - .. code-block:: python - - import time - from ray import train - - def train_func(): - assert train.world_size() == 4 - - trainer = Trainer(backend="torch", num_workers=4) - trainer.start() - trainer.run(train_func) - trainer.shutdown() - """ - raise DeprecationWarning( - _get_deprecation_msg(is_docstring=False, fn_name=world_size.__name__), - )
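
Note for reviewers updating call sites against this change: below is a minimal sketch
(not part of the patch) of the replacement APIs that the removed deprecation shims
pointed to, assuming Ray >= 2.1 with Ray AIR installed. Names and values are
illustrative only.

    import ray
    from ray.air import session
    from ray.air.checkpoint import Checkpoint
    from ray.data.preprocessors import BatchMapper

    # Object-store round trips replace Checkpoint.to_object_ref()/from_object_ref().
    ckpt = Checkpoint.from_dict({"epoch": 3})
    ref = ray.put(ckpt)       # instead of ckpt.to_object_ref()
    restored = ray.get(ref)   # instead of Checkpoint.from_object_ref(ref)

    # batch_format is now a required BatchMapper argument ("pandas" or "numpy").
    mapper = BatchMapper(lambda df: df, batch_format="pandas")

    # The ray.air.session API replaces the removed ray.train loop utilities.
    # This function would be passed to a DataParallelTrainer (e.g. TorchTrainer).
    def train_loop_per_worker():
        shard = session.get_dataset_shard("train")  # was train.get_dataset_shard()
        rank = session.get_world_rank()             # was train.world_rank()
        session.report({"world_rank": rank})        # was train.report()/save_checkpoint()
        # wrap_ddp/ddp_kwargs were replaced by parallel_strategy/parallel_strategy_kwargs:
        # model = train.torch.prepare_model(model, parallel_strategy="ddp")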