dask-image imread v0.5.0 not working with dask distributed Client & napari #194

Closed
GenevieveBuckley opened this issue Feb 25, 2021 · 61 comments

@GenevieveBuckley
Collaborator

GenevieveBuckley commented Feb 25, 2021

@kpasko made a bug report napari/napari#2304 but it turns out this is a problem caused by dask-image. I've copied the contents of the report into this issue (sadly I'm unable to transfer issues between different organisations).

What happened:

TypeError: can not serialize 'function' object

In distributed/client.py line 2635
futures = self._graph_to_futures

What you expected to happen:

successful image viewing

Minimal Complete Verifiable Example:

(Edited)

from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('./*.tif')
napari.view_image(data)

Anything else we need to know?:
Works fine when not initializing client, i.e.

from dask.distributed import Client
import napari
from dask_image.imread import imread

data = imread('./*.tif')
napari.view_image(data)

works as expected

Environment:

  • Napari/Dask version:
    dask 2021.2.0 pyhd8ed1ab_0 conda-forge
    dask-core 2021.2.0 pyhd8ed1ab_0 conda-forge
    dask-image 0.5.0 pyh44b312d_0 conda-forge
    distributed 2021.2.0 py39h6e9494a_0 conda-forge
    napari 0.4.5 pyhd8ed1ab_0 conda-forge
    napari-console 0.0.3 pyhd8ed1ab_0 conda-forge
    napari-plugin-engine 0.1.9 py39h6e9494a_1 conda-forge
    napari-svg 0.1.4 py_0 conda-forge

  • Python version:
    python 3.9.2 h2502468_0_cpython conda-forge

  • Operating System: OS X 11.2.1

  • Install method (conda, pip, source): conda

@GenevieveBuckley
Collaborator Author

Talley says:

interesting. This does appear to be at least partly related to the interplay between napari and dask-image itself... If I directly pass a dask array, I get no error:

from dask.distributed import Client
import napari
import dask.array as da

client = Client()
data = da.ones((64,64))
napari.view_image(data)

also, if I cast the dask-image array to numpy prior to passing it to napari, it also seems to work:

from dask.distributed import Client
import napari
from dask_image.imread import imread
import numpy as np

client = Client()
data = imread('./myFiles/*.tif')
napari.view_image(np.array(data))

@GenevieveBuckley
Collaborator Author

From @kpasko

here's the full trace

[screenshot of the full traceback]

@GenevieveBuckley
Collaborator Author

@jakirkham adds this:

Does this happen with an older version of Dask-Image? I think there were some changes to imread in the recent release

In particular, it would be good to test 0.4.0 to see if this works there, as 0.5.0 started using da.map_blocks ( #165 )

And from @GenevieveBuckley there's this:

Can confirm, this problem was introduced with dask-image version 0.5.0

You can use either dask-image version 0.4.0, or dask.array.image.imread() and not see the same problem.

from dask.distributed import Client
import napari
from dask.array.image import imread

client = Client()
data = imread('./myFiles/*.tif')
napari.view_image(data)

@GenevieveBuckley
Collaborator Author

cc @jrbourbeau (if you still want some example data, you can download some here, but honestly you can use any single image to try this out - .png, .jpg, .tiff, don't think it matters).

I wonder if #182 might be the issue

One problem with the current implementation that I noticed is that when calling dask_image.imread.imread with a file pattern such as im_*.tif, for each tile that is loaded pims.open is called on the entire file pattern. This then leads to many unnecessary instantiations of pims.ImageSequenceNDs producing a large overhead.

In this proposed fix I use glob to match filenames and frames to call pims.open only on the files that are actually being loaded.

I did not consider what might happen with this in a distributed context (oops). So that's probably a good first place to look.

cc @m-albert
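
For reference, below is a minimal sketch of the glob-based matching described in the quoted PR. It's a hypothetical helper (not the actual dask-image code), assuming a pattern like im_*.tif:

import glob

def files_for_block(pattern, start, stop):
    # Resolve the file pattern once, so each block only sees the files it needs
    filenames = sorted(glob.glob(pattern))  # pims sorts matches the same way
    return filenames[start:stop]

# e.g. files_for_block('im_*.tif', 0, 4) returns the first four matching files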

@jakirkham
Member

Would just add it’s possible the issue tracks back to Dask. Though I don’t think we’ve isolated it yet. It might be worth trying to reproduce using just da.map_blocks (assuming we’ve narrowed it down to that change in dask-image).
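
For concreteness, here is a sketch of the kind of map_blocks-only reproduction being suggested, mimicking what dask-image 0.5.0's imread does internally (a function that only consumes block_info); the names here are made up for illustration:

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client()

def read_block(block_info=None):
    # stand-in for dask-image's frame reader; it only uses the block metadata
    i, j = block_info[None]['array-location'][0]
    return np.zeros((j - i, 64, 64), dtype=np.uint8)

arr = da.map_blocks(read_block, chunks=((1,) * 4, (64,), (64,)), dtype=np.uint8)
arr.compute()  # passes with default graph optimization; the napari path turns out to differ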

@jni
Contributor

jni commented Feb 25, 2021

This is all very fascinating. 👀

@jni
Contributor

jni commented Feb 25, 2021

So, the plot thickens... I can't reproduce. @GenevieveBuckley what is your cloudpickle version? I just installed it at 1.6 and I was able to run your example without issues. (though it should be from dask_image.imread import imread rather than import dask_image.imread as imread, maybe fix the MRE at the top? I also recommend removing the myFiles from the path since it's trivial to run the example in a directory with some tifs, less so to have that directory structure.)

from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('./*.tif')
napari.view_image(data)
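
When comparing environments like this, it helps to print the exact versions in play; a minimal sketch using only the standard library (Python 3.8+):

from importlib.metadata import version

for pkg in ("dask", "distributed", "dask-image", "cloudpickle", "msgpack", "napari"):
    print(pkg, version(pkg))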

@GenevieveBuckley
Collaborator Author

Ok, I will edit the example up top.

I also think there's probably no need to involve napari in the example at all; it's entirely possible that calling data.compute() will trigger the same problem. (Haven't checked this yet.)

@jni
Contributor

jni commented Feb 26, 2021

I also think there's probably no need to involve napari in the example at all; it's entirely possible that calling data.compute() will trigger the same problem. (Haven't checked this yet.)

Well, @tlambert03's example (using np.asarray) would seem to disprove that... But I haven't checked it locally.

@GenevieveBuckley
Collaborator Author

I also think there's probably no need to involve napari in the example at all; it's entirely possible that calling data.compute() will trigger the same problem. (Haven't checked this yet.)

Well, @tlambert03's example (using np.asarray) would seem to disprove that... But I haven't checked it locally.

Oh yeah, I'd almost forgotten about that. It turns out it's even more interesting than I'd thought.

ipython --gui=qt

from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('*.tif')
napari.view_image(data)  # you get an error

data.compute()  # this works fine
napari.view_image(data)  # works fine now
Error message:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-9-4d014add55bf> in <module>
----> 1 napari.view_image(data)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/view_layers.py in view_image(data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale, title, ndisplay, order, axis_labels, show)
      7 and the ``Viewer.add_<layer_type>`` methods.  The final generated functions
      8 follow this pattern
----> 9 (where <layer_type> is replaced with one of the layer types):
     10 
     11     def view_<layer_type>(*args, **kwargs):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/components/viewer_model.py in add_image(self, data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
    674                         "did you mean to specify a 'channel_axis'? "
    675                     )
--> 676             layer = image_class(data, **kwargs)
    677             self.layers.append(layer)
    678 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in __init__(self, data, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
    274 
    275         # Trigger generation of view slice and thumbnail
--> 276         self._update_dims()
    277 
    278     def _new_empty_slice(self):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in _update_dims(self, event)
    528         self._ndim = ndim
    529 
--> 530         self.refresh()
    531         self._value = self.get_value(self.position, world=True)
    532 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in refresh(self, event)
    938         """Refresh all layer data based on current view slice."""
    939         if self.visible:
--> 940             self.set_view_slice()
    941             self.events.set_data()
    942             self._update_thumbnail()

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in set_view_slice(self)
    798     def set_view_slice(self):
    799         with self.dask_optimized_slicing():
--> 800             self._set_view_slice()
    801 
    802     @abstractmethod

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _set_view_slice(self)
    611         # Load our images, might be sync or async.
    612         data = SliceDataClass(self, image_indices, image, thumbnail_source)
--> 613         self._load_slice(data)
    614 
    615     def _load_slice(self, data: SliceDataClass):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _load_slice(self, data)
    620         data : Slice
    621         """
--> 622         if self._slice.load(data):
    623             # The load was synchronous.
    624             self._on_data_loaded(data, sync=True)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice.py in load(self, data)
    117         """
    118         self.loaded = False  # False until self._on_loaded is calls
--> 119         return self.loader.load(data)
    120 
    121     def on_loaded(self, data: ImageSliceData) -> bool:

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_loader.py in load(self, data)
     20             True if load happened synchronously.
     21         """
---> 22         data.load_sync()
     23         return True
     24 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice_data.py in load_sync(self)
     38     def load_sync(self) -> None:
     39         """Call asarray on our images to load them."""
---> 40         self.image = np.asarray(self.image)
     41 
     42         if self.thumbnail_source is not None:

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1446 
   1447     def __array__(self, dtype=None, **kwargs):
-> 1448         x = self.compute()
   1449         if dtype and x.dtype != dtype:
   1450             x = x.astype(dtype)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    279         dask.base.compute
    280         """
--> 281         (result,) = compute(self, traverse=False, **kwargs)
    282         return result
    283 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    561         postcomputes.append(x.__dask_postcompute__())
    562 
--> 563     results = schedule(dsk, keys, **kwargs)
    564     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    565 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2633         Client.compute : Compute asynchronous collections
   2634         """
-> 2635         futures = self._graph_to_futures(
   2636             dsk,
   2637             keys=set(flatten([keys])),

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2541                 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
   2542 
-> 2543             dsk = highlevelgraph_pack(dsk, self, keyset)
   2544 
   2545             annotations = {}

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/highlevelgraph.py in highlevelgraph_pack(hlg, client, client_keys)
    122             }
    123         )
--> 124     return dumps_msgpack({"layers": layers})
    125 
    126 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    182     """
    183     header = {}
--> 184     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
    185 
    186     fmt, payload = maybe_compress(payload, compression=compression)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

TypeError: can not serialize 'function' object

@GenevieveBuckley
Collaborator Author

GenevieveBuckley commented Feb 26, 2021

Here's the conda list of my environment:

# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       1_gnu    conda-forge
alabaster                 0.7.12                   pypi_0    pypi
appdirs                   1.4.4                    pypi_0    pypi
babel                     2.9.0                    pypi_0    pypi
backcall                  0.2.0              pyh9f0ad1d_0    conda-forge
backports                 1.0                        py_2    conda-forge
backports.functools_lru_cache 1.6.1                      py_0    conda-forge
ca-certificates           2020.12.5            ha878542_0    conda-forge
cachey                    0.2.1                    pypi_0    pypi
certifi                   2020.12.5        py38h578d9bd_1    conda-forge
chardet                   4.0.0                    pypi_0    pypi
click                     7.1.2                    pypi_0    pypi
cloudpickle               1.6.0                    pypi_0    pypi
cycler                    0.10.0                   pypi_0    pypi
dask                      2021.2.0                 pypi_0    pypi
dask-image                0.5.0                    pypi_0    pypi
decorator                 4.4.2                      py_0    conda-forge
distributed               2021.2.0                 pypi_0    pypi
docstring-parser          0.7.3                    pypi_0    pypi
docutils                  0.16                     pypi_0    pypi
freetype-py               2.2.0                    pypi_0    pypi
heapdict                  1.0.1                    pypi_0    pypi
idna                      2.10                     pypi_0    pypi
imageio                   2.9.0                    pypi_0    pypi
imagesize                 1.2.0                    pypi_0    pypi
ipykernel                 5.5.0                    pypi_0    pypi
ipython                   7.20.0           py38h81c977d_2    conda-forge
ipython_genutils          0.2.0                      py_1    conda-forge
jedi                      0.18.0           py38h578d9bd_2    conda-forge
jinja2                    2.11.3                   pypi_0    pypi
jupyter-client            6.1.11                   pypi_0    pypi
jupyter-core              4.7.1                    pypi_0    pypi
kiwisolver                1.3.1                    pypi_0    pypi
ld_impl_linux-64          2.35.1               hea4e1c9_2    conda-forge
libffi                    3.3                  h58526e2_2    conda-forge
libgcc-ng                 9.3.0               h2828fa1_18    conda-forge
libgomp                   9.3.0               h2828fa1_18    conda-forge
libstdcxx-ng              9.3.0               h6de172a_18    conda-forge
magicgui                  0.2.6                    pypi_0    pypi
markupsafe                1.1.1                    pypi_0    pypi
matplotlib                3.3.4                    pypi_0    pypi
msgpack                   1.0.2                    pypi_0    pypi
napari                    0.4.5                    pypi_0    pypi
napari-console            0.0.3                    pypi_0    pypi
napari-plugin-engine      0.1.9                    pypi_0    pypi
napari-svg                0.1.4                    pypi_0    pypi
ncurses                   6.2                  h58526e2_4    conda-forge
networkx                  2.5                      pypi_0    pypi
numpy                     1.20.1                   pypi_0    pypi
numpydoc                  1.1.0                    pypi_0    pypi
openssl                   1.1.1j               h7f98852_0    conda-forge
packaging                 20.9                     pypi_0    pypi
parso                     0.8.1              pyhd8ed1ab_0    conda-forge
pexpect                   4.8.0              pyh9f0ad1d_2    conda-forge
pickleshare               0.7.5                   py_1003    conda-forge
pillow                    8.1.0                    pypi_0    pypi
pims                      0.5                      pypi_0    pypi
pip                       21.0.1             pyhd8ed1ab_0    conda-forge
prompt-toolkit            3.0.16             pyha770c72_0    conda-forge
psutil                    5.8.0                    pypi_0    pypi
ptyprocess                0.7.0              pyhd3deb0d_0    conda-forge
pydantic                  1.7.3                    pypi_0    pypi
pygments                  2.8.0              pyhd8ed1ab_0    conda-forge
pyopengl                  3.1.5                    pypi_0    pypi
pyparsing                 2.4.7                    pypi_0    pypi
pyqt5                     5.15.3                   pypi_0    pypi
pyqt5-qt                  5.15.2                   pypi_0    pypi
pyqt5-sip                 12.8.1                   pypi_0    pypi
python                    3.8.8           hffdb5ce_0_cpython    conda-forge
python-dateutil           2.8.1                    pypi_0    pypi
python_abi                3.8                      1_cp38    conda-forge
pytz                      2021.1                   pypi_0    pypi
pywavelets                1.1.1                    pypi_0    pypi
pyyaml                    5.4.1                    pypi_0    pypi
pyzmq                     22.0.3                   pypi_0    pypi
qtconsole                 5.0.2                    pypi_0    pypi
qtpy                      1.9.0                    pypi_0    pypi
readline                  8.0                  he28a2e2_2    conda-forge
requests                  2.25.1                   pypi_0    pypi
scikit-image              0.18.1                   pypi_0    pypi
scipy                     1.6.1                    pypi_0    pypi
setuptools                49.6.0           py38h578d9bd_3    conda-forge
six                       1.15.0                   pypi_0    pypi
slicerator                1.0.0                    pypi_0    pypi
snowballstemmer           2.1.0                    pypi_0    pypi
sortedcontainers          2.3.0                    pypi_0    pypi
sphinx                    3.5.1                    pypi_0    pypi
sphinxcontrib-applehelp   1.0.2                    pypi_0    pypi
sphinxcontrib-devhelp     1.0.2                    pypi_0    pypi
sphinxcontrib-htmlhelp    1.0.3                    pypi_0    pypi
sphinxcontrib-jsmath      1.0.1                    pypi_0    pypi
sphinxcontrib-qthelp      1.0.3                    pypi_0    pypi
sphinxcontrib-serializinghtml 1.1.4                    pypi_0    pypi
sqlite                    3.34.0               h74cdb3f_0    conda-forge
tblib                     1.7.0                    pypi_0    pypi
tifffile                  2021.2.1                 pypi_0    pypi
tk                        8.6.10               h21135ba_1    conda-forge
toolz                     0.11.1                   pypi_0    pypi
tornado                   6.1                      pypi_0    pypi
traitlets                 5.0.5                      py_0    conda-forge
typing-extensions         3.7.4.3                  pypi_0    pypi
urllib3                   1.26.3                   pypi_0    pypi
vispy                     0.6.6                    pypi_0    pypi
wcwidth                   0.2.5              pyh9f0ad1d_2    conda-forge
wheel                     0.36.2             pyhd3deb0d_0    conda-forge
wrapt                     1.12.1                   pypi_0    pypi
xz                        5.2.5                h516909a_1    conda-forge
zict                      2.0.0                    pypi_0    pypi
zlib                      1.2.11            h516909a_1010    conda-forge

I made the env like this:

conda create -n daskimage-issue194 python=3.8 pip ipython
conda activate daskimage-issue194
pip install napari[all]
pip install dask-image==0.5.0
pip install dask[distributed]

@jakirkham
Member

Just to clarify should this...

data.compute()  # this works fine

...be this?

data = data.compute()  # this works fine

Or does it occur without that assignment?

@GenevieveBuckley
Collaborator Author

GenevieveBuckley commented Feb 26, 2021

Just to clarify should this...

data.compute()  # this works fine

...be this?

data = data.compute()  # this works fine

Or does it occur without that assignment?

No, that's the crazy thing - I did not assign the result back to data.

It seems that any kind of call that results in the dask array being computed (I tried this with np.array(data) and data.compute()) somehow magically fixes things with napari the next time we look at the dask array (even though it should just be computing the task graph again).

@jakirkham
Member

I was afraid you might say that 😅

Is Napari doing some kind of caching these days?

@GenevieveBuckley
Collaborator Author

Critically, dask.array.image.imread() does not show the same problem - https://github.com/dask/dask/blob/master/dask/array/image.py

Would we lose much functionality if that became the canonical way to open images? Perhaps we could adapt it to use pims instead of skimage by default.

This one wasn't on the list of imread functions profiled here. I'd say profiling the performance here might be a good next step. If it performs decently, maybe we move in that direction.
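
As a sketch of what adapting it might look like: dask.array.image.imread accepts an imread= callable, so a different reader can be swapped in without changing the rest of the pipeline (tifffile is just an illustrative choice here, assuming .tif inputs):

from dask.array.image import imread as da_imread
import tifffile

# pass a custom reader instead of the default skimage.io.imread
data = da_imread('./*.tif', imread=tifffile.imread)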

@jakirkham
Member

Well I still suspect there is an underlying Dask issue that needs to be fixed. The error that we are seeing has to do with the serialization of the Dask task graph, which has seen changes in recent releases. I think if we can narrow it down to something using da.map_blocks this will be easier to demonstrate as a Dask issue and we can get the right people to look at that

@jni
Contributor

jni commented Feb 26, 2021

@jakirkham yes napari sets the dask cache to 1/10th of RAM on import. Yes I know it's bad, I've tried to argue to change that. 😅

@jakirkham
Member

jakirkham commented Feb 26, 2021

Ah ok. So the call to __array__ (which is what the view_image call does) after .compute() is just retrieving the cached NumPy array previously computed. That makes more sense
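
For anyone following along, the cache in question is dask's opportunistic cache. A minimal sketch of the mechanism (the 2 GB size here is made up; napari picks its own fraction of RAM):

import dask.array as da
from dask.cache import Cache

cache = Cache(2e9)  # hypothetical 2 GB cache
cache.register()    # active for all computations from here on

x = da.ones((1000, 1000), chunks=(100, 100)).sum()
x.compute()  # the first compute populates the cache
x.compute()  # repeated keys can now be served from the cache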

@jni
Contributor

jni commented Feb 26, 2021

idea: I was trying to figure out why the view_image call might be different from the np.asarray() call: view_image slices then computes, while np.asarray() computes then slices. @GenevieveBuckley can you try np.asarray(data[0]), no napari?
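
Spelled out, the suggested check looks something like this (no napari; assumes some .tif files in the working directory):

import numpy as np
from dask.distributed import Client
from dask_image.imread import imread

client = Client()
data = imread('./*.tif')
np.asarray(data[0])  # slice first, then compute, mirroring napari's access pattern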

@tlambert03

@jakirkham yes napari sets the dask cache to 1/10th of RAM on import. Yes I know it's bad, I've tried to argue to change that. 😅

Wait which part is bad? I don't remember any arguments about this (and is it really only 1/10 now?)

@m-albert
Collaborator

m-albert commented Feb 26, 2021

Hi guys, I found the following:

Actually, the latest commit #182 fixes the reported problem when using dask_image.imread together with napari. Namely, the PR slightly changed the way map_blocks is used.

Before: da.map_blocks(func...
After (works): x.map_blocks(func...

I checked this by changing the code from the last broken commit 5afde9a from

    a = dask.array.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )
...
def _map_read_frame(x, block_info=None, **kwargs):

into

    import numpy as np
    a0 = dask.array.from_array(np.ones((2, 5, 5)), chunks=(1, 5, 5))

    a = a0.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )
...
def _map_read_frame(x, block_info=None, **kwargs):

and confirming that the latter indeed resolves the issue (using two random input images).

So, dask_image and napari work together as expected in the current main, which however is one commit ahead of the 0.5 release.

Regarding the root of the problem, I think @jakirkham is right that there could be an underlying dask problem. According to the documentation of dask.array.core.map_blocks, it seems map_blocks is intended to be usable without an input array. However, I found the following problem:

import dask.array as da
import numpy as np

def func(block_info=None):
    return 1

da.map_blocks(func, chunks=((4, 4),), dtype=np.float_).compute()[0]

works as expected, however

da.map_blocks(func, chunks=((4, 4),), dtype=np.float_)[0].compute()

throws an error "TypeError: 'int' object is not subscriptable". This does not reproduce the "TypeError: can not serialize 'function' object" error occurring here, but I'd say it's clearly unintended behaviour. It also relates to @jni's comment that napari.view_image slices and then computes, as opposed to calling .compute() first, which @GenevieveBuckley found can avoid the error.

@jakirkham
Member

Thank you Marvin! 😄 Could you please file the Dask MRE as a Dask issue? 🙂 We can then follow up with people there

@m-albert
Collaborator

@jakirkham 😅 Sorry I just realised the dask part of my comment is completely wrong, as func just returns the wrong shape, which then leads to the "'int' object is not subscriptable" error.

In conclusion this doesn't hint to the underlying dask or distributed problem...
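
For completeness, a corrected version of that snippet: if func returns a block of the declared chunk size, slicing before computing works as expected (under the default scheduler):

import dask.array as da
import numpy as np

def func(block_info=None):
    return np.ones(4)  # matches the declared chunk size of 4

da.map_blocks(func, chunks=((4, 4),), dtype=np.float_)[0].compute()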

@jakirkham
Member

No worries 🙂 Yeah then it seems we still have more digging to do

@jni
Contributor

jni commented Feb 27, 2021

@tlambert03

Wait which part is bad? I don't remember any arguments about this

Well, it seems to me that modifying the behaviour of a popular third party library (dask) just by importing is undesirable. We see above that just importing napari altered the behaviour of dask during diagnostics. In general, imho there should be a visible function call to alter the dask caching behaviour.

At any rate, we discussed this in various group meetings, just never seriously. The basic issue I've run into is that the dask cache is good-ish when exploring, but bad when using "play", because you fill up the cache and then spend the whole time putting stuff into the cache queue and deleting it from the back of it.

(and is it really only 1/10 now?)

napari/napari#1666

@tlambert03

totally agree. let's change it!

@jni
Contributor

jni commented Mar 8, 2021

@GenevieveBuckley it shouldn't be that many commits to check which one did it? i.e. can you do a git bisect to find the commit that fixes things? (I would do this but as mentioned I couldn't reproduce this issue)

@GenevieveBuckley
Collaborator Author

@GenevieveBuckley it shouldn't be that many commits to check which one did it? i.e. can you do a git bisect to find the commit that fixes things? (I would do this but as mentioned I couldn't reproduce this issue)

Do we have any idea why you couldn't reproduce this? I'm pretty sure you're running the same as me, Ubuntu 20.04

@jni
Contributor

jni commented Mar 9, 2021

No, my laptop remains on 18.04. But anyway I doubt it's an OS issue. I tried various combinations of cloudpickle/dask/dask-image and couldn't get the error... I did not try the complete env at the top of this issue, might try that this arvo.

@m-albert
Collaborator

m-albert commented Mar 9, 2021

@jni

it shouldn't be that many commits to check which one did it? i.e. can you do a git bisect to find the commit that fixes things? (I would do this but as mentioned I couldn't reproduce this issue)

The breaking commit is 17ec4c2 which implements map_blocks for imread:

    ...
    a = dask.array.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

    return a


def _map_read_frame(block_info=None, **kwargs):

    i, j = block_info[None]['array-location'][0]

    return _utils._read_frame(i=slice(i, j), **kwargs)

After the latest commit 91fe6e1, the problem doesn't occur anymore. This commit changes the way map_blocks is implemented in imread:

    ...
    # place source filenames into dask array
    filenames = sorted(glob.glob(sfname))  # pims also does this
    if len(filenames) > 1:
        ar = dask.array.from_array(filenames, chunks=(nframes,))
        multiple_files = True
    else:
        ar = dask.array.from_array(filenames * shape[0], chunks=(nframes,))
        multiple_files = False

    # read in data using encoded filenames
    a = ar.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        multiple_files=multiple_files,
        new_axis=list(range(1, len(shape))),
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

    return a


def _map_read_frame(x, multiple_files, block_info=None, **kwargs):

    fn = x[0]  # get filename from input chunk

    if multiple_files:
        i, j = 0, 1
    else:
        i, j = block_info[None]['array-location'][0]

    return _utils._read_frame(fn=fn, i=slice(i, j), **kwargs)

These code snippets include all relevant changed lines. Interestingly, the problem seems to be related to the differences in the use of map_blocks. Namely, the problematic commit calls dask.array.map_blocks while the fixing commit uses x.map_blocks, where x is a dask array. I've tested that adding a dummy array in the problematic commit solves the problem:

    import dask.array as da
    import numpy as np
    x = da.from_array(np.zeros(shape, dtype=dtype),
                      chunks=dask.array.core.normalize_chunks((nframes,) + shape[1:], shape),)

    a = x.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

However, in my understanding, calling map_blocks as dask.array.map_blocks should be intended behaviour, since an example of this is included in the docstring of map_blocks. Also, using dask.array.map_blocks in a minimal example in combination with a distributed client and the napari viewer doesn't seem to break anything:

import dask.array as da
import numpy as np

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func():
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
napari.view_image(xm)

Something else to consider is the observation by @GenevieveBuckley that calling data.compute() circumvents the problem. So my guess at this point would be that there's a dask or distributed problem as @jakirkham suggested, potentially related to new graph functionality, potentially occurring when napari first slices and then computes, as @jni suggested.
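
One way to see the difference between the two call styles is to look at the HighLevelGraph layers each one produces. A small diagnostic sketch (not part of dask-image; the layer class names depend on the dask version):

import numpy as np
import dask.array as da

def func(x=None, block_info=None):
    return np.zeros((1, 5, 5))

chunks = ((1,) * 4, (5,), (5,))

a = da.map_blocks(func, chunks=chunks, dtype=float)  # module-level call, no input array
x = da.zeros((4, 5, 5), chunks=(1, 5, 5))
b = x.map_blocks(func, chunks=chunks, dtype=float)   # method call on an existing array

for name, arr in [("da.map_blocks", a), ("x.map_blocks", b)]:
    print(name, [type(layer).__name__ for layer in arr.dask.layers.values()])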

@m-albert
Collaborator

m-albert commented Mar 9, 2021

So my guess at this point would be that there's a dask or distributed problem as @jakirkham suggested, potentially related to new graph functionality

Okay, using dask-image 0.5.0 (in which the reported error occurs) fails when using the current versions of dask and dask.distributed.

However, when using pip install dask==2.30.0 distributed==2.30.0, the problem disappears! And it comes back when using the more recent pip install dask==2020.12.0 distributed==2020.12.0. So there must be something in dask and/or dask.distributed version 2020.12.0 that introduces this problem.

EDIT: 2021.03.01 still shows the problem.

See https://docs.dask.org/en/latest/changelog.html#id5 and https://distributed.dask.org/en/latest/changelog.html#id5.

It could be something related to the point

Introduced new HighLevelGraph layer objects including BasicLayer, Blockwise, BlockwiseIO, ShuffleLayer, and more.

as the traceback shows that this line fails:
dsk = highlevelgraph_pack(dsk, self, keyset)
and downstream
dumps_msgpack({"layers": layers})

@jakirkham
Member

Does it occur with 2021.03.0?

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Does it occur with 2021.03.0?

Yes it does.

@jakirkham
Member

Thanks Marvin! 😄

Sorry, I may have missed this: what is the reproducer now? Were we able to do this without Napari (like just calling .compute())?

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Sorry, I may have missed this: what is the reproducer now? Were we able to do this without Napari (like just calling .compute())?

Couldn't reproduce the issue with a different/simpler example than the one reported here yet...

@jakirkham
Member

Do we know what view_image is doing here? Maybe it's something particular like slicing, computing extents, or similar that is causing the issue

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Do we know what view_image is doing here? Maybe it's something particular like slicing, computing extents, or similar that is causing the issue

This line in napari/layers/image/_image_slice_data.py fails

     38     def load_sync(self) -> None:
     39         """Call asarray on our images to load them."""
---> 40         self.image = np.asarray(self.image)

Strangely, when rerunning it in the interactive debugger it works fine.

@jakirkham
Member

Yeah np.asarray(...) is the same as calling .compute()

@jakirkham
Member

Though what is interesting is this comes from _image_slice_data. Do we know how the data was sliced? Was there anything else done to it? That may help us uncover the missing step that triggers the bug

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Though what is interesting is this comes from _image_slice_data. Do we know how the data was sliced? Was there anything else done to it? That may help us uncover the missing step that triggers the bug

Interestingly, if I run %debug after the error occurs, self.image.compute() in napari/layers/image/_image_slice_data.py gives a result.

However, if I set a pdb.set_trace() before self.image = np.asarray(self.image) in napari/layers/image/_image_slice_data.py, self.image.compute() fails with the known error. Wonder whether this might be related to the interplay between dask and pdb

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Or that might hint towards the state of the dask array being changed at some point within the first failed execution of compute, in such a way that the second time succeeds... (?). Okay I'll leave it there 😅

@jakirkham
Member

Interesting. Thanks for exploring this! 😄

Are you able to see what the Dask task graph looks like before calling .compute()? Maybe that will give some more clues

Agree there is something magical going on here that we don't fully comprehend yet

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Btw, I found another clue. This reproduces the problem without dask-image, but using napari.

%gui qt
from dask.distributed import Client
import napari
from dask_image.imread import imread
client = Client()

import dask.array as da
import numpy as np

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func(block_info=None):
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)

Now, replacing

def func(block_info=None):

by

def func():

makes the error disappear.

As if dask couldn't handle the provided function properly when it has a block_info argument but no dask array is being passed in.
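
That fits with how map_blocks treats block_info: dask inspects the mapped function's signature and only wires in the per-block metadata machinery when a block_info parameter is present, and that extra graph state is what appears to trip up serialization here. The signature check itself is easy to mimic with the standard library:

import inspect

def func_with(block_info=None):
    return None

def func_without():
    return None

for f in (func_with, func_without):
    wants_block_info = "block_info" in inspect.signature(f).parameters
    print(f.__name__, "expects block_info:", wants_block_info)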

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Are you able to see what the Dask task graph looks like before calling .compute()?

In this failing line dumps_msgpack({"layers": layers}) it seems the entire layers object is packable apart from layers[1]['state']['indices'] which contains func from the example.

ipdb> p layers[1]['state']['indices']
[<function func at 0x150d21a60>, None, (<class 'tuple'>, ['block_info']), None, 'block-info-func-b78f8575db855a3a3f1010f0ef59e206', ('.0', '.1', '.2')]

@jakirkham
Member

We did make some changes to how MsgPack is used in PR ( dask/distributed#4531 ), which was merged after the recent release. Curious if that still encounters the issue as well

@m-albert
Collaborator

m-albert commented Mar 9, 2021

Okay I narrowed it down to (without napari):

import dask.array as da
import numpy as np

from dask.distributed import Client
import napari
client = Client()

xn = np.random.randint(0, 100, (2, 4, 4))
xd = da.from_array(xn, chunks=(1, 2, 2))

# fails
# def func(block_info=None):
#     return np.random.randint(0, 100, (1, 2, 2))

# works
def func():
    return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)

from dask.core import flatten
keyset = set(flatten(xm.__dask_keys__()))
xm.dask.__dask_distributed_pack__(client, keyset)

Same error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-93-163cc014c931> in <module>
     21 from dask.core import flatten
     22 keyset = set(flatten(xm.__dask_keys__()))
---> 23 xm.dask.__dask_distributed_pack__(client, keyset)

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys)
    942                 }
    943             )
--> 944         return dumps_msgpack({"layers": layers})
    945 
    946     @staticmethod

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    161     """
    162     header = {}
--> 163     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
    164 
    165     fmt, payload = maybe_compress(payload, compression=compression)

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

TypeError: can not serialize 'function' object

@jakirkham
Member

Well done! 😄 👏

Can you please file this as a new issue on Distributed ( https://github.com/dask/distributed/issues )?

@jni
Contributor

jni commented Mar 10, 2021

Wow, this is the kind of sleuthing that one loves to wake up to! 😂 👏 👏 Awesome work @m-albert!

I should have remembered and mentioned one more probably-relevant detail, which is that napari turns off dask task fusion when slicing dask arrays. That might account for why it was so hard to reproduce without napari. Sorry, I only just remembered that!

@jakirkham
Member

napari turns off dask task fusion when slicing dask arrays

Yep that was the missing piece 😄

Here's another repro ( dask/distributed#4574 (comment) ). With fusion on, things work. With fusion off, it fails!
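
A sketch of toggling fusion directly, without napari, on top of the narrowed-down reproducer above (the optimization.fuse.active config key is what napari effectively flips; this is expected to fail only on affected dask/distributed versions):

import dask
import dask.array as da
import numpy as np
from dask.distributed import Client

client = Client()

def func(block_info=None):
    return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=((1, 1), (2, 2), (2, 2)), dtype=int)

with dask.config.set({"optimization.fuse.active": False}):
    xm[0].compute()  # raises "can not serialize 'function' object" on affected versions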

@m-albert
Collaborator

I should have remembered and mentioned one more probably-relevant detail, which is that napari turns off dask task fusion when slicing dask arrays. That might account for why it was so hard to reproduce without napari. Sorry, I only just remembered that!

Nice, so something goes wrong when trying to pack unfused graphs. For some reason, choosing a mapping function def func(x, block_info=None) instead of def func(block_info=None) circumvents this issue in the latest commit of dask_image. Looking forward to seeing whether dask/dask#7353 will solve the issue!

@jakirkham
Member

Does PR ( dask/dask#7353 ) solve the issue? We just merged it fwiw

@jrbourbeau
Member

I tested the changes in dask/dask#7353 against the example snippet posted in dask/distributed#4574 and the issue was resolved (a corresponding test was also added). It'd be great if someone could double check with the original napari workflow

@m-albert
Collaborator

@jakirkham @jrbourbeau
I can confirm that dask/dask#7353 fixes all problems discussed in this issue and in particular the original dask-image-napari workflow 🎉

@GenevieveBuckley
Collaborator Author

Really nice work here @m-albert
