Commit a7c03e5 ("Some edits"), dcherian, Nov 9, 2024 (parent: b690248)
doc/user-guide/dask.rst: 27 additions and 29 deletions
Here are some examples for using Xarray with Dask at scale:
- `CMIP6 Precipitation Frequency Analysis <https://gallery.pangeo.io/repos/pangeo-gallery/cmip6/precip_frequency_change.html>`_
- `Using Dask + Cloud Optimized GeoTIFFs <https://gallery.pangeo.io/repos/pangeo-data/landsat-8-tutorial-gallery/landsat8.html#Dask-Chunks-and-Cloud-Optimized-Geotiffs>`_

Find more examples at the `Project Pythia cookbook gallery <https://cookbooks.projectpythia.org/>`_.


Using Dask with Xarray
----------------------
.. image placeholder: "A Dask array", aligned right (the image directive is collapsed in this diff)

Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset that Dask is able to process in parallel (see the `Dask Array docs on chunks <https://docs.dask.org/en/stable/array-chunks.html?utm_source=xarray-docs>`_). Chunks are commonly set when reading data, but you can also set the chunksize manually at any point in your workflow using :py:meth:`Dataset.chunk` and :py:meth:`DataArray.chunk`. See :ref:`dask.chunks` for more.

Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph.

When a result is requested (e.g., for plotting, writing to disk, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently.

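For instance, a minimal sketch with a toy array (not taken from the docs' own examples):

.. code-block:: python

    import numpy as np
    import xarray as xr

    # A toy DataArray, chunked into four pieces along "time"
    da = xr.DataArray(np.random.randn(1000, 100), dims=("time", "x")).chunk({"time": 250})

    result = (da - da.mean("time")).sum()  # lazy: this only builds a task graph
    result.compute()  # executes the graph in parallel and returns an in-memory result
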
Nearly all Xarray methods have been extended to work automatically with Dask Arrays. This includes things like indexing, concatenating, rechunking, grouped operations, etc. Common operations are covered in more detail in each of the sections below.

Reading and writing data
~~~~~~~~~~~~~~~~~~~~~~~~

When reading data, Dask divides your dataset into smaller chunks. You can specify the size of chunks with the ``chunks`` argument. Specifying ``chunks="auto"`` will set the dask chunk sizes to be a multiple of the on-disk chunk sizes. This can be a good idea, but usually the appropriate dask chunk size will depend on your workflow.

.. tab:: Zarr

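    A minimal sketch of opening a Zarr store with Dask chunks (the store path here is hypothetical)::

        import xarray as xr

        # chunks="auto" aligns each Dask chunk with a multiple of the on-disk chunks
        ds = xr.open_zarr("path/to/example-store.zarr", chunks="auto")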

.. tip::

    When reading in many netCDF files with :py:func:`~xarray.open_mfdataset`, using ``engine="h5netcdf"`` can
    be faster than the default, which uses the netCDF4 package.
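
    For example, a sketch (the file pattern here is hypothetical)::

        ds = xr.open_mfdataset("my/files/*.nc", engine="h5netcdf", parallel=True)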

Save larger-than-memory netCDF files::
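
    # One way to do this (a sketch; the output filename is hypothetical):
    # compute=False returns a dask.delayed.Delayed object instead of writing
    # immediately, so the write runs only when you call .compute() on it.
    delayed_write = ds.to_netcdf("large-output.nc", compute=False)
    delayed_write.compute()
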
There are a few common cases where you may want to convert lazy Dask arrays into eager, in-memory NumPy arrays:
- You've reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory
- You need to compute intermediate results since Dask is unable (or struggles) to perform a certain computation. The canonical example of this is normalizing a dataset, e.g., ``ds - ds.mean()``, when ``ds`` is larger than memory. Typically, you should either save ``ds`` to disk or compute ``ds.mean()`` eagerly.

To do this, you can use :py:meth:`Dataset.compute` or :py:meth:`DataArray.compute`:

.. ipython:: python

    ds.compute()
.. note::

    Using :py:meth:`Dataset.compute` is preferred to :py:meth:`Dataset.load`, which changes the results in-place.

You can also access :py:attr:`DataArray.values`, which will always be a NumPy array:

.. ipython::
    :verbatim:

    ...
    # truncated for brevity

NumPy ufuncs like :py:func:`numpy.sin` transparently work on all xarray objects, including those
that store lazy Dask arrays:

.. ipython:: python

    np.sin(ds)
To access Dask arrays directly, use the :py:attr:`DataArray.data` attribute, which exposes the DataArray's underlying array type.
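
For example (a sketch, assuming ``ds`` holds a chunked variable named ``air``):

.. code-block:: python

    ds.air.data  # a dask.array.Array while the variable is chunked
    ds.air.compute().data  # a numpy.ndarray once loaded into memory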

If you're using a Dask cluster, you can also use :py:meth:`Dataset.persist` for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It's a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data.

.. code-block:: python

    ds = ds.persist()
.. tip::

    Remember to save the dataset returned by persist! This is a common mistake.

.. _dask.chunks:

Chunking and performance
~~~~~~~~~~~~~~~~~~~~~~~~

The way a dataset is chunked can be critical to performance when working with large datasets.

It can be helpful to choose chunk sizes based on your downstream analyses and to chunk as early as possible. Datasets with smaller chunks along the time axis, for example, can make time domain problems easier to parallelize, since Dask can perform the same operation on each time chunk. If you're working with a large dataset whose chunks make downstream analyses challenging, you may need to rechunk your data. Rechunking is an expensive operation, though, so it is only recommended when needed.

You can chunk or rechunk a dataset by:

- Specifying the ``chunks`` kwarg when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note that this will only work, though, for chunks that are similar to how the data is chunked on disk. Otherwise, it will be very slow and require a lot of network bandwidth.
- Many array file formats are chunked on disk. You can specify ``chunks={}`` to have a single dask chunk map to a single on-disk chunk, and ``chunks="auto"`` to have a single dask chunk be an automatically chosen multiple of the on-disk chunks.
- Using :py:meth:`Dataset.chunk` after you've already read in your dataset. For time domain problems, for example, you can use ``ds.chunk(time=TimeResampler())`` to rechunk according to a specified unit of time: ``ds.chunk(time=TimeResampler("MS"))`` will set the chunks so that a month of data is contained in one chunk, as in the sketch below.

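Putting these options together, here is a minimal sketch (the Zarr store path and dimension names are hypothetical, and chunking by ``TimeResampler`` assumes a recent Xarray version):

.. code-block:: python

    import xarray as xr
    from xarray.groupers import TimeResampler

    # One Dask chunk per automatically chosen multiple of the on-disk chunks
    ds = xr.open_zarr("example-store.zarr", chunks="auto")

    # Or: small chunks across space, useful before spatial subsetting
    ds_spatial = xr.open_zarr("example-store.zarr", chunks={"latitude": 10, "longitude": 10})

    # Rechunk later so that each chunk holds one month of data
    ds_monthly = ds.chunk(time=TimeResampler("MS"))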

If you want to use a function that isn't wrapped by Xarray, and have it applied in parallel on each block of your xarray object, you have three options:
1. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
2. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
to apply functions that consume and return xarray objects.
3. Extract Dask Arrays from xarray objects with :py:attr:`DataArray.data` and use Dask directly.

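For example, option 2 might look like the following sketch (``ds`` and the ``anomaly`` function here are hypothetical):

.. code-block:: python

    import xarray as xr

    def anomaly(block: xr.Dataset) -> xr.Dataset:
        # Receives one in-memory block at a time; subtract that block's time mean
        return block - block.mean("time")

    # template describes the expected output without computing anything
    result = ds.map_blocks(anomaly, template=ds)
    result.compute()
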
.. tip::

    See the extensive Xarray tutorial on `apply_ufunc <https://tutorial.xarray.dev/advanced/apply_ufunc/apply_ufunc.html>`_.


Here's an example of a simplified workflow putting some of these tips together:

.. code-block:: python

    import xarray as xr

    ds = xr.open_zarr(  # Since we're doing a spatial reduction, increase chunk size in x, y
        ...  # store path and chunks arguments elided
    )
    time_subset = ds.sel(  # Filter early
        time=slice("2020-01-01", "2020-12-31")
    )

    # faster resampling when flox is installed
    daily = ds.resample(time="D").mean()

    daily.load()  # Pull smaller results into memory after reducing the dataset
