From 1b4c2b1a329bad863c3768b02341f80154e16a72 Mon Sep 17 00:00:00 2001
From: Fabien Maussion
Date: Tue, 3 May 2022 18:41:22 +0200
Subject: [PATCH 1/6] Dask doc changes

---
 doc/user-guide/dask.rst | 33 ++++++++++++++++++++++++---------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index 5110a970390..73edf72c70e 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -84,7 +84,7 @@ argument to :py:func:`~xarray.open_dataset` or using the
 In this example ``latitude`` and ``longitude`` do not appear in the ``chunks``
 dict, so only one chunk will be used along those dimensions. It is also
-entirely equivalent to opening a dataset using :py:meth:`~xarray.open_dataset`
+entirely equivalent to opening a dataset using :py:func:`~xarray.open_dataset`
 and then chunking the data using the ``chunk`` method, e.g.,
 ``xr.open_dataset('example-data.nc').chunk({'time': 10})``.
@@ -95,13 +95,21 @@ use :py:func:`~xarray.open_mfdataset`::
 This function will automatically concatenate and merge datasets into one in
 the simple cases that it understands (see :py:func:`~xarray.combine_by_coords`
-for the full disclaimer). By default, :py:meth:`~xarray.open_mfdataset` will chunk each
+for the full disclaimer). By default, :py:func:`~xarray.open_mfdataset` will chunk each
 netCDF file into a single Dask array; again, supply the ``chunks`` argument to
 control the size of the resulting Dask arrays. In more complex cases, you can
-open each file individually using :py:meth:`~xarray.open_dataset` and merge the result, as
-described in :ref:`combining data`. Passing the keyword argument ``parallel=True`` to :py:meth:`~xarray.open_mfdataset` will speed up the reading of large multi-file datasets by
+open each file individually using :py:func:`~xarray.open_dataset` and merge the result, as
+described in :ref:`combining data`. Passing the keyword argument ``parallel=True`` to
+:py:func:`~xarray.open_mfdataset` will speed up the reading of large multi-file datasets by
 executing those read tasks in parallel using ``dask.delayed``.

+.. warning::
+
+    :py:func:`~xarray.open_mfdataset` called without ``chunks`` argument will return
+    dask arrays with chunk sizes equal to the individual files. Re-chunking
+    the dataset after creation with ``ds.chunk()`` will lead to an ineffective use of
+    memory.
+
 You'll notice that printing a dataset still shows a preview of array values,
 even if they are actually Dask arrays. We can do this quickly with Dask because
 we only need to compute the first few values (typically from the first block).
@@ -236,6 +244,11 @@ sizes of Dask arrays is done with the :py:meth:`~xarray.Dataset.chunk` method:

     rechunked = ds.chunk({"latitude": 100, "longitude": 100})

+.. warning::
+
+    Rechunking an existing dask array created with :py:func:`~xarray.open_mfdataset`
+    is not recommended (see above).
+
 You can view the size of existing chunks on an array by viewing the
 :py:attr:`~xarray.Dataset.chunks` attribute:
@@ -542,15 +555,17 @@ larger chunksizes.
 Optimization Tips
 -----------------

-With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow in certain cases. Here are some optimization tips we have found through experience:
+With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow or memory hungry in certain cases. Here are some optimization tips we have found through experience:
+
+1. Chunk as early as possible, and avoid rechunking as much as possible. Pass the ``chunks={}`` argument to :py:func:`~xarray.open_dataset` and :py:func:`~xarray.open_mfdataset` to avoid redundant file read.

-1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).
+2. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).

-2. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)
+3. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)

-3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load chunks of data referring to different chunks (probably not necessary if you follow suggestion 1).
+4. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load chunks of data referring to different chunks.

-4. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset`
+5. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset`
    can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.

 5. Some dask-specific tips may be found `here `_.
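Before the next patch, a short illustrative sketch (not part of the patch series; the file names are hypothetical) of the two equivalent ways of getting chunked, Dask-backed arrays described above, and of passing ``chunks`` to ``open_mfdataset`` up front rather than rechunking afterwards:

.. code-block:: python

    import xarray as xr

    # Chunk at open time: variables come back as dask arrays split into
    # 10-timestep blocks along ``time``.
    ds = xr.open_dataset("example-data.nc", chunks={"time": 10})

    # Entirely equivalent: open first, then chunk.
    ds = xr.open_dataset("example-data.nc").chunk({"time": 10})

    # Multi-file case: supply ``chunks`` (and optionally ``parallel=True``)
    # directly, which is what the new warning above recommends instead of
    # calling ``ds.chunk()`` after the fact.
    ds = xr.open_mfdataset(
        "example-data-*.nc",  # hypothetical file pattern
        chunks={"time": 10},
        parallel=True,
    )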
From 5e503283fc36365f72cad9ab4b6abea9d0dc0786 Mon Sep 17 00:00:00 2001
From: Fabien Maussion
Date: Tue, 3 May 2022 18:44:12 +0200
Subject: [PATCH 2/6] small change

---
 doc/user-guide/dask.rst | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index 73edf72c70e..b52df1fe977 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -557,18 +557,20 @@ Optimization Tips
 With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow or memory hungry in certain cases. Here are some optimization tips we have found through experience:

-1. Chunk as early as possible, and avoid rechunking as much as possible. Pass the ``chunks={}`` argument to :py:func:`~xarray.open_dataset` and :py:func:`~xarray.open_mfdataset` to avoid redundant file read.
+1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).

-2. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).
+2. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)

-3. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)
+3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load chunks of data referring to different chunks.

-4. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load chunks of data referring to different chunks.
+4. Chunk as early as possible, and avoid rechunking as much as possible. Always
+   pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
+   redundant file reads.

 5. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset`
    can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.

-5. Some dask-specific tips may be found `here `_.
+6. Some dask-specific tips may be found `here `_.

-6. The dask `diagnostics `_ can be
+7. The dask `diagnostics `_ can be
    useful in identifying performance bottlenecks.
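A hedged illustration of tips 4 and 5 as renumbered above (a sketch only, not part of the patches; the glob is made up, and ``engine='h5netcdf'`` assumes the h5netcdf package is installed):

.. code-block:: python

    import xarray as xr

    # ``chunks={}`` opens the files lazily with dask in their natural chunking,
    # so the data is not re-read just to impose a different chunk layout.
    ds = xr.open_mfdataset("simulation-*.nc", chunks={}, parallel=True)

    # Optionally switch the backend, which can be quicker than the default
    # netCDF4 engine for some files.
    ds = xr.open_mfdataset("simulation-*.nc", chunks={}, engine="h5netcdf")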
From 0ea4f27e6187fc006bf83558bc3f3be58990ac60 Mon Sep 17 00:00:00 2001
From: Fabien Maussion
Date: Sun, 8 May 2022 16:19:17 +0200
Subject: [PATCH 3/6] More edits

---
 doc/user-guide/dask.rst | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index b52df1fe977..5d155aee22e 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -108,7 +108,7 @@ executing those read tasks in parallel using ``dask.delayed``.
     :py:func:`~xarray.open_mfdataset` called without ``chunks`` argument will return
     dask arrays with chunk sizes equal to the individual files. Re-chunking
     the dataset after creation with ``ds.chunk()`` will lead to an ineffective use of
-    memory.
+    memory and is not recommended.

 You'll notice that printing a dataset still shows a preview of array values,
 even if they are actually Dask arrays. We can do this quickly with Dask because
@@ -232,6 +232,7 @@ disk.
    available memory.

 .. note::
+
     For more on the differences between :py:meth:`~xarray.Dataset.persist` and
     :py:meth:`~xarray.Dataset.compute` see this `Stack Overflow answer `_ and the `Dask documentation `_.
@@ -308,8 +309,7 @@ each block of your xarray object, you have three options:
 ``apply_ufunc``
 ~~~~~~~~~~~~~~~

-Another option is to use xarray's :py:func:`~xarray.apply_ufunc`, which can
-automate `embarrassingly parallel
+:py:func:`~xarray.apply_ufunc` automates `embarrassingly parallel
 `__ "map" type operations where a function
 written for processing NumPy arrays should be repeatedly applied to xarray objects
 containing Dask arrays. It works similarly to
@@ -557,11 +557,14 @@ Optimization Tips
 With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow or memory hungry in certain cases. Here are some optimization tips we have found through experience:

-1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).
+1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). More generally, ``groupby()`` is a costly operation and does not (yet) perform well on datasets split across multiple files (see :pull:`5734` and linked discussions there).

 2. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)

-3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load chunks of data referring to different chunks.
+3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset`
+   (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier,
+   because there's no risk you will load subsets of data which span multiple chunks. On individual
+   files, prefer to subset before chunking (suggestion 1).

 4. Chunk as early as possible, and avoid rechunking as much as possible. Always
    pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
    redundant file reads.

From 6990cff4956eb701b469ce1605e8fc08a55de5b3 Mon Sep 17 00:00:00 2001
From: Maximilian Roos <5635139+max-sixty@users.noreply.github.com>
Date: Sun, 8 May 2022 15:36:31 -0700
Subject: [PATCH 4/6] Update doc/user-guide/dask.rst

---
 doc/user-guide/dask.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index 5d155aee22e..62425aaaa5a 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -562,9 +562,9 @@ With analysis pipelines involving both spatial subsetting and temporal resamplin
 2. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)

 3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset`
-   (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier,
-   because there's no risk you will load subsets of data which span multiple chunks. On individual
-   files, prefer to subset before chunking (suggestion 1).
+(e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier,
+because there's no risk you will load subsets of data which span multiple chunks. On individual
+files, prefer to subset before chunking (suggestion 1).

 4. Chunk as early as possible, and avoid rechunking as much as possible. Always
    pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
    redundant file reads.

From 526f5ad9d48f665d848c7f1ef90b2890597fd3b6 Mon Sep 17 00:00:00 2001
From: Maximilian Roos <5635139+max-sixty@users.noreply.github.com>
Date: Sun, 8 May 2022 15:36:55 -0700
Subject: [PATCH 5/6] Update doc/user-guide/dask.rst

---
 doc/user-guide/dask.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index 62425aaaa5a..9673b631da5 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -567,8 +567,8 @@ because there's no risk you will load subsets of data which span multiple chunks
 files, prefer to subset before chunking (suggestion 1).

 4. Chunk as early as possible, and avoid rechunking as much as possible. Always
-   pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
-   redundant file reads.
+pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
+redundant file reads.

 5. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset`
    can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.
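The advice these patches converge on (subset early, choose sensible chunks from the start) can be sketched roughly as follows; the coordinate names, bounds and file pattern are invented:

.. code-block:: python

    import xarray as xr

    ds = xr.open_mfdataset("fields-*.nc", chunks={"latitude": 10, "longitude": 10})

    # Subset first: indexing is cheap and shrinks every downstream task ...
    region = ds.sel(latitude=slice(40, 60), longitude=slice(-10, 30))

    # ... then resample or group the much smaller object.
    monthly = region.resample(time="1M").mean()
    climatology = region.groupby("time.month").mean()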
From 5fca7b0149753b801ee2e5668e0aa791e7cd67c8 Mon Sep 17 00:00:00 2001
From: Fabien Maussion
Date: Mon, 9 May 2022 09:55:00 +0200
Subject: [PATCH 6/6] Back to one liners

---
 doc/user-guide/dask.rst | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index 9673b631da5..56717f5306e 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -555,25 +555,20 @@ larger chunksizes.
 Optimization Tips
 -----------------

-With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow or memory hungry in certain cases. Here are some optimization tips we have found through experience:
+With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance
+can become very slow or memory hungry in certain cases. Here are some optimization tips we have found
+through experience:

 1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). More generally, ``groupby()`` is a costly operation and does not (yet) perform well on datasets split across multiple files (see :pull:`5734` and linked discussions there).

 2. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)

-3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset`
-(e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier,
-because there's no risk you will load subsets of data which span multiple chunks. On individual
-files, prefer to subset before chunking (suggestion 1).
+3. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load subsets of data which span multiple chunks. On individual files, prefer to subset before chunking (suggestion 1).

-4. Chunk as early as possible, and avoid rechunking as much as possible. Always
-pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid
-redundant file reads.
+4. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid redundant file reads.

-5. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset`
-   can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.
+5. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset` can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.

 6. Some dask-specific tips may be found `here `_.

-7. The dask `diagnostics `_ can be
-   useful in identifying performance bottlenecks.
+7. The dask `diagnostics `_ can be useful in identifying performance bottlenecks.
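To close, tip 2 above, writing intermediate results to disk before reusing them, can be sketched as follows. This is an illustration only, with hypothetical paths, not an excerpt from dask.rst:

.. code-block:: python

    import xarray as xr

    ds = xr.open_mfdataset("fields-*.nc", chunks={})

    # Write the temporal mean to disk instead of carrying it in one large graph ...
    ds.mean("time").to_netcdf("time-mean.nc")

    # ... then reopen it and compute the anomaly as a separate, smaller computation.
    mean = xr.open_dataset("time-mean.nc", chunks={})
    anomaly = ds - mean
    anomaly.to_netcdf("anomaly.nc")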