Commit 1c1e6f1

bouweandela, valeriupredoi, and Rémi Kazeroni authored

Add support for configuring Dask distributed (#2049)

Co-authored-by: Valeriu Predoi <valeriu.predoi@gmail.com>
Co-authored-by: Rémi Kazeroni <remi.kazeroni@dlr.de>
1 parent f656483 commit 1c1e6f1

File tree

12 files changed: +459 -19 lines changed

doc/conf.py

+2
@@ -423,6 +423,8 @@
     (f'https://docs.esmvaltool.org/projects/ESMValCore/en/{rtd_version}/',
      None),
     'esmvaltool': (f'https://docs.esmvaltool.org/en/{rtd_version}/', None),
+    'dask': ('https://docs.dask.org/en/stable/', None),
+    'distributed': ('https://distributed.dask.org/en/stable/', None),
     'iris': ('https://scitools-iris.readthedocs.io/en/latest/', None),
     'iris-esmf-regrid': ('https://iris-esmf-regrid.readthedocs.io/en/latest',
                          None),

doc/quickstart/configure.rst

+155
@@ -199,6 +199,161 @@ the user.
   debugging, etc. You can even provide any config user value as a run flag
   ``--argument_name argument_value``
 
+.. _config-dask:
+
+Dask distributed configuration
+==============================
+
+The :ref:`preprocessor functions <preprocessor_functions>` and many of the
+:ref:`Python diagnostics in ESMValTool <esmvaltool:recipes>` make use of the
+:ref:`Iris <iris:iris_docs>` library to work with the data.
+In Iris, data can be either :ref:`real or lazy <iris:real_and_lazy_data>`.
+Lazy data is represented by `dask arrays <https://docs.dask.org/en/stable/array.html>`_.
+Dask arrays consist of many small
+`numpy arrays <https://numpy.org/doc/stable/user/absolute_beginners.html#what-is-an-array>`_
+(called chunks) and, if possible, computations are run on those small arrays
+in parallel.
+In order to figure out what needs to be computed when, Dask makes use of a
+'`scheduler <https://docs.dask.org/en/stable/scheduling.html>`_'.
+The default scheduler in Dask is rather basic, so it can only run on a single
+computer and it may not always find the optimal task scheduling solution,
+resulting in excessive memory use when using e.g. the
+:func:`esmvalcore.preprocessor.multi_model_statistics` preprocessor function.
+Therefore it is recommended that you take a moment to configure the
+`Dask distributed <https://distributed.dask.org>`_ scheduler.
+A Dask scheduler and the 'workers' running the actual computations are
+collectively called a 'Dask cluster'.
+
+In ESMValCore, the Dask cluster can be configured by creating a file called
+``~/.esmvaltool/dask.yml``, where ``~`` is short for your home directory.
+In this file, under the ``client`` keyword, the arguments to
+:obj:`distributed.Client` can be provided.
+Under the ``cluster`` keyword, the type of cluster (e.g.
+:obj:`distributed.LocalCluster`), as well as any arguments required to start
+the cluster, can be provided.
+Extensive documentation on setting up Dask clusters is available
+`here <https://docs.dask.org/en/latest/deploying.html>`__.
+
+.. warning::
+
+   The format of the ``~/.esmvaltool/dask.yml`` configuration file is not yet
+   fixed and may change in the next release of ESMValCore.
+
+.. note::
+
+   Not all preprocessor functions support lazy data yet, so in some cases
+   computational performance may be best with the default scheduler.
+   See `issue #674 <https://github.com/ESMValGroup/ESMValCore/issues/674>`_
+   for progress on making all preprocessor functions lazy.
+
+**Example configurations**
+
+*Personal computer*
+
+Create a Dask distributed cluster on the computer running ESMValCore using
+all available resources:
+
+.. code:: yaml
+
+   cluster:
+     type: distributed.LocalCluster
+
+This should work well for most personal computers.
+
+.. note::
+
+   If you run this configuration on a shared node of an HPC cluster, Dask
+   will try to use as many resources as it can find available, which may
+   lead to a single user (you) overcrowding the node!
+
+*Shared computer*
+
+Create a Dask distributed cluster on the computer running ESMValCore, with
+2 workers with 4 threads/4 GiB of memory each (8 GiB in total):
+
+.. code:: yaml
+
+   cluster:
+     type: distributed.LocalCluster
+     n_workers: 2
+     threads_per_worker: 4
+     memory_limit: 4 GiB
+
+This should work well for shared computers.
+
+*Computer cluster*
+
+Create a Dask distributed cluster on the
+`Levante <https://docs.dkrz.de/doc/levante/running-jobs/index.html>`_
+supercomputer using the `Dask-Jobqueue <https://jobqueue.dask.org/en/latest/>`_
+package:
+
+.. code:: yaml
+
+   cluster:
+     type: dask_jobqueue.SLURMCluster
+     queue: shared
+     account: bk1088
+     cores: 8
+     memory: 7680MiB
+     processes: 2
+     interface: ib0
+     local_directory: "/scratch/b/b381141/dask-tmp"
+     n_workers: 24
+
+This will start 24 workers with ``cores / processes = 4`` threads each,
+resulting in ``n_workers / processes = 12`` Slurm jobs, where each Slurm job
+will request 8 CPU cores and 7680 MiB of memory and start ``processes = 2``
+workers.
+This example will use the fast infiniband network connection (called ``ib0``
+on Levante) for communication between workers running on different nodes.
+It is
+`important to set the right location for temporary storage <https://docs.dask.org/en/latest/deploying-hpc.html#local-storage>`__,
+in this case the ``/scratch`` space is used.
+It is also possible to use environment variables to configure the temporary
+storage location, if your cluster provides these.
+
+A configuration like this should work well for larger computations where it is
+advantageous to use multiple nodes in a compute cluster.
+See
+`Deploying Dask Clusters on High Performance Computers <https://docs.dask.org/en/latest/deploying-hpc.html>`_
+for more information.
+
+*Externally managed Dask cluster*
+
+Use an externally managed cluster, e.g. a cluster that you started using the
+`Dask Jupyterlab extension <https://github.com/dask/dask-labextension#dask-jupyterlab-extension>`_:
+
+.. code:: yaml
+
+   client:
+     address: '127.0.0.1:8786'
+
+See `here <https://jobqueue.dask.org/en/latest/interactive.html>`_
+for an example of how to configure this on a remote system.
+
+For debugging purposes, it can be useful to start the cluster outside of
+ESMValCore, because then the
+`Dask dashboard <https://docs.dask.org/en/stable/dashboard.html>`_ remains
+available after ESMValCore has finished running.
+
+**Advice on choosing performant configurations**
+
+The threads within a single worker can access the same memory locations, so
+they can pass chunks around freely, while communicating a chunk between
+workers is done by copying it, which is (a bit) slower.
+Therefore it is beneficial for performance to have multiple threads per
+worker.
+However, due to limitations in the CPython implementation (known as the
+Global Interpreter Lock or GIL), only a single thread in a worker can execute
+Python code at a time (this limitation does not apply to compiled code called
+by Python code, e.g. numpy), so the best performing configurations will
+typically not use much more than 10 threads per worker.
+
+Due to limitations of the NetCDF library (it is not thread-safe), only one
+of the threads in a worker can read or write to a NetCDF file at a time.
+Therefore, it may be beneficial to use fewer threads per worker if the
+computation is very simple and the runtime is determined by the speed with
+which the data can be read from and/or written to disk.
 
 .. _config-esgf:
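As an editor's illustration (not part of this commit), the documented
``cluster`` layout can also be generated programmatically. In the sketch
below, the worker count, thread count, and memory limit are placeholder
values copied from the 'shared computer' example above.

.. code:: python

   # Sketch: write ~/.esmvaltool/dask.yml using the 'cluster' layout documented
   # above. All resource numbers are placeholders; adjust them to your machine.
   from pathlib import Path

   import yaml

   dask_config = {
       'cluster': {
           'type': 'distributed.LocalCluster',
           'n_workers': 2,
           'threads_per_worker': 4,
           'memory_limit': '4 GiB',
       },
   }

   config_file = Path.home() / '.esmvaltool' / 'dask.yml'
   config_file.parent.mkdir(parents=True, exist_ok=True)
   if config_file.exists():
       print(f"{config_file} already exists; not overwriting it")
   else:
       config_file.write_text(yaml.safe_dump(dask_config), encoding='utf-8')
       print(f"Wrote {config_file}")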

environment.yml

+3-1
@@ -10,6 +10,8 @@ dependencies:
   - cftime
   - compilers
   - dask
+  - dask-jobqueue
+  - distributed
   - esgf-pyclient>=0.3.1
   - esmpy!=8.1.0
   - filelock
@@ -18,7 +20,7 @@ dependencies:
   - geopy
   - humanfriendly
   - importlib_resources
-  - iris>=3.4.0
+  - iris>=3.6.0
   - iris-esmf-regrid >=0.6.0 # to work with latest esmpy
   - isodate
   - jinja2

esmvalcore/_main.py

+3
@@ -74,6 +74,7 @@ def process_recipe(recipe_file: Path, session):
     import shutil
 
     from esmvalcore._recipe.recipe import read_recipe_file
+    from esmvalcore.config._dask import check_distributed_config
     if not recipe_file.is_file():
         import errno
         raise OSError(errno.ENOENT, "Specified recipe file does not exist",
@@ -103,6 +104,8 @@ def process_recipe(recipe_file: Path, session):
     logger.info("If you experience memory problems, try reducing "
                 "'max_parallel_tasks' in your user configuration file.")
 
+    check_distributed_config()
+
     if session['compress_netcdf']:
         logger.warning(
             "You have enabled NetCDF compression. Accessing .nc files can be "

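As an editor's aside (not part of this commit): the new call only logs a
warning when no ``~/.esmvaltool/dask.yml`` is found, as implemented in
``esmvalcore/config/_dask.py`` further below. A minimal standalone sketch:

.. code:: python

   # Sketch: illustrate the effect of check_distributed_config() on its own.
   # Logging is configured here only so the warning is visible outside ESMValCore.
   import logging

   from esmvalcore.config._dask import check_distributed_config

   logging.basicConfig(level=logging.INFO)
   check_distributed_config()  # warns if ~/.esmvaltool/dask.yml does not exist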
esmvalcore/_task.py

+30-8
@@ -19,9 +19,11 @@
 
 import psutil
 import yaml
+from distributed import Client
 
 from ._citation import _write_citation_files
 from ._provenance import TrackedFile, get_task_provenance
+from .config._dask import get_distributed_client
 from .config._diagnostics import DIAGNOSTICS, TAGS
 
 
@@ -718,10 +720,22 @@ def run(self, max_parallel_tasks: Optional[int] = None) -> None:
         max_parallel_tasks : int
             Number of processes to run. If `1`, run the tasks sequentially.
         """
-        if max_parallel_tasks == 1:
-            self._run_sequential()
-        else:
-            self._run_parallel(max_parallel_tasks)
+        with get_distributed_client() as client:
+            if client is None:
+                address = None
+            else:
+                address = client.scheduler.address
+            for task in self.flatten():
+                if (isinstance(task, DiagnosticTask)
+                        and Path(task.script).suffix.lower() == '.py'):
+                    # Only insert the scheduler address if running a
+                    # Python script.
+                    task.settings['scheduler_address'] = address
+
+            if max_parallel_tasks == 1:
+                self._run_sequential()
+            else:
+                self._run_parallel(address, max_parallel_tasks)
 
     def _run_sequential(self) -> None:
         """Run tasks sequentially."""
@@ -732,7 +746,7 @@ def _run_sequential(self) -> None:
         for task in sorted(tasks, key=lambda t: t.priority):
             task.run()
 
-    def _run_parallel(self, max_parallel_tasks=None):
+    def _run_parallel(self, scheduler_address, max_parallel_tasks):
         """Run tasks in parallel."""
         scheduled = self.flatten()
         running = {}
@@ -757,7 +771,8 @@ def done(task):
                 if len(running) >= max_parallel_tasks:
                     break
                 if all(done(t) for t in task.ancestors):
-                    future = pool.apply_async(_run_task, [task])
+                    future = pool.apply_async(_run_task,
+                                              [task, scheduler_address])
                     running[task] = future
                     scheduled.remove(task)
 
@@ -790,7 +805,14 @@ def _copy_results(task, future):
     task.output_files, task.products = future.get()
 
 
-def _run_task(task):
+def _run_task(task, scheduler_address):
     """Run task and return the result."""
-    output_files = task.run()
+    if scheduler_address is None:
+        client = contextlib.nullcontext()
+    else:
+        client = Client(scheduler_address)
+
+    with client:
+        output_files = task.run()
+
     return output_files, task.products
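Editor's note: because the scheduler address is stored under
``scheduler_address`` in the settings of Python diagnostic tasks, a diagnostic
script can attach to the same cluster. The snippet below is a hypothetical
diagnostic-side sketch; the helper name is invented for illustration and the
way the settings dictionary is obtained is simplified.

.. code:: python

   # Hypothetical sketch: connect a diagnostic to the scheduler address that
   # TaskSet.run() injects into the settings of Python diagnostic scripts.
   import contextlib

   from distributed import Client


   def get_client(settings):
       """Return a Client for the configured scheduler, or a no-op context."""
       address = settings.get('scheduler_address')
       if address is None:
           # No distributed cluster configured; the basic scheduler is used.
           return contextlib.nullcontext()
       return Client(address)


   # Usage, assuming `settings` has already been loaded by the diagnostic:
   # with get_client(settings):
   #     ...  # lazy computations now run on the shared Dask cluster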

esmvalcore/config/_dask.py

+79
@@ -0,0 +1,79 @@
+"""Configuration for Dask distributed."""
+import contextlib
+import importlib
+import logging
+from pathlib import Path
+
+import yaml
+from distributed import Client
+
+logger = logging.getLogger(__name__)
+
+CONFIG_FILE = Path.home() / '.esmvaltool' / 'dask.yml'
+
+
+def check_distributed_config():
+    """Check the Dask distributed configuration."""
+    if not CONFIG_FILE.exists():
+        logger.warning(
+            "Using the Dask basic scheduler. This may lead to slow "
+            "computations and out-of-memory errors. "
+            "Note that the basic scheduler may still be the best choice for "
+            "preprocessor functions that are not lazy. "
+            "In that case, you can safely ignore this warning. "
+            "See https://docs.esmvaltool.org/projects/ESMValCore/en/latest/"
+            "quickstart/configure.html#dask-distributed-configuration for "
+            "more information. ")
+
+
+@contextlib.contextmanager
+def get_distributed_client():
+    """Get a Dask distributed client."""
+    dask_args = {}
+    if CONFIG_FILE.exists():
+        config = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8'))
+        if config is not None:
+            dask_args = config
+
+    client_args = dask_args.get('client') or {}
+    cluster_args = dask_args.get('cluster') or {}
+
+    # Start a cluster, if requested
+    if 'address' in client_args:
+        # Use an externally managed cluster.
+        cluster = None
+        if cluster_args:
+            logger.warning(
+                "Not using Dask 'cluster' settings from %s because a cluster "
+                "'address' is already provided in 'client'.", CONFIG_FILE)
+    elif cluster_args:
+        # Start cluster.
+        cluster_type = cluster_args.pop(
+            'type',
+            'distributed.LocalCluster',
+        )
+        cluster_module_name, cluster_cls_name = cluster_type.rsplit('.', 1)
+        cluster_module = importlib.import_module(cluster_module_name)
+        cluster_cls = getattr(cluster_module, cluster_cls_name)
+        cluster = cluster_cls(**cluster_args)
+        client_args['address'] = cluster.scheduler_address
+    else:
+        # No cluster configured, use Dask basic scheduler, or a LocalCluster
+        # managed through Client.
+        cluster = None
+
+    # Start a client, if requested
+    if dask_args:
+        client = Client(**client_args)
+        logger.info("Dask dashboard: %s", client.dashboard_link)
+    else:
+        logger.info("Using the Dask basic scheduler.")
+        client = None
+
+    try:
+        yield client
+    finally:
+        if client is not None:
+            client.close()
+        if cluster is not None:
+            cluster.close()
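Editor's note: a small usage sketch of this context manager, mirroring how
``esmvalcore/_task.py`` consumes it. With no ``~/.esmvaltool/dask.yml`` present
it yields ``None`` and the basic scheduler stays in use; otherwise it yields a
connected client and closes both client and cluster on exit.

.. code:: python

   # Sketch: use get_distributed_client() outside the task machinery.
   from esmvalcore.config._dask import get_distributed_client

   with get_distributed_client() as client:
       if client is None:
           print("No dask.yml found: using the Dask basic scheduler")
       else:
           print("Dask dashboard:", client.dashboard_link)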

esmvalcore/experimental/recipe.py

+2-1
@@ -10,7 +10,7 @@
 import yaml
 
 from esmvalcore._recipe.recipe import Recipe as RecipeEngine
-from esmvalcore.config import CFG, Session
+from esmvalcore.config import CFG, Session, _dask
 
 from ._logging import log_to_dir
 from .recipe_info import RecipeInfo
@@ -132,6 +132,7 @@ def run(
         session['diagnostics'] = task
 
         with log_to_dir(session.run_dir):
+            _dask.check_distributed_config()
             self._engine = self._load(session=session)
             self._engine.run()
 
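Editor's note: a hedged sketch of reaching this code path through the
experimental Python API, assuming ``esmvalcore.experimental.get_recipe`` is
available as in recent releases; the recipe name is a placeholder.

.. code:: python

   # Sketch: Recipe.run() in the experimental API now calls
   # _dask.check_distributed_config() before loading and running the recipe.
   from esmvalcore.experimental import get_recipe

   recipe = get_recipe('examples/recipe_python.yml')  # placeholder recipe name
   output = recipe.run()
   print(output)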
