[AIR] Remove Checkpoint._object_ref #31777

Merged · 5 commits · Jan 24, 2023 · Changes from 4 commits
71 changes: 8 additions & 63 deletions python/ray/air/checkpoint.py
@@ -27,7 +27,7 @@
upload_to_uri,
)
from ray.air.constants import PREPROCESSOR_KEY, CHECKPOINT_ID_ATTR
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
from ray.util.annotations import DeveloperAPI, PublicAPI

if TYPE_CHECKING:
from ray.data.preprocessor import Preprocessor
@@ -116,7 +116,7 @@ class Checkpoint:

When converting between different checkpoint formats, it is guaranteed
that a full round trip of conversions (e.g. directory --> dict -->
obj ref --> directory) will recover the original checkpoint data.
--> directory) will recover the original checkpoint data.
There are no guarantees made about compatibility of intermediate
representations.

@@ -142,10 +142,7 @@ class Checkpoint:
same node or a node that also has access to the local data path (e.g.
on a shared file system like NFS).

Checkpoints pointing to object store references will keep the
object reference in tact - this means that these checkpoints cannot
be properly deserialized on other Ray clusters or outside a Ray
cluster. If you need persistence across clusters, use the ``to_uri()``
If you need persistence across clusters, use the ``to_uri()``
or ``to_directory()`` methods to persist your checkpoints to disk.

"""
@@ -165,7 +162,6 @@ def __init__(
local_path: Optional[Union[str, os.PathLike]] = None,
data_dict: Optional[dict] = None,
uri: Optional[str] = None,
obj_ref: Optional[ray.ObjectRef] = None,
):
# First, resolve file:// URIs to local paths
if uri:
@@ -175,7 +171,7 @@

# Only one data type can be set at any time
if local_path:
assert not data_dict and not uri and not obj_ref
assert not data_dict and not uri
if not isinstance(local_path, (str, os.PathLike)) or not os.path.exists(
local_path
):
@@ -191,21 +187,14 @@
f"instead."
)
elif data_dict:
assert not local_path and not uri and not obj_ref
assert not local_path and not uri
if not isinstance(data_dict, dict):
raise RuntimeError(
f"Cannot create checkpoint from dict as no "
f"dict was passed: {data_dict}"
)
elif obj_ref:
assert not local_path and not data_dict and not uri
if not isinstance(obj_ref, ray.ObjectRef):
raise RuntimeError(
f"Cannot create checkpoint from object ref as no "
f"object ref was passed: {obj_ref}"
)
elif uri:
assert not local_path and not data_dict and not obj_ref
assert not local_path and not data_dict
resolved = _get_external_path(uri)
if not resolved:
raise RuntimeError(
@@ -221,7 +210,6 @@
)
self._data_dict: Optional[Dict[str, Any]] = data_dict
self._uri: Optional[str] = uri
self._obj_ref: Optional[ray.ObjectRef] = obj_ref
self._override_preprocessor: Optional["Preprocessor"] = None

self._uuid = uuid.uuid4()
@@ -349,9 +337,6 @@ def to_dict(self) -> dict:
if self._data_dict:
# If the checkpoint data is already a dict, return
checkpoint_data = self._data_dict
elif self._obj_ref:
# If the checkpoint data is an object reference, resolve
checkpoint_data = ray.get(self._obj_ref)
elif self._local_path or self._uri:
# Else, checkpoint is either on FS or external storage
with self.as_directory() as local_path:
@@ -415,42 +400,6 @@ def to_dict(self) -> dict:
checkpoint_data[PREPROCESSOR_KEY] = self._override_preprocessor
return checkpoint_data

@classmethod
@Deprecated(
message="To restore a checkpoint from a remote object ref, call "
"`ray.get(obj_ref)` instead."
)
def from_object_ref(cls, obj_ref: ray.ObjectRef) -> "Checkpoint":
"""Create checkpoint object from object reference.

Args:
obj_ref: ObjectRef pointing to checkpoint data.

Returns:
Checkpoint: checkpoint object.
"""
raise DeprecationWarning(
"`from_object_ref` is deprecated and will be removed in a future Ray "
"version. To restore a Checkpoint from a remote object ref, call "
"`ray.get(obj_ref)` instead.",
)

@Deprecated(
message="To store the checkpoint in the Ray object store, call `ray.put(ckpt)` "
"instead of `ckpt.to_object_ref()`."
)
def to_object_ref(self) -> ray.ObjectRef:
"""Return checkpoint data as object reference.

Returns:
ray.ObjectRef: ObjectRef pointing to checkpoint data.
"""
raise DeprecationWarning(
"`to_object_ref` is deprecated and will be removed in a future Ray "
"version. To store the checkpoint in the Ray object store, call "
"`ray.put(ckpt)` instead of `ckpt.to_object_ref()`.",
)
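
Note (reviewer sketch, not part of this diff): the replacement workflow named in the removed deprecation messages stores the Checkpoint in the object store directly with ray.put / ray.get:

```python
import ray
from ray.air.checkpoint import Checkpoint

ray.init()
ckpt = Checkpoint.from_dict({"epoch": 3})

# Previously: obj_ref = ckpt.to_object_ref()
obj_ref = ray.put(ckpt)

# Previously: restored = Checkpoint.from_object_ref(obj_ref)
restored = ray.get(obj_ref)
assert restored.to_dict()["epoch"] == 3
```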

@classmethod
def from_directory(cls, path: Union[str, os.PathLike]) -> "Checkpoint":
"""Create checkpoint object from directory.
@@ -498,7 +447,6 @@ def from_checkpoint(cls, other: "Checkpoint") -> "Checkpoint":
local_path=other._local_path,
data_dict=other._data_dict,
uri=other._uri,
obj_ref=other._obj_ref,
)
new_checkpoint._copy_metadata_attrs_from(other)
return new_checkpoint
@@ -533,8 +481,7 @@ def _save_checkpoint_metadata_in_directory(self, path: str) -> None:
pickle.dump(self._metadata, file)

def _to_directory(self, path: str, move_instead_of_copy: bool = False) -> None:
if self._data_dict or self._obj_ref:
# This is a object ref or dict
if self._data_dict:
data_dict = self.to_dict()
if _FS_CHECKPOINT_KEY in data_dict:
for key in data_dict.keys():
@@ -777,7 +724,7 @@ def get_internal_representation(
objects for equality or to access the underlying data storage.

The returned type is a string and one of
``["local_path", "data_dict", "uri", "object_ref"]``.
``["local_path", "data_dict", "uri"]``.

The data is the respective data value.

@@ -793,8 +740,6 @@
return "data_dict", self._data_dict
elif self._uri:
return "uri", self._uri
elif self._obj_ref:
return "object_ref", self._obj_ref
else:
raise RuntimeError(
"Cannot get internal representation of empty checkpoint."
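
Note (reviewer sketch, not part of this diff): after this change the internal representation can only be one of the three remaining kinds:

```python
from ray.air.checkpoint import Checkpoint

ckpt = Checkpoint.from_dict({"epoch": 3})
kind, data = ckpt.get_internal_representation()
# kind is one of "local_path", "data_dict", or "uri";
# "object_ref" can no longer appear.
assert kind == "data_dict"
```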
12 changes: 4 additions & 8 deletions python/ray/data/preprocessors/batch_mapper.py
@@ -80,23 +80,19 @@ def __init__(
Union[np.ndarray, Dict[str, np.ndarray]],
],
],
batch_format: Optional[BatchFormat] = None,
batch_format: Optional[BatchFormat],
batch_size: Optional[Union[int, Literal["default"]]] = "default",
# TODO: Make batch_format required from user
# TODO: Introduce a "zero_copy" format
# TODO: We should reach consistency of args between BatchMapper and map_batches.
):
if not batch_format:
raise DeprecationWarning(
"batch_format is a required argument for BatchMapper from Ray 2.1."
"You must specify either 'pandas' or 'numpy' batch format."
)

if batch_format not in [
BatchFormat.PANDAS,
BatchFormat.NUMPY,
]:
raise ValueError("BatchMapper only supports pandas and numpy batch format.")
raise ValueError(
"BatchMapper only supports 'pandas' or 'numpy' batch format."
)

self.batch_format = batch_format
self.batch_size = batch_size
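
Note (reviewer sketch, not part of this diff): with batch_format now required, a minimal BatchMapper looks roughly like this (the add_one function and the "value" column are illustrative):

```python
import pandas as pd
import ray
from ray.data.preprocessors import BatchMapper

def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    batch["value"] = batch["value"] + 1
    return batch

# batch_format has no default anymore and must be "pandas" or "numpy".
preprocessor = BatchMapper(add_one, batch_format="pandas")

ds = ray.data.from_items([{"value": i} for i in range(4)])
transformed = preprocessor.transform(ds)
```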
16 changes: 0 additions & 16 deletions python/ray/train/__init__.py
@@ -1,29 +1,13 @@
from ray._private.usage import usage_lib
from ray.train.backend import BackendConfig
from ray.train.constants import TRAIN_DATASET_KEY
from ray.train.train_loop_utils import (
get_dataset_shard,
load_checkpoint,
local_rank,
report,
save_checkpoint,
world_rank,
world_size,
)
from ray.train.trainer import TrainingIterator


usage_lib.record_library_usage("train")

__all__ = [
"BackendConfig",
"get_dataset_shard",
"load_checkpoint",
"local_rank",
"report",
"save_checkpoint",
"TrainingIterator",
"world_rank",
"world_size",
"TRAIN_DATASET_KEY",
]
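
Note (reviewer sketch, not part of this diff): assuming the Ray 2.x ray.air.session API is the intended replacement for these removed free functions, a training loop executed by a Trainer would use it roughly as follows (metric values and checkpoint contents are illustrative):

```python
from ray.air import session
from ray.air.checkpoint import Checkpoint

def train_loop_per_worker(config: dict):
    rank = session.get_world_rank()              # was ray.train.world_rank()
    size = session.get_world_size()              # was ray.train.world_size()
    shard = session.get_dataset_shard("train")   # was ray.train.get_dataset_shard()
    # ... iterate over `shard` and train here ...

    # was ray.train.report(...) plus ray.train.save_checkpoint(...)
    session.report(
        {"loss": 0.1, "world_rank": rank, "world_size": size},
        checkpoint=Checkpoint.from_dict({"epoch": 1}),
    )
```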
15 changes: 0 additions & 15 deletions python/ray/train/torch/train_loop_utils.py
@@ -53,9 +53,6 @@ def prepare_model(
move_to_device: bool = True,
parallel_strategy: Optional[str] = "ddp",
parallel_strategy_kwargs: Optional[Dict[str, Any]] = None,
# Deprecated args.
wrap_ddp: bool = False,
ddp_kwargs: Optional[Dict[str, Any]] = None,
) -> torch.nn.Module:
"""Prepares the model for distributed execution.

@@ -76,18 +73,6 @@
or "fsdp", respectively.
"""

if wrap_ddp:
raise DeprecationWarning(
"The `wrap_ddp` argument is deprecated as of Ray 2.1. Use the "
"`parallel_strategy` argument instead."
)

if ddp_kwargs:
raise DeprecationWarning(
"The `ddp_kwargs` argument is deprecated as of Ray 2.1. Use the "
"`parallel_strategy_kwargs` arg instead."
)

if parallel_strategy == "fsdp" and FullyShardedDataParallel is None:
raise ImportError(
"FullyShardedDataParallel requires torch>=1.11.0. "
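
Note (reviewer sketch, not part of this diff): with the deprecated wrap_ddp / ddp_kwargs arguments removed, DDP wrapping is configured via parallel_strategy instead; prepare_model must run inside a Train worker loop, and the DDP kwarg shown is illustrative:

```python
import torch
from ray.train.torch import prepare_model

def train_loop_per_worker(config: dict):
    model = torch.nn.Linear(8, 1)
    # Replaces the removed wrap_ddp / ddp_kwargs arguments.
    model = prepare_model(
        model,
        parallel_strategy="ddp",  # or "fsdp", or None to skip wrapping
        parallel_strategy_kwargs={"find_unused_parameters": False},
    )
    return model
```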