WIP: Zarr chunks refactor #4550

Closed
wants to merge 52 commits into from

Changes from all commits (52 commits)
f961606  add in api.open_dataset dispatching to stub apiv2 (aurghs, Sep 25, 2020)
fb166fa  remove in apiv2 check for input AbstractDataStore (aurghs, Sep 25, 2020)
0221eec  bugfix typo (aurghs, Sep 25, 2020)
36a02c7  add kwarg engines in _get_backend_cls needed by apiv2 (aurghs, Sep 25, 2020)
cfb8cb8  add alpha support for h5netcdf (aurghs, Sep 25, 2020)
4256bc8  style: clean not used code, modify some variable/function name (aurghs, Sep 28, 2020)
1bc7391  Add ENGINES entry for cfgrib. (Sep 28, 2020)
748fe5a  Define function open_backend_dataset_cfgrib() to be used in apiv2.py. (Sep 28, 2020)
fb368fe  Apply black to check formatting. (Sep 28, 2020)
80e111c  Apply black to check formatting. (Sep 28, 2020)
e15ca6b  add dummy zarr apiv2 backend (aurghs, Sep 28, 2020)
025cc87  Merge branch 'master' into backend-read-refactor (aurghs, Sep 28, 2020)
4b19399  align apiv2.open_dataset to api.open_dataset (aurghs, Sep 28, 2020)
572595f  remove unused extra_coords in open_backend_dataset_* (aurghs, Sep 29, 2020)
d6e632e  Merge remote-tracking branch 'origin/cfgrib_refactor' into backend-re… (aurghs, Sep 29, 2020)
74aba14  remove extra_coords in open_backend_dataset_cfgrib (aurghs, Sep 29, 2020)
d6280ec  transform zarr maybe_chunk and get_chunks in classmethod (aurghs, Sep 29, 2020)
c0e0f34  make alpha zarr apiv2 working (aurghs, Sep 29, 2020)
6431101  refactor apiv2.open_dataset: (aurghs, Sep 29, 2020)
50d1ebe  move dataset_from_backend_dataset out of apiv2.open_dataset (aurghs, Sep 30, 2020)
383d323  remove blank lines (aurghs, Sep 30, 2020)
457a09c  remove blank lines (aurghs, Sep 30, 2020)
2803fe3  style (aurghs, Sep 30, 2020)
08db0bd  Re-write error messages (alexamici, Sep 30, 2020)
1f11845  Fix code style (alexamici, Sep 30, 2020)
93303b1  Fix code style (alexamici, Sep 30, 2020)
bc2fe00  remove unused import (aurghs, Sep 30, 2020)
102b00a  zarr chunking refactor draft not working (aurghs, Oct 1, 2020)
f47605a  refactor dataset_from_backend_dataset (aurghs, Oct 2, 2020)
b632b05  fix wrong commit (aurghs, Oct 2, 2020)
b437f02  add get_chunk in apiv2 (aurghs, Oct 2, 2020)
d694146  replace warning with ValueError for not supported kwargs in backends (aurghs, Oct 8, 2020)
56f4d3f  change zarr.ZarStore.get_chunks into a static method (aurghs, Oct 8, 2020)
df23b18  group `backend_kwargs` and `kwargs` in `extra_tokes` argument in apiv… (aurghs, Oct 8, 2020)
a04e6ac  remove in open_backend_dayaset_${engine} signature kwarags and the re… (aurghs, Oct 8, 2020)
de29a4c  black (aurghs, Oct 8, 2020)
3b896f2  Merge branch 'backend-read-refactor' into zarr-chunks-refactor (aurghs, Oct 8, 2020)
cf77dc3  remove not used apiv2.set_source (aurghs, Oct 8, 2020)
c1b763a  remove `auto` as chunk value (aurghs, Oct 9, 2020)
c32c62c  Merge branch 'master' into zarr-chunks-refactor (aurghs, Oct 29, 2020)
5eb0daf  - align with api.py (aurghs, Oct 29, 2020)
4fc1d8d  unify backends chunking (aurghs, Oct 29, 2020)
1cf6968  move get_chunk funtion in dataset (aurghs, Oct 29, 2020)
a780efc  move get_chunk funtion in dataset (aurghs, Oct 29, 2020)
18e0077  Merge branch 'zarr-chunks-refactor' of github.com:bopen/xarray into z… (aurghs, Oct 29, 2020)
93cadee  black (aurghs, Oct 29, 2020)
6e9a562  remove unused import (aurghs, Oct 29, 2020)
69c2790  remove unused import (aurghs, Oct 29, 2020)
0616a08  Merge branch 'zarr-chunks-refactor' of github.com:bopen/xarray into z… (aurghs, Oct 29, 2020)
a1cfa29  Pass isort test. (Nov 2, 2020)
c590e05  Pass black -t py36 test. (Nov 2, 2020)
c6d341c  Remove duplicated functions. (Nov 2, 2020)
4 changes: 2 additions & 2 deletions xarray/backends/api.py
@@ -25,7 +25,7 @@
combine_by_coords,
)
from ..core.dataarray import DataArray
from ..core.dataset import Dataset, _maybe_chunk
from ..core.dataset import Dataset, _maybe_chunk, _get_chunk
from ..core.utils import close_on_error, is_grib_path, is_remote_uri
from .common import AbstractDataStore, ArrayWriter
from .locks import _get_scheduler
@@ -535,7 +535,7 @@ def maybe_decode_store(store, chunks):
k: _maybe_chunk(
k,
v,
store.get_chunk(k, v, chunks),
_get_chunk(k, v, chunks),
overwrite_encoded_chunks=overwrite_encoded_chunks,
)
for k, v in ds.variables.items()
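Reviewer note: a minimal sketch of how the two helpers are meant to combine on the caller side. The loop form, the `chunk_all_variables` name, and the sample arguments are illustrative only and assume this branch, where `_get_chunk` lives next to `_maybe_chunk` in `xarray.core.dataset`.

```python
# Illustrative only: mirrors the dict comprehension in maybe_decode_store above.
from xarray.core.dataset import _get_chunk, _maybe_chunk


def chunk_all_variables(ds, chunks, overwrite_encoded_chunks=False):
    variables = {}
    for name, var in ds.variables.items():
        # Resolve the requested chunks against the chunks recorded in
        # var.encoding (for zarr, the on-disk chunk shape).
        chunk_spec = _get_chunk(name, var, chunks)
        # Wrap the variable in a dask array only when a chunk spec applies.
        variables[name] = _maybe_chunk(
            name, var, chunk_spec, overwrite_encoded_chunks=overwrite_encoded_chunks
        )
    return ds._replace(variables)
```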
77 changes: 39 additions & 38 deletions xarray/backends/apiv2.py
@@ -1,5 +1,6 @@
import os

from ..core.dataset import _get_chunk, _maybe_chunk
from ..core.utils import is_remote_uri
from . import cfgrib_, h5netcdf_, zarr
from .api import (
@@ -16,66 +17,58 @@
}


def get_mtime(filename_or_obj):
# if passed an actual file path, augment the token with
# the file modification time
if isinstance(filename_or_obj, str) and not is_remote_uri(filename_or_obj):
mtime = os.path.getmtime(filename_or_obj)
else:
mtime = None
return mtime


def dataset_from_backend_dataset(
ds,
backend_ds,
filename_or_obj,
engine,
chunks,
cache,
overwrite_encoded_chunks,
extra_tokens,
):
if not (isinstance(chunks, (int, dict)) or chunks is None):
if chunks != "auto":
raise ValueError(
"chunks must be an int, dict, 'auto', or None. "
"Instead found %s. " % chunks
)

_protect_dataset_variables_inplace(ds, cache)
if chunks is not None and engine != "zarr":
_protect_dataset_variables_inplace(backend_ds, cache)
if chunks is None:
ds = backend_ds
else:
from dask.base import tokenize

# if passed an actual file path, augment the token with
# the file modification time
if isinstance(filename_or_obj, str) and not is_remote_uri(filename_or_obj):
mtime = os.path.getmtime(filename_or_obj)
else:
mtime = None
mtime = get_mtime(filename_or_obj)
token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
name_prefix = "open_dataset-%s" % token
ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)

elif engine == "zarr":

if chunks == "auto":
try:
import dask.array # noqa
except ImportError:
chunks = None

if chunks is None:
return ds

if isinstance(chunks, int):
chunks = dict.fromkeys(ds.dims, chunks)
chunks = dict.fromkeys(backend_ds.dims, chunks)

variables = {
k: zarr.ZarrStore.maybe_chunk(k, v, chunks, overwrite_encoded_chunks)
for k, v in ds.variables.items()
name: _maybe_chunk(
name,
var,
_get_chunk(name, var, chunks),
overwrite_encoded_chunks=overwrite_encoded_chunks,
name_prefix=name_prefix,
token=token,
)
for name, var in backend_ds.variables.items()
}
ds2 = ds._replace(variables)
ds = backend_ds._replace(variables)

else:
ds2 = ds
ds2._file_obj = ds._file_obj
ds._file_obj = backend_ds._file_obj

# Ensure source filename always stored in dataset object (GH issue #2550)
if "source" not in ds.encoding:
if isinstance(filename_or_obj, str):
ds.encoding["source"] = filename_or_obj
backend_ds.encoding["source"] = filename_or_obj

return ds2
return ds


def open_dataset(
@@ -191,6 +184,14 @@ def open_dataset(
open_mfdataset
"""

if chunks == "auto":
chunks = {}
if not (isinstance(chunks, (int, dict)) or chunks is None):
raise ValueError(
"chunks must be an int, dict, 'auto', or None. "
"Instead found %s. " % chunks
)

if cache is None:
cache = chunks is None

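Reviewer note: a short usage sketch of the normalized `chunks` handling after this change. The store path and chunk sizes are placeholders, and `apiv2` is the experimental entry point touched by this PR.

```python
# Placeholder path and sizes; assumes a Zarr store with encoded chunks on disk.
import xarray.backends.apiv2 as apiv2

# "auto" is turned into {} up front, so every variable falls back to the
# chunks recorded in its encoding (the on-disk Zarr chunk shape).
ds_auto = apiv2.open_dataset("store.zarr", engine="zarr", chunks="auto")

# An int or dict is validated here and later resolved per variable by _get_chunk.
ds_time = apiv2.open_dataset("store.zarr", engine="zarr", chunks={"time": 10})

# Anything else now fails fast with ValueError instead of erroring later
# inside dataset_from_backend_dataset:
# apiv2.open_dataset("store.zarr", engine="zarr", chunks=[10])
```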
47 changes: 0 additions & 47 deletions xarray/backends/zarr.py
@@ -362,53 +362,6 @@ def encode_variable(self, variable):
def encode_attribute(self, a):
return encode_zarr_attr_value(a)

@staticmethod
def get_chunk(name, var, chunks):
chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))

# Coordinate labels aren't chunked
if var.ndim == 1 and var.dims[0] == name:
return chunk_spec

if chunks == "auto":
return chunk_spec

for dim in var.dims:
if dim in chunks:
spec = chunks[dim]
if isinstance(spec, int):
spec = (spec,)
if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
if any(s % chunk_spec[dim] for s in spec):
warnings.warn(
"Specified Dask chunks %r would "
"separate Zarr chunk shape %r for "
"dimension %r. This significantly "
"degrades performance. Consider "
"rechunking after loading instead."
% (chunks[dim], chunk_spec[dim], dim),
stacklevel=2,
)
chunk_spec[dim] = chunks[dim]
return chunk_spec

@classmethod
def maybe_chunk(cls, name, var, chunks, overwrite_encoded_chunks):
chunk_spec = cls.get_chunk(name, var, chunks)

if (var.ndim > 0) and (chunk_spec is not None):
from dask.base import tokenize

# does this cause any data to be read?
token2 = tokenize(name, var._data, chunks)
name2 = f"xarray-{name}-{token2}"
var = var.chunk(chunk_spec, name=name2, lock=None)
if overwrite_encoded_chunks and var.chunks is not None:
var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
return var
else:
return var

def store(
self,
variables,
34 changes: 34 additions & 0 deletions xarray/core/dataset.py
@@ -359,6 +359,40 @@ def _assert_empty(args: tuple, msg: str = "%s") -> None:
raise ValueError(msg % args)


def _check_chunks_compatibility(dim, chunks, chunk_spec):
spec = chunks[dim]
if isinstance(spec, int):
spec = (spec,)
if any(s % chunk_spec[dim] for s in spec):
warnings.warn(
"Specified Dask chunks %r would "
"separate on disks chunk shape %r for "
"dimension %r. This could "
"degrades performance. Consider "
"rechunking after loading instead." % (chunks[dim], chunk_spec[dim], dim),
stacklevel=2,
)


def _get_chunk(name, var, chunks):
if chunks == "auto":
chunks = {}

preferred_chunks = dict(zip(var.dims, var.encoding.get("chunks", {})))
if var.ndim == 1 and var.dims[0] == name:
return preferred_chunks

output_chunks = {}
if chunks is not None:
for dim in preferred_chunks:
if dim in chunks:
_check_chunks_compatibility(dim, chunks, preferred_chunks)
output_chunks[dim] = chunks[dim]
else:
output_chunks[dim] = preferred_chunks[dim]
return output_chunks


def _maybe_chunk(
name,
var,
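Reviewer note: a minimal sketch of what the new `_get_chunk` helper returns; the variable, dimension names, and chunk sizes are made up, and the import assumes this branch where `_get_chunk` is a module-level helper in `xarray/core/dataset.py`.

```python
import numpy as np
import xarray as xr
from xarray.core.dataset import _get_chunk

var = xr.Variable(("time", "x"), np.zeros((100, 40)))
var.encoding["chunks"] = (10, 20)  # e.g. the chunk shape stored on disk

# No explicit request: fall back to the encoded (preferred) chunks.
print(_get_chunk("temperature", var, {}))            # {'time': 10, 'x': 20}

# Per-dimension overrides win; 30 is a multiple of the encoded 10, so no
# compatibility warning is emitted, and 'x' keeps its preferred chunk size.
print(_get_chunk("temperature", var, {"time": 30}))  # {'time': 30, 'x': 20}
```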