From 7d93cfd49e95636f1b7c6bee166bae1441f651eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Stephan=20He=C3=9Felmann=20=28lgtf/39809=29?=
Date: Mon, 22 Feb 2021 17:37:02 +0100
Subject: [PATCH] Fix: Cube index validation (#413)

The issue occurred when updating a cube that consists of multiple
datasets with different dimension columns. This fixes issue #413.

The bug was caused by reading a mutated variable inside the loop:
`required_indices`, accessed via `table_indices`, grows while the
existing datasets are iterated, so indices collected from one dataset
leak into the validation of the next. I rewrote the loop so that each
dataset is validated against the original index columns and the results
are only merged after the loop, and I added a unit test which verifies
that the index validation works as expected.
---
Note: a standalone sketch of the failure mode is appended below the
diff.

 CHANGES.rst                       |  1 +
 kartothek/io/dask/common_cube.py  | 11 ++++---
 tests/io/dask/test_common_cube.py | 49 +++++++++++++++++++++++++++++++
 3 files changed, 55 insertions(+), 6 deletions(-)
 create mode 100644 tests/io/dask/test_common_cube.py

diff --git a/CHANGES.rst b/CHANGES.rst
index 0d643c79..75dff837 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -6,6 +6,7 @@ Version 3.19.1 (2021-02-XX)
 ===========================
 
 * Allow ``pyarrow==3`` as a dependency.
+* Fix an issue with the cube index validation introduced in v3.19.0 (#413).
 
 Version 3.19.0 (2021-02-12)
 ===========================
diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py
index 9d180b35..2fc6bfb7 100644
--- a/kartothek/io/dask/common_cube.py
+++ b/kartothek/io/dask/common_cube.py
@@ -66,17 +66,16 @@ def ensure_valid_cube_indices(
     `index_columns` and `suppress_index_on` fields adjusted to reflect the
     existing datasets.
     """
-    required_indices = set(cube.index_columns)
-    suppress_index_on = set(cube.suppress_index_on)
+    dataset_indices = []
     for ds in existing_datasets.values():
         for internal_table in ds.table_meta:
             dataset_columns = set(ds.table_meta[internal_table].names)
-            table_indices = required_indices & dataset_columns
+            table_indices = cube.index_columns & dataset_columns
             compatible_indices = _ensure_compatible_indices(ds, table_indices)
             if compatible_indices:
-                dataset_indices = set(compatible_indices)
-                suppress_index_on -= dataset_indices
-                required_indices |= dataset_indices
+                dataset_indices.append(set(compatible_indices))
+    required_indices = cube.index_columns.union(*dataset_indices)
+    suppress_index_on = cube.suppress_index_on.difference(*dataset_indices)
     # Need to remove dimension columns since they *are* technically indices but
     # the cube interface class declares them as not indexed just to add them
     # later on, assuming it is not blacklisted
diff --git a/tests/io/dask/test_common_cube.py b/tests/io/dask/test_common_cube.py
new file mode 100644
index 00000000..cd78b6c7
--- /dev/null
+++ b/tests/io/dask/test_common_cube.py
@@ -0,0 +1,49 @@
+from kartothek.core.cube.cube import Cube
+from kartothek.core.dataset import DatasetMetadata
+from kartothek.io.dask.common_cube import ensure_valid_cube_indices
+
+
+class FakeSeedTableMetadata:
+    names = ["d1", "d2", "p", "i"]
+
+
+class FakeExtraTableMetadata:
+    names = ["d1", "p", "i"]
+
+
+def test_cube_indices_are_validated():
+    source_metadata = DatasetMetadata.from_dict(
+        {
+            "dataset_uuid": "source",
+            "dataset_metadata_version": 4,
+            "table_meta": {"table": FakeSeedTableMetadata()},
+            "partition_keys": ["p"],
+            "indices": {
+                "d1": {"1": ["part_1"]},
+                "d2": {"1": ["part_1"]},
+                "i": {"1": ["part_1"]},
+            },
+        }
+    )
+    extra_metadata = DatasetMetadata.from_dict(
+        {
+            "dataset_uuid": "extra",
+            "dataset_metadata_version": 4,
+            "table_meta": {"table": FakeExtraTableMetadata()},
+            "partition_keys": ["p"],
+            "indices": {"i": {"1": ["part_1"]}},
+        }
+    )
+    cube = Cube(
+        dimension_columns=["d1", "d2"],
+        partition_columns=["p"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+        index_columns=["i"],
+    )
+
+    validated_cube = ensure_valid_cube_indices(
+        {"source": source_metadata, "extra": extra_metadata}, cube
+    )
+
+    assert validated_cube == cube
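
Appendix (not part of the patch): a minimal, standalone sketch of the
failure mode described in the commit message. The `check_indices` helper
below is an invented stand-in for `_ensure_compatible_indices`, and the
column and index sets are made up for illustration; only the
read-and-mutate pattern mirrors the code changed above.

    # Toy model: dataset name -> (available columns, existing secondary indices).
    datasets = {
        "source": ({"d1", "d2", "p", "i"}, {"d1", "d2", "i"}),
        "extra": ({"d1", "p", "i"}, {"i"}),
    }

    def check_indices(requested, indexed):
        # Invented stand-in for _ensure_compatible_indices: every requested
        # index must already exist on the dataset; the dataset's own indices
        # are returned on success.
        if not requested <= indexed:
            raise ValueError(f"missing secondary indices: {requested - indexed}")
        return indexed

    # Buggy pattern (pre-fix): the accumulator is read *and* grown inside the
    # loop, so d1/d2 discovered on "source" leak into the check for "extra",
    # which never indexed d1.
    required_indices = {"i"}
    try:
        for columns, indexed in datasets.values():
            table_indices = required_indices & columns
            required_indices |= check_indices(table_indices, indexed)
    except ValueError as exc:
        print("spurious failure:", exc)  # missing secondary indices: {'d1'}

    # Fixed pattern (shape of the patch): validate each dataset against the
    # original request only, then merge the per-dataset results after the loop.
    collected = [
        check_indices({"i"} & columns, indexed)
        for columns, indexed in datasets.values()
    ]
    required_indices = {"i"}.union(*collected)
    print("required indices:", sorted(required_indices))  # ['d1', 'd2', 'i']

The patched loop has the same shape: it intersects against
`cube.index_columns` (the original request) inside the loop, collects the
per-dataset results, and only unions them once iteration is done.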