Skip to content

Commit

Permalink
Introduce incompatible-types and enables spilling of CuPy arrays (#856)
Browse files Browse the repository at this point in the history
This PR replaces `ignore_types` introduced in #568 with `incompatible_types`, which is a list of types that `ProxifyHostFile` will unproxify on retrieval. This makes it possible to spill types we previously ignored completely such as  `cupy.ndarray`.

To mark a type incompatible, add it to the comma separated config value `"jit-unspill-incompatible"` or environment variable `DASK_JIT_UNSPILL_INCOMPATIBLE`.

The default value is: `DASK_JIT_UNSPILL_INCOMPATIBLE="cupy.ndarray"`

Closes #855

Notice, I have marked the PR `breaking` because the `DASK_JIT_UNSPILL_IGNORE` option has been removed.

cc. @VibhuJawa

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Ashwin Srinath (https://github.com/shwina)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #856
  • Loading branch information
madsbk authored Feb 15, 2022
1 parent fcd8d35 commit 98686bd
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 59 deletions.
2 changes: 1 addition & 1 deletion dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def concat(args, ignore_index=False):
if len(args) < 2:
return args[0]

return manager.proxify(dd_concat(args, ignore_index=ignore_index))
return manager.proxify(dd_concat(args, ignore_index=ignore_index))[0]

else:
concat = dd_concat
Expand Down
60 changes: 35 additions & 25 deletions dask_cuda/proxify_device_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,49 @@
import pydoc
from collections import defaultdict
from functools import partial
from typing import List, MutableMapping, TypeVar
from typing import List, MutableMapping, Optional, Tuple, TypeVar

import dask
from dask.utils import Dispatch

from .proxy_object import ProxyObject, asproxy

dispatch = Dispatch(name="proxify_device_objects")
ignore_types = None
incompatible_types: Optional[Tuple[type]] = None

T = TypeVar("T")


def _register_ignore_types():
"""Lazy register types that shouldn't be proxified
def _register_incompatible_types():
"""Lazy register types that ProxifyHostFile should unproxify on retrieval.
It reads the config key "jit-unspill-ignore" (DASK_JIT_UNSPILL_IGNORE),
which should be a comma seperated list of types to ignore. The default
value is:
DASK_JIT_UNSPILL_IGNORE="cupy.ndarray"
Notice, it is not possible to ignore types explicitly handled by this
module such as `cudf.DataFrame`, `cudf.Series`, and `cudf.Index`.
It reads the config key "jit-unspill-incompatible"
(DASK_JIT_UNSPILL_INCOMPATIBLE), which should be a comma seperated
list of types. The default value is:
DASK_JIT_UNSPILL_INCOMPATIBLE="cupy.ndarray"
"""
global ignore_types
if ignore_types is not None:
global incompatible_types
if incompatible_types is not None:
return # Only register once
else:
ignore_types = ()
incompatible_types = ()

ignores = dask.config.get("jit-unspill-ignore", "cupy.ndarray")
ignores = ignores.split(",")
incompatibles = dask.config.get("jit-unspill-incompatible", "cupy.ndarray")
incompatibles = incompatibles.split(",")

toplevels = defaultdict(set)
for path in ignores:
for path in incompatibles:
if path:
toplevel = path.split(".", maxsplit=1)[0].strip()
toplevels[toplevel].add(path.strip())

for toplevel, ignores in toplevels.items():

def f(paths):
global ignore_types
ignore_types = ignore_types + tuple(pydoc.locate(p) for p in paths)
global incompatible_types
incompatible_types = incompatible_types + tuple(
pydoc.locate(p) for p in paths
)

dispatch.register_lazy(toplevel, partial(f, ignores))

Expand Down Expand Up @@ -86,7 +85,7 @@ def proxify_device_objects(
ret: Any
A copy of `obj` where all CUDA device objects are wrapped in ProxyObject
"""
_register_ignore_types()
_register_incompatible_types()

if proxied_id_to_proxy is None:
proxied_id_to_proxy = {}
Expand All @@ -98,7 +97,9 @@ def proxify_device_objects(
return ret


def unproxify_device_objects(obj: T, skip_explicit_proxies: bool = False) -> T:
def unproxify_device_objects(
obj: T, skip_explicit_proxies: bool = False, only_incompatible_types: bool = False
) -> T:
""" Unproxify device objects
Search through `obj` and un-wraps all CUDA device objects.
Expand All @@ -109,6 +110,9 @@ def unproxify_device_objects(obj: T, skip_explicit_proxies: bool = False) -> T:
Object to search through or unproxify.
skip_explicit_proxies: bool
When True, skipping proxy objects marked as explicit proxies.
only_incompatible_types: bool
When True, ONLY unproxify incompatible type. The skip_explicit_proxies
argument is ignored.
Returns
-------
Expand All @@ -117,16 +121,22 @@ def unproxify_device_objects(obj: T, skip_explicit_proxies: bool = False) -> T:
"""
if isinstance(obj, dict):
return {
k: unproxify_device_objects(v, skip_explicit_proxies)
k: unproxify_device_objects(
v, skip_explicit_proxies, only_incompatible_types
)
for k, v in obj.items()
} # type: ignore
if isinstance(obj, (list, tuple, set, frozenset)):
return obj.__class__(
unproxify_device_objects(i, skip_explicit_proxies) for i in obj
unproxify_device_objects(i, skip_explicit_proxies, only_incompatible_types)
for i in obj
) # type: ignore
if isinstance(obj, ProxyObject):
pxy = obj._pxy_get(copy=True)
if not skip_explicit_proxies or not pxy.explicit_proxy:
if only_incompatible_types:
if incompatible_types and isinstance(obj, incompatible_types):
obj = obj._pxy_deserialize(maybe_evict=False, proxy_detail=pxy)
elif not skip_explicit_proxies or not pxy.explicit_proxy:
pxy.explicit_proxy = False
obj = obj._pxy_deserialize(maybe_evict=False, proxy_detail=pxy)
return obj
Expand Down Expand Up @@ -178,7 +188,7 @@ def proxify(obj, proxied_id_to_proxy, found_proxies, subclass=None):
def proxify_device_object_default(
obj, proxied_id_to_proxy, found_proxies, excl_proxies
):
if hasattr(obj, "__cuda_array_interface__") and not isinstance(obj, ignore_types):
if hasattr(obj, "__cuda_array_interface__"):
return proxify(obj, proxied_id_to_proxy, found_proxies)
return obj

Expand Down
31 changes: 23 additions & 8 deletions dask_cuda/proxify_host_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
serialize_and_split,
)

from dask_cuda.disk_io import disk_read, disk_write
from dask_cuda.get_device_memory_objects import DeviceMemoryId, get_device_memory_ids

from . import proxify_device_objects as pdo
from .disk_io import disk_read, disk_write
from .get_device_memory_objects import DeviceMemoryId, get_device_memory_ids
from .proxify_device_objects import proxify_device_objects, unproxify_device_objects
from .proxy_object import ProxyObject

Expand Down Expand Up @@ -319,8 +319,13 @@ def validate(self):
header, _ = pxy.obj
assert header["serializer"] == pxy.serializer

def proxify(self, obj: T) -> T:
"""Proxify `obj` and add found proxies to the Proxies collections"""
def proxify(self, obj: T) -> Tuple[T, bool]:
"""Proxify `obj` and add found proxies to the `Proxies` collections
Returns the proxified object and a boolean, which is `True` when one or
more incompatible-types were found.
"""
incompatible_type_found = False
with self.lock:
found_proxies: List[ProxyObject] = []
# In order detect already proxied object, proxify_device_objects()
Expand All @@ -336,8 +341,10 @@ def proxify(self, obj: T) -> T:
if not self.contains(id(p)):
pxy.manager = self
self.add(proxy=p, serializer=pxy.serializer)
if pdo.incompatible_types and isinstance(p, pdo.incompatible_types):
incompatible_type_found = True
self.maybe_evict()
return ret
return ret, incompatible_type_found

def evict(
self,
Expand Down Expand Up @@ -495,7 +502,10 @@ def __init__(
spill_on_demand: bool = None,
gds_spilling: bool = None,
):
self.store: Dict[Hashable, Any] = {}
# each value of self.store is a tuple containing the proxified
# object, as well as a boolean indicating whether any
# incompatible types were found when proxifying it
self.store: Dict[Hashable, Tuple[Any, bool]] = {}
self.manager = ProxyManager(device_memory_limit, memory_limit)
self.register_disk_spilling(local_directory, shared_filesystem, gds_spilling)
if compatibility_mode is None:
Expand Down Expand Up @@ -608,10 +618,15 @@ def __setitem__(self, key, value):

def __getitem__(self, key):
with self.lock:
ret = self.store[key]
ret, incompatible_type_found = self.store[key]
if self.compatibility_mode:
ret = unproxify_device_objects(ret, skip_explicit_proxies=True)
self.manager.maybe_evict()
elif incompatible_type_found:
# Notice, we only call `unproxify_device_objects()` when `key`
# contains incompatible types.
ret = unproxify_device_objects(ret, only_incompatible_types=True)
self.manager.maybe_evict()
return ret

def __delitem__(self, key):
Expand Down
46 changes: 21 additions & 25 deletions dask_cuda/tests/test_proxify_host_file.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from typing import Iterable
from unittest.mock import patch

import numpy as np
import pytest
Expand All @@ -25,17 +26,17 @@
one_item_array = lambda: cupy.arange(1)
one_item_nbytes = one_item_array().nbytes

# While testing we want to proxify `cupy.ndarray` even though
# it is on the ignore_type list by default.
# While testing we don't want to unproxify `cupy.ndarray` even though
# it is on the incompatible_types list by default.
dask_cuda.proxify_device_objects.dispatch.dispatch(cupy.ndarray)
dask_cuda.proxify_device_objects.ignore_types = ()
dask_cuda.proxify_device_objects.incompatible_types = () # type: ignore


def is_proxies_equal(p1: Iterable[ProxyObject], p2: Iterable[ProxyObject]):
"""Check that two collections of proxies contains the same proxies (unordered)
In order to avoid deserializing proxy objects when comparing them,
this funcntion compares object IDs.
this function compares object IDs.
"""

ids1 = sorted([id(p) for p in p1])
Expand Down Expand Up @@ -294,7 +295,8 @@ def test_externals():
dhf = ProxifyHostFile(device_memory_limit=one_item_nbytes, memory_limit=1000)
dhf["k1"] = one_item_array()
k1 = dhf["k1"]
k2 = dhf.manager.proxify(one_item_array())
k2, incompatible_type_found = dhf.manager.proxify(one_item_array())
assert not incompatible_type_found
# `k2` isn't part of the store but still triggers spilling of `k1`
assert len(dhf) == 1
assert k1._pxy_get().is_serialized()
Expand Down Expand Up @@ -324,29 +326,23 @@ def test_externals():
assert dhf.manager._dev._mem_usage == 0


def test_proxify_device_objects_of_cupy_array():
"""Check that a proxied array behaves as a regular cupy array
@patch("dask_cuda.proxify_device_objects.incompatible_types", (cupy.ndarray,))
def test_incompatible_types():
"""Check that ProxifyHostFile unproxifies `cupy.ndarray` on retrieval
Notice, in this test we add `cupy.ndarray` to the ignore_types temporarily.
Notice, in this test we add `cupy.ndarray` to the incompatible_types temporarily.
"""
cupy = pytest.importorskip("cupy")
dask_cuda.proxify_device_objects.ignore_types = (cupy.ndarray,)
try:
# Make sure that equality works, which we use to test the other operators
org = cupy.arange(9).reshape((3, 3)) + 1
pxy = dask_cuda.proxify_device_objects.proxify_device_objects(
org.copy(), {}, []
)
assert (org == pxy).all()
assert (org + 1 != pxy).all()

for op in [cupy.dot]:
res = op(org, org)
assert (op(pxy, pxy) == res).all()
assert (op(org, pxy) == res).all()
assert (op(pxy, org) == res).all()
finally:
dask_cuda.proxify_device_objects.ignore_types = ()
cudf = pytest.importorskip("cudf")
dhf = ProxifyHostFile(device_memory_limit=100, memory_limit=100)

# We expect `dhf` to unproxify `a1` (but not `a2`) on retrieval
a1, a2 = (cupy.arange(9), cudf.Series([1, 2, 3]))
dhf["a"] = (a1, a2)
b1, b2 = dhf["a"]
assert a1 is b1
assert isinstance(b2, ProxyObject)
assert a2 is unproxy(b2)


@pytest.mark.parametrize("npartitions", [1, 2, 3])
Expand Down

0 comments on commit 98686bd

Please sign in to comment.