From 316d176470703e8068e5b8dc37e9005099d00059 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 2 Feb 2021 17:23:53 +0100
Subject: [PATCH] Ensure that updates on neither cubes nor plain datasets
 corrupt indices

---
 CHANGES.rst                              |  6 ++
 kartothek/api/consistency.py             |  2 +-
 kartothek/io/dask/_shuffle.py            |  8 ++-
 kartothek/io/dask/common_cube.py         | 25 +++++++
 kartothek/io/dask/dataframe.py           | 12 +++-
 kartothek/io/eager.py                    |  9 +--
 kartothek/io/testing/update.py           | 27 +++++++
 kartothek/io/testing/update_cube.py      | 92 ++++++++++++++++++++++++
 kartothek/io_components/metapartition.py | 10 ++-
 kartothek/io_components/utils.py         | 18 +++--
 kartothek/io_components/write.py         |  8 ++-
 11 files changed, 201 insertions(+), 16 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index 4f5761aa..220a756b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -2,6 +2,12 @@
 Changelog
 =========
 
+Version 3.18.1 (2021-02-XY)
+===========================
+
+* Fix an issue where updates on cubes or updates on datasets using
+  dask.dataframe might not update all secondary indices, resulting in a corrupt
+  state after the update
 
 Version 3.18.0 (2021-01-25)
 ===========================
diff --git a/kartothek/api/consistency.py b/kartothek/api/consistency.py
index 76813dc8..a7b6a876 100644
--- a/kartothek/api/consistency.py
+++ b/kartothek/api/consistency.py
@@ -200,7 +200,7 @@ def _check_indices(datasets: Dict[str, DatasetMetadata], cube: Cube) -> None:
     For all datasets the primary indices must be equal to ``ds.partition_keys``.
     For the seed dataset, secondary indices for all dimension columns except
     ``cube.suppress_index_on`` are expected.
-    Additional indices are accepted and will not bew reported as error.
+    Additional indices are accepted and will not be reported as error.
 
     Parameters
     ----------
diff --git a/kartothek/io/dask/_shuffle.py b/kartothek/io/dask/_shuffle.py
index 08f434c9..c639efd6 100644
--- a/kartothek/io/dask/_shuffle.py
+++ b/kartothek/io/dask/_shuffle.py
@@ -12,6 +12,12 @@
 from kartothek.io_components.write import write_partition
 from kartothek.serialization import DataFrameSerializer
 
+try:
+    from typing_extensions import Literal  # type: ignore
+except ImportError:
+    from typing import Literal  # type: ignore
+
+
 _KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"
 
 
@@ -35,7 +41,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets:
 def shuffle_store_dask_partitions(
     ddf: dd.DataFrame,
     table: str,
-    secondary_indices: Optional[Union[str, Sequence[str]]],
+    secondary_indices: Optional[Union[Literal[False], Sequence[str]]],
     metadata_version: int,
     partition_on: List[str],
     store_factory: StoreFactory,
diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py
index 65b0310d..b83a4af2 100644
--- a/kartothek/io/dask/common_cube.py
+++ b/kartothek/io/dask/common_cube.py
@@ -30,10 +30,12 @@
     prepare_ktk_partition_on,
 )
 from kartothek.io_components.metapartition import (
+    SINGLE_TABLE,
     MetaPartition,
     parse_input_to_metapartition,
 )
 from kartothek.io_components.update import update_dataset_from_partitions
+from kartothek.io_components.utils import _ensure_compatible_indices
 from kartothek.io_components.write import (
     raise_if_dataset_exists,
     store_dataset_from_partitions,
@@ -48,6 +50,26 @@
 )
 
 
+def ensure_valid_cube_indices(existing_datasets, cube):
+    required_indices = set(cube.index_columns)
+    suppress_index_on = set(cube.suppress_index_on)
+    for ds in existing_datasets.values():
+        dataset_columns = set(ds.table_meta[SINGLE_TABLE].names)
+        table_indices = required_indices & dataset_columns
+        compatible_indices = _ensure_compatible_indices(ds, table_indices)
+        if compatible_indices:
+            compatible_indices = set(compatible_indices)
+            suppress_index_on -= compatible_indices
+            required_indices |= compatible_indices
+    # Need to remove dimension columns since they *are* technically indices but
+    # the cube interface class declares them as not indexed just to add them
+    # later on, assuming they are not blacklisted
+    return cube.copy(
+        index_columns=required_indices - set(cube.dimension_columns),
+        suppress_index_on=suppress_index_on,
+    )
+
+
 def build_cube_from_bag_internal(
     data, cube, store, ktk_cube_dataset_ids, metadata, overwrite, partition_on
 ):
@@ -90,6 +112,7 @@ def build_cube_from_bag_internal(
     existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store)
     check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets)
     partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)
+    cube = ensure_valid_cube_indices(existing_datasets, cube)
 
     data = (
         data.map(multiplex_user_input, cube=cube)
@@ -167,6 +190,7 @@ def extend_cube_from_bag_internal(
     partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)
 
     existing_datasets = discover_datasets(cube, store)
+    cube = ensure_valid_cube_indices(existing_datasets, cube)
     if overwrite:
         existing_datasets_cut = {
             ktk_cube_dataset_id: ds
@@ -337,6 +361,7 @@ def append_to_cube_from_bag_internal(
     metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids)
 
     existing_datasets = discover_datasets(cube, store)
+    cube = ensure_valid_cube_indices(existing_datasets, cube)
     # existing_payload is set to empty because we're not checking against any existing payload. ktk will account for the
     # compat check within 1 dataset
     existing_payload = set()
diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py
index 8f82dbeb..bb794dc6 100644
--- a/kartothek/io/dask/dataframe.py
+++ b/kartothek/io/dask/dataframe.py
@@ -49,6 +49,11 @@
 from ._utils import _maybe_get_categoricals_from_index
 from .delayed import read_table_as_delayed
 
+try:
+    from typing_extensions import Literal  # type: ignore
+except ImportError:
+    from typing import Literal  # type: ignore
+
 
 @default_docs
 @normalize_args
@@ -321,7 +326,7 @@ def _write_dataframe_partitions(
     store: StoreFactory,
     dataset_uuid: str,
     table: str,
-    secondary_indices: List[str],
+    secondary_indices: Union[Literal[False], List[str]],
     shuffle: bool,
     repartition_ratio: Optional[SupportsFloat],
     num_buckets: int,
@@ -415,7 +420,8 @@ def update_dataset_from_ddf(
         ds_factory=factory,
     )
 
-    _ensure_compatible_indices(ds_factory, secondary_indices)
+    inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
+    del secondary_indices
 
     if ds_factory is not None:
         check_single_table_dataset(ds_factory, table)
@@ -425,7 +431,7 @@ def update_dataset_from_ddf(
         store=store,
         dataset_uuid=dataset_uuid or ds_factory.dataset_uuid,
         table=table,
-        secondary_indices=secondary_indices,
+        secondary_indices=inferred_indices,
         shuffle=shuffle,
         repartition_ratio=repartition_ratio,
         num_buckets=num_buckets,
diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py
index 53b63a00..f7e49365 100644
--- a/kartothek/io/eager.py
+++ b/kartothek/io/eager.py
@@ -749,12 +749,13 @@ def update_dataset_from_dataframes(
         partition_on=partition_on,
     )
 
-    secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
+    inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
+    del secondary_indices
 
     mp = parse_input_to_metapartition(
         df_list,
         metadata_version=metadata_version,
-        expected_secondary_indices=secondary_indices,
+        expected_secondary_indices=inferred_indices,
     )
 
     if sort_partitions_by:
@@ -763,8 +764,8 @@
     if partition_on:
         mp = mp.partition_on(partition_on)
 
-    if secondary_indices:
-        mp = mp.build_indices(secondary_indices)
+    if inferred_indices:
+        mp = mp.build_indices(inferred_indices)
 
     mp = mp.store_dataframes(
         store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
diff --git a/kartothek/io/testing/update.py b/kartothek/io/testing/update.py
index a18b0e8d..943aecf2 100644
--- a/kartothek/io/testing/update.py
+++ b/kartothek/io/testing/update.py
@@ -585,6 +585,8 @@ def test_raises_on_invalid_input(store_factory, bound_update_dataset):
 def test_raises_on_new_index_creation(
     backend_identifier, store_factory, bound_update_dataset, define_indices_on_partition
 ):
+    # This test can be removed once the variable index input is removed in
+    # favour of the test `test_update_secondary_indices_subset`
     if backend_identifier == "dask.dataframe" and define_indices_on_partition:
         pytest.skip()
     # Constructs a dataframe which ignores index information passed as dict
@@ -626,6 +628,31 @@ def test_raises_on_new_index_creation(
     )
 
 
+def test_update_secondary_indices_subset(store_factory, bound_update_dataset):
+    df1 = pd.DataFrame({"A": range(10), "indexed": 1})
+    dataset_uuid = "dataset_uuid"
+    bound_update_dataset(
+        df1, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="indexed"
+    )
+
+    df2 = pd.DataFrame({"A": range(10), "indexed": 2})
+    # secondary index is omitted. Kartothek should pick it up regardless
+    bound_update_dataset(df2, dataset_uuid=dataset_uuid, store=store_factory)
+
+    dm = DatasetMetadata.load_from_store(
+        dataset_uuid, store_factory(), load_all_indices=True
+    )
+    obs_values = dm.indices["indexed"].observed_values()
+
+    assert sorted(obs_values) == [1, 2]
+
+    with pytest.raises(ValueError, match="Incorrect indices provided"):
+        # requesting an index that was not defined on the existing dataset must fail
+        bound_update_dataset(
+            df2, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="A"
+        )
+
+
 @pytest.mark.parametrize("define_indices_on_partition", (False, True))
 def test_update_first_time_with_secondary_indices(
     store_factory, bound_update_dataset, define_indices_on_partition
diff --git a/kartothek/io/testing/update_cube.py b/kartothek/io/testing/update_cube.py
index f16945a5..450fe1e9 100644
--- a/kartothek/io/testing/update_cube.py
+++ b/kartothek/io/testing/update_cube.py
@@ -158,3 +158,95 @@ def test_update_respects_ktk_cube_dataset_ids(
         assert set(df_ex_read["p"]) == {1}
     else:
         assert set(df_ex_read["p"]) == {0, 1}
+
+
+def test_cube_update_secondary_indices_subset(function_store, driver):
+
+    cube1 = Cube(
+        dimension_columns=["A"],
+        partition_columns=["P"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+        index_columns=["indexed"],
+    )
+    df_1 = pd.DataFrame({"A": range(10), "P": 1, "indexed": 1, "not-indexed": 1})
+    build_cube(
+        data={"source": df_1},
+        cube=cube1,
+        store=function_store,
+        metadata={"source": {"meta_at_create": "data"}},
+    )
+
+    cube2 = Cube(
+        dimension_columns=["A"],
+        partition_columns=["P"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+    )
+    df_2 = pd.DataFrame({"A": range(10, 20), "P": 1, "indexed": 2, "not-indexed": 1})
+    driver(
+        data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None
+    )
+
+    dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset)
+    dm = DatasetMetadata.load_from_store(
+        dataset_uuid, function_store(), load_all_indices=True
+    )
+    obs_values = dm.indices["indexed"].observed_values()
+
+    assert sorted(obs_values) == [1, 2]
+
+    cube2 = Cube(
+        dimension_columns=["A"],
+        partition_columns=["P"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+        index_columns=["not-indexed"],
+    )
+    with pytest.raises(
+        ValueError,
+        match='ExplicitSecondaryIndex or PartitionIndex "not-indexed" is missing in dataset',
+    ):
+        driver(
+            data={"source": df_2},
+            cube=cube2,
+            store=function_store,
+            remove_conditions=None,
+        )
+
+
+def test_cube_blacklist_dimension_index(function_store, driver):
+
+    cube1 = Cube(
+        dimension_columns=["A", "B"],
+        partition_columns=["P"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+    )
+    df_1 = pd.DataFrame({"A": range(10), "P": 1, "B": 1, "payload": ""})
+    build_cube(
+        data={"source": df_1},
+        cube=cube1,
+        store=function_store,
+        metadata={"source": {"meta_at_create": "data"}},
+    )
+
+    cube2 = Cube(
+        dimension_columns=["A", "B"],
+        partition_columns=["P"],
+        uuid_prefix="cube",
+        seed_dataset="source",
+        suppress_index_on=["B"],
+    )
+    df_2 = pd.DataFrame({"A": range(10), "P": 1, "B": 2, "payload": ""})
+    driver(
+        data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None
+    )
+
+    dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset)
+    dm = DatasetMetadata.load_from_store(
+        dataset_uuid, function_store(), load_all_indices=True
+    )
+    obs_values = dm.indices["B"].observed_values()
+
+    assert sorted(obs_values) == [1, 2]
diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py
index b394ae4f..372cc79e 100644
--- a/kartothek/io_components/metapartition.py
+++ b/kartothek/io_components/metapartition.py
@@ -42,6 +42,12 @@
     filter_df_from_predicates,
 )
 
+try:
+    from typing_extensions import Literal  # type: ignore
+except ImportError:
+    from typing import Literal  # type: ignore
+
+
 LOGGER = logging.getLogger(__name__)
 
 SINGLE_TABLE = "table"
@@ -1647,7 +1653,9 @@ def partition_labels_from_mps(mps):
 
 
 def parse_input_to_metapartition(
-    obj, metadata_version=None, expected_secondary_indices=False
+    obj: Optional[Union[Dict, pd.DataFrame, Sequence, MetaPartition]],
+    metadata_version: Optional[int] = None,
+    expected_secondary_indices: Optional[Union[Literal[False], Sequence[str]]] = False,
 ) -> MetaPartition:
     """
     Parses given user input and returns a MetaPartition
diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py
index c6c8218f..75f94fe3 100644
--- a/kartothek/io_components/utils.py
+++ b/kartothek/io_components/utils.py
@@ -4,13 +4,13 @@
 import collections
 import inspect
 import logging
-from typing import List, Optional, TypeVar, Union, overload
+from typing import List, Optional, Sequence, TypeVar, Union, overload
 
 import decorator
 import pandas as pd
 
 from kartothek.core.dataset import DatasetMetadata
-from kartothek.core.factory import _ensure_factory
+from kartothek.core.factory import DatasetFactory, _ensure_factory
 from kartothek.core.typing import StoreFactory, StoreInput
 from kartothek.core.utils import ensure_store, lazy_store
 
@@ -110,11 +110,16 @@ def _combine_metadata(dataset_metadata, append_to_list):
         return InvalidObject()
 
 
-def _ensure_compatible_indices(dataset, secondary_indices):
+def _ensure_compatible_indices(
+    dataset: Optional[Union[DatasetMetadata, DatasetFactory]],
+    secondary_indices: Optional[Sequence[str]],
+) -> Union[Literal[False], List[str]]:
     if dataset:
         ds_secondary_indices = list(dataset.secondary_indices.keys())
 
-        if secondary_indices and set(ds_secondary_indices) != set(secondary_indices):
+        if secondary_indices and not set(secondary_indices).issubset(
+            ds_secondary_indices
+        ):
             raise ValueError(
                 f"Incorrect indices provided for dataset.\n"
                 f"Expected: {ds_secondary_indices}\n"
@@ -125,7 +130,10 @@ def _ensure_compatible_indices(dataset, secondary_indices):
     # We return `False` if there is no dataset in storage and `secondary_indices` is undefined
     # (`secondary_indices` is normalized to `[]` by default).
    # In consequence, `parse_input_to_metapartition` will not check indices at the partition level.
-    return secondary_indices or False
+    if secondary_indices:
+        return list(secondary_indices)
+    else:
+        return False
 
 
 def _ensure_valid_indices(mp_indices, secondary_indices=None, data=None):
diff --git a/kartothek/io_components/write.py b/kartothek/io_components/write.py
index 50dc155d..23258503 100644
--- a/kartothek/io_components/write.py
+++ b/kartothek/io_components/write.py
@@ -29,12 +29,18 @@
 )
 from kartothek.serialization import DataFrameSerializer
 
+try:
+    from typing_extensions import Literal  # type: ignore
+except ImportError:
+    from typing import Literal  # type: ignore
+
+
 SINGLE_CATEGORY = SINGLE_TABLE
 
 
 def write_partition(
     partition_df: Any,  # TODO: Establish typing for parse_input_to_metapartition
-    secondary_indices: Optional[Union[str, Sequence[str]]],
+    secondary_indices: Optional[Union[Literal[False], Sequence[str]]],
     sort_partitions_by: Optional[Union[str, Sequence[str]]],
     dataset_uuid: str,
     partition_on: Optional[Union[str, Sequence[str]]],
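
Usage sketch (not part of the patch): a minimal example of the behaviour this change enforces for plain datasets with the eager backend. It assumes a local setup with storefact installed; the store URL and dataset_uuid are illustrative only.

import pandas as pd
from storefact import get_store_from_url
from kartothek.io.eager import (
    store_dataframes_as_dataset,
    update_dataset_from_dataframes,
)


def store_factory():
    # Hypothetical local store; any simplekv store factory works here.
    return get_store_from_url("hfs:///tmp/ktk_index_example")


# Create a dataset with a secondary index on "indexed".
df1 = pd.DataFrame({"A": range(10), "indexed": 1})
store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="dataset_uuid",
    dfs=[df1],
    secondary_indices=["indexed"],
)

# Update without repeating ``secondary_indices``: after this patch the existing
# "indexed" index is picked up from storage and updated instead of being skipped.
df2 = pd.DataFrame({"A": range(10), "indexed": 2})
update_dataset_from_dataframes(
    [df2], store=store_factory, dataset_uuid="dataset_uuid"
)

# Requesting an index that is not defined on the stored dataset now raises
# "ValueError: Incorrect indices provided for dataset."
update_dataset_from_dataframes(
    [df2],
    store=store_factory,
    dataset_uuid="dataset_uuid",
    secondary_indices=["A"],
)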