Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist execution context in storage target #359

Merged
merged 14 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/pangeo_forge_recipes/development/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes

## v0.9.1 - Unreleased

- Persist Pangeo Forge execution context metadata in target datasets. This information, which includes
the `pangeo-forge-recipes` version as well as recipe and input hashes, attaches execution provenance
to the dataset itself. {pull}`359`

## v0.9 - 2022-05-11

- **Breaking changes:** Deprecated `XarrayZarrRecipe` manual stage methods. Manual execution can be
Expand Down
23 changes: 23 additions & 0 deletions docs/pangeo_forge_recipes/recipe_user_guide/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,26 @@ with beam.Pipeline() as p:

By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/).
See [runners](https://beam.apache.org/documentation/#runners) for more.


## Execution context

All Pangeo Forge {doc}`recipes` contain a `.get_execution_context()` method which returns the
following metadata:

```{code-block} python
{
    "pangeo-forge:version": "{pangeo_forge_recipes version installed at time of execution}",
    "pangeo-forge:recipe_hash": "{recipe hash as returned by `recipe.sha256()`}",
    "pangeo-forge:inputs_hash": "{file pattern hash as returned by `recipe.file_pattern.sha256()`}",
}
```

Each recipe class defines where to store this metadata:

- `XarrayZarrRecipe`: Added to Zarr group attributes, and therefore also available via the
`xarray.Dataset.attrs` when opening Zarr stores with xarray.
- `HDFReferenceRecipe`: TODO

The execution context metadata which is persisted in the target dataset is used for tracking
dataset provenance.
10 changes: 10 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dataclasses import dataclass, field, replace
from typing import Callable, ClassVar

import pkg_resources # type: ignore

from ..executors.base import Pipeline
from ..patterns import FilePattern, prune_pattern
from ..serialization import dataclass_sha256
Expand Down Expand Up @@ -50,6 +52,14 @@ def to_beam(self):
def sha256(self):
return dataclass_sha256(self, ignore_keys=self._hash_exclude_)

def get_execution_context(self):
    """Return execution-provenance metadata for this recipe.

    Returns
    -------
    dict
        Contains the installed ``pangeo-forge-recipes`` version, the recipe
        hash (``self.sha256().hex()``), and the file-pattern hash
        (``self.file_pattern.sha256().hex()``).
    """
    # importlib.metadata (stdlib, Python 3.8+) is the official replacement
    # for the deprecated and import-time-expensive pkg_resources API.
    from importlib.metadata import version

    return dict(
        version=version("pangeo-forge-recipes"),
        recipe_hash=self.sha256().hex(),
        inputs_hash=self.file_pattern.sha256().hex(),
    )


RecipeCompiler = Callable[[BaseRecipe], Pipeline]

Expand Down
4 changes: 4 additions & 0 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ def filter_init_chunks(chunk_key):
recipe_meta = {"input_sequence_lens": input_sequence_lens}
config.storage_config.metadata[_GLOBAL_METADATA_KEY] = recipe_meta

zgroup = zarr.open_group(config.target_mapper)
for k, v in config.get_execution_context().items():
zgroup.attrs[f"pangeo-forge:{k}"] = v


def store_chunk(chunk_key: ChunkKey, *, config: XarrayZarrRecipe) -> None:
if config.storage_config.target is None:
Expand Down
3 changes: 3 additions & 0 deletions pangeo_forge_recipes/serialization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
from collections.abc import Collection
from dataclasses import asdict
from enum import Enum
Expand All @@ -17,6 +18,8 @@ def either_encode_or_hash(obj: Any):
return obj.value
elif hasattr(obj, "sha256"):
return obj.sha256().hex()
elif inspect.isfunction(obj):
return inspect.getsource(obj)
raise TypeError(f"object of type {type(obj).__name__} not serializable")


Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ max-line-length = 100

[isort]
known_first_party=pangeo_forge_recipes
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,packaging,pandas,pkg_resources,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
Expand Down
36 changes: 28 additions & 8 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@
from pangeo_forge_recipes.storage import MetadataTarget, StorageConfig


def drop_execution_context_attrs(ds: xr.Dataset) -> xr.Dataset:
    """Return a copy of *ds* stripped of pangeo-forge execution context attrs."""

    stripped = ds.copy()
    # Snapshot the keys first so we never mutate attrs while iterating it.
    for key in list(stripped.attrs):
        if key.startswith("pangeo-forge:"):
            del stripped.attrs[key]

    return stripped


def assert_identical(ds1: xr.Dataset, ds2: xr.Dataset):
    """Assert two datasets match, ignoring pangeo-forge execution context attrs."""

    cleaned_left = drop_execution_context_attrs(ds1)
    cleaned_right = drop_execution_context_attrs(ds2)
    xr.testing.assert_identical(cleaned_left, cleaned_right)


def make_netCDFtoZarr_recipe(
file_pattern, xarray_dataset, target, cache, metadata, extra_kwargs=None
):
Expand Down Expand Up @@ -120,7 +140,7 @@ def test_recipe(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("get_mapper_from", ["storage_config", "target", "target_mapper"])
Expand All @@ -139,7 +159,7 @@ def test_recipe_default_storage(recipe_fixture, execute_recipe, get_mapper_from)
elif get_mapper_from == "target_mapper":
mapper = rec.target_mapper
ds_actual = xr.open_zarr(mapper).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
Expand All @@ -150,7 +170,7 @@ def test_recipe_with_references(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, open_input_with_kerchunk=True, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
Expand Down Expand Up @@ -195,7 +215,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


# function passed to preprocessing
Expand Down Expand Up @@ -228,7 +248,7 @@ def test_process(recipe_fixture, execute_recipe, process_input, process_chunk):
assert not ds_actual.identical(ds_expected)
ds_expected = incr_date(ds_expected)

xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


def do_actual_chunks_test(
Expand Down Expand Up @@ -303,7 +323,7 @@ def do_actual_chunks_test(
for dim in ds_actual.dims:
assert store[dim].chunks == ds_actual[dim].shape

xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("inputs_per_chunk,subset_inputs", [(1, {}), (1, {"time": 2}), (2, {})])
Expand Down Expand Up @@ -376,7 +396,7 @@ def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
rec.consolidate_dimension_coordinates = False
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)

store = zarr.open_consolidated(target.get_mapper())
assert store["time"].chunks == (file_pattern.nitems_per_input["time"],)
Expand All @@ -399,7 +419,7 @@ def test_consolidate_dimension_coordinates_with_coordinateless_dimension(
rec = RecipeClass(file_pattern, **kwargs)
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


def test_lock_timeout(netCDFtoZarr_recipe_sequential_only, execute_recipe_no_dask):
Expand Down
29 changes: 29 additions & 0 deletions tests/recipe_tests/test_execution_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest
import xarray as xr
import zarr
from packaging import version

from pangeo_forge_recipes.recipes import XarrayZarrRecipe


@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe])  # HDFReferenceRecipe])
def test_execution_context(recipe_cls, netcdf_local_file_pattern_sequential):
    """Execution context metadata is well-formed and persisted to the target store."""

    recipe = recipe_cls(netcdf_local_file_pattern_sequential)
    ec = recipe.get_execution_context()

    ec_version = version.parse(ec["version"])
    assert ec_version.is_devrelease  # should be True for editable installs used in tests
    assert isinstance(ec_version.major, int) and 0 <= ec_version.major <= 1
    # Bug fix: the upper-bound check previously re-tested ``major`` instead of ``minor``.
    assert isinstance(ec_version.minor, int) and 0 <= ec_version.minor <= 99

    # Both hashes are hex-encoded sha256 digests: 64 hex characters.
    assert isinstance(ec["recipe_hash"], str) and len(ec["recipe_hash"]) == 64
    assert isinstance(ec["inputs_hash"], str) and len(ec["inputs_hash"]) == 64

    recipe.to_function()()
    zgroup = zarr.open_group(recipe.target_mapper)
    ds = xr.open_zarr(recipe.target_mapper, consolidated=True)

    # The context must be visible both in the raw zarr attrs and via xarray.
    for k, v in ec.items():
        assert zgroup.attrs[f"pangeo-forge:{k}"] == v
        assert ds.attrs[f"pangeo-forge:{k}"] == v
10 changes: 5 additions & 5 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
from typing import Optional

import pandas as pd
import pytest
Expand Down Expand Up @@ -171,17 +171,17 @@ class NewRelease(cls):


def test_either_encode_or_hash_raises():
def f():
class A:
pass

@dataclass
class HasUnserializableField:
unserializable_field: Callable = f
unserializable_field: type = A

expected_msg = f"object of type {type(f).__name__} not serializable"
expected_msg = f"object of type {type(A).__name__} not serializable"

with pytest.raises(TypeError, match=expected_msg):
either_encode_or_hash(f)
either_encode_or_hash(A)

with pytest.raises(TypeError, match=expected_msg):
# in practice, we never actually call ``either_encode_or_hash`` directly.
Expand Down