Rechunker 0.3.3 is incompatible with Dask 2022.01.1 and later #110

Closed
tomwhite opened this issue Mar 3, 2022 · 7 comments · Fixed by #112

Comments

@tomwhite (Collaborator) commented Mar 3, 2022

Running the rechunker 0.3.3 unit tests with Dask 2022.2.0 gives many failures like the one below.

The failures seem to have been introduced by an HLG (HighLevelGraph) change in Dask, first released in Dask 2022.01.1.

This is a problem because we are stuck on rechunker 0.3.3 due to #92 (see also https://github.com/pystatgen/sgkit/issues/820).

________________________________________________ test_rechunk_dataset[mapper.temp.zarr-mapper.target.zarr-dask-10MB-target_chunks1-source_chunks0-shape0] ________________________________________________

tmp_path = PosixPath('/private/var/folders/cj/wyp4zgw17vj4m9qdmddvmcc80000gn/T/pytest-of-tom/pytest-1730/test_rechunk_dataset_mapper_te3'), shape = (100, 50), source_chunks = (10, 50)
target_chunks = {'a': {'x': 20, 'y': 10}, 'b': {'x': 20}}, max_mem = '10MB', executor = 'dask', target_store = <fsspec.mapping.FSMap object at 0x7fb48ae17890>
temp_store = <fsspec.mapping.FSMap object at 0x7fb48ae17d50>

    @pytest.mark.parametrize("shape", [(100, 50)])
    @pytest.mark.parametrize("source_chunks", [(10, 50)])
    @pytest.mark.parametrize(
        "target_chunks",
        [{"a": (20, 10), "b": (20,)}, {"a": {"x": 20, "y": 10}, "b": {"x": 20}}],
    )
    @pytest.mark.parametrize("max_mem", ["10MB"])
    @pytest.mark.parametrize("executor", ["dask"])
    @pytest.mark.parametrize("target_store", ["target.zarr", "mapper.target.zarr"])
    @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"])
    def test_rechunk_dataset(
        tmp_path,
        shape,
        source_chunks,
        target_chunks,
        max_mem,
        executor,
        target_store,
        temp_store,
    ):
        if target_store.startswith("mapper"):
            target_store = fsspec.get_mapper(str(tmp_path) + target_store)
            temp_store = fsspec.get_mapper(str(tmp_path) + temp_store)
        else:
            target_store = str(tmp_path / target_store)
            temp_store = str(tmp_path / temp_store)
    
        a = numpy.arange(numpy.prod(shape)).reshape(shape).astype("f4")
        a[-1] = numpy.nan
        ds = xarray.Dataset(
            dict(
                a=xarray.DataArray(
                    a, dims=["x", "y"], attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"}
                ),
                b=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                c=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            coords=dict(
                cx=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                cy=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"},
        )
        ds = ds.chunk(chunks=dict(zip(["x", "y"], source_chunks)))
        options = dict(
            a=dict(
                compressor=zarr.Blosc(cname="zstd"),
                dtype="int32",
                scale_factor=0.1,
                _FillValue=-9999,
            )
        )
        rechunked = api.rechunk(
            ds,
            target_chunks=target_chunks,
            max_mem=max_mem,
            target_store=target_store,
            target_options=options,
            temp_store=temp_store,
>           executor=executor,
        )

tests/test_rechunk.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
rechunker/api.py:305: in rechunk
    plan = executor.prepare_plan(copy_spec)
rechunker/executors/dask.py:21: in prepare_plan
    return _copy_all(specs)
rechunker/executors/dask.py:96: in _copy_all
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
rechunker/executors/dask.py:96: in <listcomp>
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

spec = CopySpec(read=ArrayProxy(array=dask.array<astype, shape=(100, 50), dtype=int32, chunksize=(10, 50), chunktype=numpy.nd...' (100, 50) int32>, chunks=(10, 50)), write=ArrayProxy(array=<zarr.core.Array '/a' (100, 50) int32>, chunks=(100, 50)))

    def _chunked_array_copy(spec: CopySpec) -> Delayed:
        """Chunked copy between arrays."""
        if spec.intermediate.array is None:
            target_store_delayed = _direct_array_copy(
                spec.read.array, spec.write.array, spec.read.chunks,
            )
    
            # fuse
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
            dsk_fused, _ = fuse(target_dsk)
    
            return Delayed(target_store_delayed.key, dsk_fused)
    
        else:
            # do intermediate store
            int_store_delayed = _direct_array_copy(
                spec.read.array, spec.intermediate.array, spec.read.chunks,
            )
            target_store_delayed = _direct_array_copy(
                spec.intermediate.array, spec.write.array, spec.write.chunks,
            )
    
            # now do some hacking to chain these together into a single graph.
            # get the two graphs as dicts
            int_dsk = dask.utils.ensure_dict(int_store_delayed.dask)
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
    
            # find the root store key representing the read
            root_keys = []
            for key in target_dsk:
                if isinstance(key, str):
                    if key.startswith("from-zarr"):
                        root_keys.append(key)
>           assert len(root_keys) == 1
E           AssertionError

rechunker/executors/dask.py:74: AssertionError
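
For readers trying to make sense of the failure: the Dask executor chains the intermediate and target copies by scanning the flattened low-level graph for string keys beginning with `from-zarr`, and the assertion trips when that scan no longer finds exactly one such key. Below is a minimal, illustrative sketch of how to inspect the keys of a store graph under a newer Dask; the in-memory Zarr arrays, shapes, and chunkings are stand-ins, not rechunker's actual `CopySpec` objects.

```python
import dask
import dask.array as da
import zarr

# Stand-ins for the intermediate and target Zarr arrays that the executor
# copies between (shapes and chunkings are illustrative only).
intermediate = zarr.ones((100, 50), chunks=(10, 50), dtype="f4")
target = zarr.empty((100, 50), chunks=(100, 50), dtype="f4")

# Similar in spirit to _direct_array_copy: read the intermediate with the
# write chunks and store it into the target, without computing yet.
reread = da.from_zarr(intermediate, chunks=(100, 50))
target_store_delayed = reread.store(target, lock=False, compute=False)

# Flatten the HighLevelGraph to a plain dict, as the executor does, and look
# for the string keys it expects to start with "from-zarr".
target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
root_keys = [
    k for k in target_dsk if isinstance(k, str) and k.startswith("from-zarr")
]
print(root_keys)  # on Dask >= 2022.01.1 this may not be exactly one key,
                  # which is what trips `assert len(root_keys) == 1` above
```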
@rabernat (Member) commented Mar 3, 2022

Thanks for reporting, Tom. This feels vaguely similar to pangeo-forge/pangeo-forge-recipes#259, which we successfully fixed.

In pangeo-forge-recipes, we have copied (and improved upon) the executor framework we are using in rechunker. So fixing this will likely involve bringing rechunker up to speed with those changes.
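
Not speaking to what the updated executor framework actually does, but one way to express the same intermediate-then-target chaining without matching graph key names is Dask's public `dask.graph_manipulation.bind`, which makes one collection depend on another explicitly. A rough sketch, with in-memory Zarr arrays standing in for the stores and all names chosen purely for illustration:

```python
import dask.array as da
import zarr
from dask.graph_manipulation import bind

# In-memory stand-ins for the source data and the intermediate/target stores.
source = da.ones((100, 50), chunks=(10, 50), dtype="f4")
intermediate = zarr.empty((100, 50), chunks=(10, 50), dtype="f4")
target = zarr.empty((100, 50), chunks=(100, 50), dtype="f4")

# First copy: source -> intermediate.
int_store = source.store(intermediate, lock=False, compute=False)

# Second copy: re-read the intermediate with the write chunks, then store it
# into the target.
target_store = da.from_zarr(intermediate, chunks=(100, 50)).store(
    target, lock=False, compute=False
)

# bind() returns a copy of `target_store` whose root tasks depend on
# `int_store` having finished, so the two copies run in the right order
# within a single graph and no key-name matching is needed.
chained = bind(target_store, int_store)
chained.compute()
```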

@paigem commented Mar 16, 2022

Chiming in to say that I am getting the same AssertionError with Dask 2022.2.1, but did not get the error with Dask 2022.1.1. I was running rechunker 0.3.3 just fine on Pangeo Cloud until the recent Docker image update switched from Dask 2022.1.1 to 2022.2.1.

@rabernat (Member)

Hopefully fixed by #112.

@hammer commented Mar 31, 2022

@tomwhite is going to track down some example code from @eric-czech to ensure this fix works for us.

@paigem commented Apr 1, 2022

PR #112 seems to have fixed my issues! Thanks a bunch @rabernat!

@rabernat (Member) commented Apr 1, 2022

Thanks, @hammer! I've heard from a few folks already, and I'm feeling pretty confident that this issue is fixed, but I would love as much feedback as possible. I will probably make a release early next week.

@rabernat (Member)

Rechunker 0.5.0 has been released.
