Fix: Cube index validation (#413) #418

Merged
1 change: 1 addition & 0 deletions CHANGES.rst
@@ -6,6 +6,7 @@ Version 3.19.1 (2021-02-XX)
 ===========================
 
 * Allow ``pyarrow==3`` as a dependency.
+* Fix an issue with the cube index validation introduced in v3.19.0 (#413).
 
 Version 3.19.0 (2021-02-12)
 ===========================
11 changes: 5 additions & 6 deletions kartothek/io/dask/common_cube.py
@@ -66,17 +66,16 @@ def ensure_valid_cube_indices(
     `index_columns` and `suppress_index_on` fields adjusted to reflect the
     existing datasets.
     """
-    required_indices = set(cube.index_columns)
-    suppress_index_on = set(cube.suppress_index_on)
+    dataset_indices = []
     for ds in existing_datasets.values():
         for internal_table in ds.table_meta:
             dataset_columns = set(ds.table_meta[internal_table].names)
-            table_indices = required_indices & dataset_columns
+            table_indices = cube.index_columns & dataset_columns
             compatible_indices = _ensure_compatible_indices(ds, table_indices)
             if compatible_indices:
-                dataset_indices = set(compatible_indices)
-                suppress_index_on -= dataset_indices
-                required_indices |= dataset_indices
+                dataset_indices.append(set(compatible_indices))
+    required_indices = cube.index_columns.union(*dataset_indices)
+    suppress_index_on = cube.suppress_index_on.difference(*dataset_indices)
     # Need to remove dimension columns since they *are* technically indices but
     # the cube interface class declares them as not indexed just to add them
     # later on, assuming it is not blacklisted
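
The fix collects each dataset's compatible indices into a list and combines them only once the loop has finished. In v3.19.0 the running required_indices set was both read and mutated inside the loop, so index columns picked up from one dataset were also demanded of every dataset processed afterwards, even when those datasets did not contain the columns at all (#413). Below is a minimal sketch of the corrected accumulation pattern, using hypothetical per-dataset index sets in place of what _ensure_compatible_indices returns, and omitting the dimension-column handling that follows in the real function:

    # Sketch only: hypothetical stand-ins for kartothek's metadata objects.
    cube_index_columns = frozenset({"i1"})
    cube_suppress_index_on = frozenset({"d1", "d2"})
    # One entry per existing dataset, as _ensure_compatible_indices might report it.
    compatible_indices_per_dataset = [
        {"d1", "d2", "i1", "i2"},  # seed dataset: dimensions and i2 already indexed
        {"i1"},                    # extra dataset: only i1 indexed
    ]

    dataset_indices = []
    for compatible_indices in compatible_indices_per_dataset:
        if compatible_indices:
            dataset_indices.append(set(compatible_indices))

    # Combine once, after the loop, so no dataset's indices can leak into the
    # validation of another dataset.
    required_indices = cube_index_columns.union(*dataset_indices)
    suppress_index_on = cube_suppress_index_on.difference(*dataset_indices)

    assert required_indices == {"d1", "d2", "i1", "i2"}
    assert suppress_index_on == set()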
178 changes: 178 additions & 0 deletions tests/io/dask/test_common_cube.py
@@ -0,0 +1,178 @@
import pytest

from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadata
from kartothek.io.dask.common_cube import ensure_valid_cube_indices


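# Minimal stand-ins for the schema objects stored in table_meta: the code
# under test only reads their .names attribute.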
class FakeSeedTableMetadata:
    names = ["d1", "d2", "p", "i1", "i2"]


class FakeExtraTableMetadata:
    names = ["d1", "p", "i1"]


def test_cube_with_valid_indices_is_not_modified_by_validation():
    """
    Test that a cube with valid indices is not modified by `ensure_valid_cube_indices`
    """
    source_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "source",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeSeedTableMetadata()},
            "partition_keys": ["p"],
            "indices": {
                "d1": {"1": ["part_1"]},
                "d2": {"1": ["part_1"]},
                "i1": {"1": ["part_1"]},
            },
        }
    )
    extra_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "extra",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeExtraTableMetadata()},
            "partition_keys": ["p"],
            "indices": {"i1": {"1": ["part_1"]}},
        }
    )
    cube = Cube(
        dimension_columns=["d1", "d2"],
        partition_columns=["p"],
        uuid_prefix="cube",
        seed_dataset="source",
        index_columns=["i1"],
    )

    validated_cube = ensure_valid_cube_indices(
        {"source": source_metadata, "extra": extra_metadata}, cube
    )

    assert validated_cube == cube


def test_existing_indices_are_added_when_missing_in_cube():
    """
    Test that indices already existing in the dataset are added to the validated cube
    """
    source_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "source",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeSeedTableMetadata()},
            "partition_keys": ["p"],
            "indices": {
                "d1": {"1": ["part_1"]},
                "d2": {"1": ["part_1"]},
                "i1": {"1": ["part_1"]},
                "i2": {"1": ["part_1"]},
            },
        }
    )
    extra_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "extra",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeExtraTableMetadata()},
            "partition_keys": ["p"],
            "indices": {"i1": {"1": ["part_1"]}},
        }
    )
    cube = Cube(
        dimension_columns=["d1", "d2"],
        partition_columns=["p"],
        uuid_prefix="cube",
        seed_dataset="source",
        index_columns=["i1"],
    )

    validated_cube = ensure_valid_cube_indices(
        {"source": source_metadata, "extra": extra_metadata}, cube
    )

    assert validated_cube.index_columns == {"i1", "i2"}


def test_raises_when_cube_defines_index_not_in_dataset():
    """
    Test that a `ValueError` is raised when the cube defines an index that is not part of a dataset
    """
    source_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "source",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeSeedTableMetadata()},
            "partition_keys": ["p"],
            "indices": {
                "d1": {"1": ["part_1"]},
                "d2": {"1": ["part_1"]},
                "i1": {"1": ["part_1"]},
            },
        }
    )
    extra_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "extra",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeExtraTableMetadata()},
            "partition_keys": ["p"],
            "indices": {"i1": {"1": ["part_1"]}},
        }
    )
    cube = Cube(
        dimension_columns=["d1", "d2"],
        partition_columns=["p"],
        uuid_prefix="cube",
        seed_dataset="source",
        index_columns=["i2"],
    )

    with pytest.raises(ValueError):
        ensure_valid_cube_indices(
            {"source": source_metadata, "extra": extra_metadata}, cube
        )


def test_no_indices_are_suppressed_when_they_already_exist():
    """
    Test that indices marked as suppressed in the cube are not suppressed when
    they are already present in the dataset
    """
    source_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "source",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeSeedTableMetadata()},
            "partition_keys": ["p"],
            "indices": {
                "d1": {"1": ["part_1"]},
                "d2": {"1": ["part_1"]},
                "i1": {"1": ["part_1"]},
            },
        }
    )
    extra_metadata = DatasetMetadata.from_dict(
        {
            "dataset_uuid": "extra",
            "dataset_metadata_version": 4,
            "table_meta": {"table": FakeExtraTableMetadata()},
            "partition_keys": ["p"],
            "indices": {"i1": {"1": ["part_1"]}},
        }
    )
    cube = Cube(
        dimension_columns=["d1", "d2"],
        partition_columns=["p"],
        uuid_prefix="cube",
        seed_dataset="source",
        suppress_index_on=["d1", "d2"],
    )

    validated_cube = ensure_valid_cube_indices(
        {"source": source_metadata, "extra": extra_metadata}, cube
    )

    assert validated_cube.suppress_index_on == frozenset()