Merge pull request #160 from TomAugspurger/refactor
Refactor XarrayZarrRecipe to be serialization-friendly
rabernat authored Jun 25, 2021
2 parents 05f3a1f + f57e36a commit e1ef575
Showing 11 changed files with 2,060 additions and 5,473 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -129,3 +129,7 @@ dmypy.json
# Pyre type checker
.pyre/
_version.py

# tutorials
*.nc
dask-worker-space
8 changes: 8 additions & 0 deletions docs/development/release_notes.md
@@ -1,5 +1,13 @@
# Release Notes

## v0.4.0 - 2021-06-25

- Fixed issue with recipe serialization causing high memory usage of Dask schedulers and workers when
  executing recipes with Prefect or Dask ([#160](https://github.com/pangeo-forge/pangeo-forge-recipes/pull/160)).
- Added new methods `.to_dask()`, `.to_prefect()`, and `.to_function()` for converting a recipe
  to one of the Dask, Prefect, or Python execution plans. The previous method, `recipe.to_pipelines()`,
  is now deprecated.

## v0.3.4 - 2021-05-25

- Added `copy_pruned` method to `XarrayZarrRecipe` to facilitate testing.
59 changes: 33 additions & 26 deletions docs/execution.md
@@ -39,6 +39,7 @@ recipe.prepare_target()

For example, for Zarr targets, this sets up the Zarr group with the necessary
arrays and metadata.
This is the most complex step, and the most likely place to get an error.
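
For a quick sanity check that this stage did what you expected, you can open the target
store and inspect the arrays and metadata it created. A minimal sketch, assuming the recipe
writes to a local Zarr store at the hypothetical path `target.zarr`:

```{code-block} python
import zarr

# open the store created by prepare_target (read-only)
group = zarr.open_group("target.zarr", mode="r")
print(group.tree())       # arrays and chunk layout created so far
print(dict(group.attrs))  # group-level metadata written so far
```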

### Stage 3: Store Chunks

@@ -57,43 +58,49 @@ If there is any cleanup or consolidation to be done, it happens here.
recipe.finalize_target()
```

For example, consolidating Zarr metadta happens in the finalize step.
For example, consolidating Zarr metadata happens in the finalize step.

## Execution by Executors
## Compiled Recipes

Very large recipes cannot feasibly be executed this way.
To support distributed parallel execution, Pangeo Forge borrows the
[Executors framework from Rechunker](https://rechunker.readthedocs.io/en/latest/executors.html).
Instead, recipes can be _compiled_ to executable objects.
We currently support three types of compilation.

There are currently three executors implemented.
- {class}`pangeo_forge_recipes.executors.PythonPipelineExecutor`: a reference executor
using simple python
- {class}`pangeo_forge_recipes.executors.DaskPipelineExecutor`: distributed executor using Dask
- {class}`pangeo_forge_recipes.executors.PrefectPipelineExecutor`: distributed executor using Prefect
### Python Function

To use an executor, the recipe must first be transformed into a `Pipeline` object.
The full process looks like this:
To convert a recipe to a single Python function, use the `.to_function()` method.
For example:

```{code-block} python
pipeline = recipe.to_pipelines()
executor = PrefectPipelineExecutor()
plan = executor.pipelines_to_plan(pipeline)
executor.execute_plan(plan) # actually runs the recipe
recipe_func = recipe.to_function()
recipe_func() # actually execute the recipe
```

## Executors
Note that the Python function approach does not support parallel or distributed execution.
It's mostly just a convenience utility.

```{eval-rst}
.. autoclass:: pangeo_forge_recipes.executors.PythonPipelineExecutor
:members:
```

```{eval-rst}
.. autoclass:: pangeo_forge_recipes.executors.DaskPipelineExecutor
:members:
### Dask Delayed

You can convert your recipe to a [Dask Delayed](https://docs.dask.org/en/latest/delayed.html)
object using the `.to_dask()` method. For example:

```{code-block} python
delayed = recipe.to_dask()
delayed.compute()
```

```{eval-rst}
.. autoclass:: pangeo_forge_recipes.executors.PrefectPipelineExecutor
:members:
The `delayed` object can be executed by any of Dask's schedulers, including
cloud and HPC distributed schedulers.
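
For example, the following sketch (assuming a Dask cluster is reachable at the hypothetical
address `tcp://scheduler:8786`) runs the recipe on a distributed cluster:

```{code-block} python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # becomes the default scheduler
delayed = recipe.to_dask()
delayed.compute()
```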

### Prefect Flow

You can convert your recipe to a [Prefect Flow](https://docs.prefect.io/core/concepts/flows.html) using
the `.to_prefect()` method. For example:

```{code-block} python
flow = recipe.to_prefect()
flow.run()
```

By default the flow is run using Prefect's [LocalExecutor](https://docs.prefect.io/orchestration/flow_config/executors.html#localexecutor). See [executors](https://docs.prefect.io/orchestration/flow_config/executors.html) for more.
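
To run the flow on a Dask cluster instead, you can attach a different executor to the flow
before calling `flow.run()`. A minimal sketch, assuming Prefect >= 0.14, where executors are
configured directly on the flow:

```{code-block} python
from prefect.executors import DaskExecutor

flow = recipe.to_prefect()
# or DaskExecutor(address="tcp://scheduler:8786") to use an existing cluster
flow.executor = DaskExecutor()
flow.run()
```
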
126 changes: 49 additions & 77 deletions docs/tutorials/cmip6-recipe.ipynb
100644 → 100755

Large diffs are not rendered by default.

644 changes: 372 additions & 272 deletions docs/tutorials/multi_variable_recipe.ipynb
100644 → 100755

Large diffs are not rendered by default.

2,372 changes: 352 additions & 2,020 deletions docs/tutorials/netcdf_zarr_sequential.ipynb
100644 → 100755

Large diffs are not rendered by default.

3,212 changes: 447 additions & 2,765 deletions docs/tutorials/terraclimate.ipynb
100644 → 100755

Large diffs are not rendered by default.

111 changes: 110 additions & 1 deletion pangeo_forge_recipes/recipes/base.py
@@ -1,3 +1,4 @@
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, Hashable, Iterable
@@ -78,7 +79,11 @@ def finalize_target(self) -> Callable[[], None]:
    def to_pipelines(self) -> ParallelPipelines:
        """Translate recipe to pipeline for execution.
        """

        warnings.warn(
            "'to_pipelines' is deprecated. Use one of 'to_function', 'to_dask', or "
            "'to_prefect' directly instead.",
            FutureWarning,
        )
        pipeline = []  # type: MultiStagePipeline
        if getattr(self, "cache_inputs", False):  # TODO: formalize this contract
            pipeline.append(Stage(self.cache_input, list(self.iter_inputs())))
@@ -89,6 +94,98 @@ def to_pipelines(self) -> ParallelPipelines:
        pipelines.append(pipeline)
        return pipelines

    def to_function(self) -> Callable[[], None]:
        """
        Translate the recipe to a Python function for execution.
        """

        def pipeline():
            # TODO: formalize this contract
            if getattr(self, "cache_inputs", False):
                for input_key in self.iter_inputs():
                    self.cache_input(input_key)
            self.prepare_target()
            for chunk_key in self.iter_chunks():
                self.store_chunk(chunk_key)
            self.finalize_target()

        return pipeline

    def to_dask(self):
        """
        Translate the recipe to a dask.Delayed object for parallel execution.
        """
        # This manually builds a Dask task graph with each stage of the recipe.
        # We use a few "checkpoints" to ensure that downstream tasks depend
        # on upstream tasks being done before starting. We use a manual task
        # graph rather than dask.delayed to avoid some expensive tokenization
        # in dask.delayed
        import dask
        from dask.delayed import Delayed

        # TODO: HighlevelGraph layers for each of these mapped inputs.
        # Cache Input --------------------------------------------------------
        dsk = {}
        token = dask.base.tokenize(self)

        if getattr(self, "cache_inputs", False):  # TODO: formalize cache_inputs
            for i, input_key in enumerate(self.iter_inputs()):
                dsk[(f"cache_input-{token}", i)] = (self.cache_input, input_key)

        # Prepare Target ------------------------------------------------------
        dsk[f"checkpoint_0-{token}"] = (lambda *args: None, list(dsk))
        dsk[f"prepare_target-{token}"] = (
            _prepare_target,
            f"checkpoint_0-{token}",
            self.prepare_target,
        )

        # Store Chunk --------------------------------------------------------
        keys = []
        for i, chunk_key in enumerate(self.iter_chunks()):
            k = (f"store_chunk-{token}", i)
            dsk[k] = (_store_chunk, f"prepare_target-{token}", self.store_chunk, chunk_key)
            keys.append(k)

        # Finalize Target -----------------------------------------------------
        dsk[f"checkpoint_1-{token}"] = (lambda *args: None, keys)
        key = f"finalize_target-{token}"
        dsk[key] = (_finalize_target, f"checkpoint_1-{token}", self.finalize_target)

        return Delayed(key, dsk)

    def to_prefect(self):
        """Compile the recipe to a Prefect.Flow object."""
        from prefect import Flow, task, unmapped

        has_cache_inputs = getattr(self, "cache_inputs", False)
        if has_cache_inputs:
            cache_input_task = task(self.cache_input, name="cache_input")
        prepare_target_task = task(self.prepare_target, name="prepare_target")
        store_chunk_task = task(self.store_chunk, name="store_chunk")
        finalize_target_task = task(self.finalize_target, name="finalize_target")

        with Flow("pangeo-forge-recipe") as flow:
            if has_cache_inputs:
                cache_task = cache_input_task.map(input_key=list(self.iter_inputs()))
                upstream_tasks = [cache_task]
            else:
                upstream_tasks = []
            prepare_task = prepare_target_task(upstream_tasks=upstream_tasks)
            store_task = store_chunk_task.map(
                chunk_key=list(self.iter_chunks()), upstream_tasks=[unmapped(prepare_task)],
            )
            _ = finalize_target_task(upstream_tasks=[store_task])

        return flow

    def __iter__(self):
        # Yield (stage_callable, iterable_of_keys) pairs in execution order; an empty
        # list means the stage runs once with no arguments. Generic executors can use
        # this to drive the recipe without knowing its internals.
        if hasattr(self, "cache_inputs"):
            yield self.cache_input, self.iter_inputs()
        yield self.prepare_target, []
        yield self.store_chunk, self.iter_chunks()
        yield self.finalize_target, []

    # https://stackoverflow.com/questions/59986413/achieving-multiple-inheritance-using-python-dataclasses
    def __post_init__(self):
        # just intercept the __post_init__ calls so they
@@ -111,3 +208,15 @@ def wrapped(*args, **kwargs):
return new_func

return wrapped


def _prepare_target(checkpoint, func):
    # `checkpoint` is unused; it exists only so the Dask graph records a
    # dependency on the upstream checkpoint task before `func` runs.
    return func()


def _store_chunk(checkpoint, func, input_key):
    return func(input_key)


def _finalize_target(checkpoint, func):
    return func()