From 5ae4379b48ca8db3a3c1ad3dc794ffa290ec2a55 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 2 Feb 2021 17:23:53 +0100 Subject: [PATCH 1/8] Ensure updates on neither cube nor plain datasets corrupt indices --- CHANGES.rst | 6 ++ kartothek/api/consistency.py | 2 +- kartothek/io/dask/_shuffle.py | 8 ++- kartothek/io/dask/common_cube.py | 25 +++++++ kartothek/io/dask/dataframe.py | 12 +++- kartothek/io/eager.py | 9 +-- kartothek/io/testing/update.py | 27 +++++++ kartothek/io/testing/update_cube.py | 92 ++++++++++++++++++++++++ kartothek/io_components/metapartition.py | 10 ++- kartothek/io_components/utils.py | 18 +++-- kartothek/io_components/write.py | 8 ++- 11 files changed, 201 insertions(+), 16 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 4f5761aa..220a756b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,12 @@ Changelog ========= +Version 3.18.1 (2021-02-XY) +=========================== + +* Fix an issue where updates on cubes or updates on datatsets using + dask.dataframe might not update all secondary indices, resulting in a corrupt + state after the update Version 3.18.0 (2021-01-25) =========================== diff --git a/kartothek/api/consistency.py b/kartothek/api/consistency.py index 76813dc8..a7b6a876 100644 --- a/kartothek/api/consistency.py +++ b/kartothek/api/consistency.py @@ -200,7 +200,7 @@ def _check_indices(datasets: Dict[str, DatasetMetadata], cube: Cube) -> None: For all datasets the primary indices must be equal to ``ds.partition_keys``. For the seed dataset, secondary indices for all dimension columns except ``cube.suppress_index_on`` are expected. - Additional indices are accepted and will not bew reported as error. + Additional indices are accepted and will not be reported as error. Parameters ---------- diff --git a/kartothek/io/dask/_shuffle.py b/kartothek/io/dask/_shuffle.py index 08f434c9..c639efd6 100644 --- a/kartothek/io/dask/_shuffle.py +++ b/kartothek/io/dask/_shuffle.py @@ -12,6 +12,12 @@ from kartothek.io_components.write import write_partition from kartothek.serialization import DataFrameSerializer +try: + from typing_extensions import Literal # type: ignore +except ImportError: + from typing import Literal # type: ignore + + _KTK_HASH_BUCKET = "__KTK_HASH_BUCKET" @@ -35,7 +41,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets: def shuffle_store_dask_partitions( ddf: dd.DataFrame, table: str, - secondary_indices: Optional[Union[str, Sequence[str]]], + secondary_indices: Optional[Union[Literal[False], Sequence[str]]], metadata_version: int, partition_on: List[str], store_factory: StoreFactory, diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py index 65b0310d..b83a4af2 100644 --- a/kartothek/io/dask/common_cube.py +++ b/kartothek/io/dask/common_cube.py @@ -30,10 +30,12 @@ prepare_ktk_partition_on, ) from kartothek.io_components.metapartition import ( + SINGLE_TABLE, MetaPartition, parse_input_to_metapartition, ) from kartothek.io_components.update import update_dataset_from_partitions +from kartothek.io_components.utils import _ensure_compatible_indices from kartothek.io_components.write import ( raise_if_dataset_exists, store_dataset_from_partitions, @@ -48,6 +50,26 @@ ) +def ensure_valid_cube_indices(existing_datasets, cube): + required_indices = set(cube.index_columns) + suppress_index_on = set(cube.suppress_index_on) + for ds in existing_datasets.values(): + dataset_columns = set(ds.table_meta[SINGLE_TABLE].names) + table_indices = required_indices & dataset_columns + 
compatible_indices = _ensure_compatible_indices(ds, table_indices) + if compatible_indices: + compatible_indices = set(compatible_indices) + suppress_index_on -= compatible_indices + required_indices |= compatible_indices + # Need to remove dimension columns since they *are* technically indices but + # the cube interface class declares them as not indexed just to add them + # later on, assuming it is not blacklisted + return cube.copy( + index_columns=required_indices - set(cube.dimension_columns), + suppress_index_on=suppress_index_on, + ) + + def build_cube_from_bag_internal( data, cube, store, ktk_cube_dataset_ids, metadata, overwrite, partition_on ): @@ -90,6 +112,7 @@ def build_cube_from_bag_internal( existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store) check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets) partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on) + cube = ensure_valid_cube_indices(existing_datasets, cube) data = ( data.map(multiplex_user_input, cube=cube) @@ -167,6 +190,7 @@ def extend_cube_from_bag_internal( partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on) existing_datasets = discover_datasets(cube, store) + cube = ensure_valid_cube_indices(existing_datasets, cube) if overwrite: existing_datasets_cut = { ktk_cube_dataset_id: ds @@ -337,6 +361,7 @@ def append_to_cube_from_bag_internal( metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids) existing_datasets = discover_datasets(cube, store) + cube = ensure_valid_cube_indices(existing_datasets, cube) # existing_payload is set to empty because we're not checking against any existing payload. ktk will account for the # compat check within 1 dataset existing_payload = set() diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index 8f82dbeb..bb794dc6 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -49,6 +49,11 @@ from ._utils import _maybe_get_categoricals_from_index from .delayed import read_table_as_delayed +try: + from typing_extensions import Literal # type: ignore +except ImportError: + from typing import Literal # type: ignore + @default_docs @normalize_args @@ -321,7 +326,7 @@ def _write_dataframe_partitions( store: StoreFactory, dataset_uuid: str, table: str, - secondary_indices: List[str], + secondary_indices: Union[Literal[False], List[str]], shuffle: bool, repartition_ratio: Optional[SupportsFloat], num_buckets: int, @@ -415,7 +420,8 @@ def update_dataset_from_ddf( ds_factory=factory, ) - _ensure_compatible_indices(ds_factory, secondary_indices) + inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices) + del secondary_indices if ds_factory is not None: check_single_table_dataset(ds_factory, table) @@ -425,7 +431,7 @@ def update_dataset_from_ddf( store=store, dataset_uuid=dataset_uuid or ds_factory.dataset_uuid, table=table, - secondary_indices=secondary_indices, + secondary_indices=inferred_indices, shuffle=shuffle, repartition_ratio=repartition_ratio, num_buckets=num_buckets, diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py index 53b63a00..f7e49365 100644 --- a/kartothek/io/eager.py +++ b/kartothek/io/eager.py @@ -749,12 +749,13 @@ def update_dataset_from_dataframes( partition_on=partition_on, ) - secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices) + inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices) + del secondary_indices mp = parse_input_to_metapartition( 
df_list, metadata_version=metadata_version, - expected_secondary_indices=secondary_indices, + expected_secondary_indices=inferred_indices, ) if sort_partitions_by: @@ -763,8 +764,8 @@ def update_dataset_from_dataframes( if partition_on: mp = mp.partition_on(partition_on) - if secondary_indices: - mp = mp.build_indices(secondary_indices) + if inferred_indices: + mp = mp.build_indices(inferred_indices) mp = mp.store_dataframes( store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer diff --git a/kartothek/io/testing/update.py b/kartothek/io/testing/update.py index a18b0e8d..943aecf2 100644 --- a/kartothek/io/testing/update.py +++ b/kartothek/io/testing/update.py @@ -585,6 +585,8 @@ def test_raises_on_invalid_input(store_factory, bound_update_dataset): def test_raises_on_new_index_creation( backend_identifier, store_factory, bound_update_dataset, define_indices_on_partition ): + # This test can be removed once the variable index input is removed in + # favour of the test `test_update_secondary_indices_subset` if backend_identifier == "dask.dataframe" and define_indices_on_partition: pytest.skip() # Constructs a dataframe which ignores index information passed as dict @@ -626,6 +628,31 @@ def test_raises_on_new_index_creation( ) +def test_update_secondary_indices_subset(store_factory, bound_update_dataset): + df1 = pd.DataFrame({"A": range(10), "indexed": 1}) + dataset_uuid = "dataset_uuid" + bound_update_dataset( + df1, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="indexed" + ) + + df2 = pd.DataFrame({"A": range(10), "indexed": 2}) + # secondary index is omitted. Kartothek should pick it up regardless + bound_update_dataset(df2, dataset_uuid=dataset_uuid, store=store_factory) + + dm = DatasetMetadata.load_from_store( + dataset_uuid, store_factory(), load_all_indices=True + ) + obs_values = dm.indices["indexed"].observed_values() + + assert sorted(obs_values) == [1, 2] + + with pytest.raises(ValueError, match="Incorrect indices provided"): + # secondary index is omitted. 
Kartothek should pick it up regardless + bound_update_dataset( + df2, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="A" + ) + + @pytest.mark.parametrize("define_indices_on_partition", (False, True)) def test_update_first_time_with_secondary_indices( store_factory, bound_update_dataset, define_indices_on_partition diff --git a/kartothek/io/testing/update_cube.py b/kartothek/io/testing/update_cube.py index f16945a5..450fe1e9 100644 --- a/kartothek/io/testing/update_cube.py +++ b/kartothek/io/testing/update_cube.py @@ -158,3 +158,95 @@ def test_update_respects_ktk_cube_dataset_ids( assert set(df_ex_read["p"]) == {1} else: assert set(df_ex_read["p"]) == {0, 1} + + +def test_cube_update_secondary_indices_subset(function_store, driver): + + cube1 = Cube( + dimension_columns=["A"], + partition_columns=["P"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["indexed"], + ) + df_1 = pd.DataFrame({"A": range(10), "P": 1, "indexed": 1, "not-indexed": 1}) + build_cube( + data={"source": df_1}, + cube=cube1, + store=function_store, + metadata={"source": {"meta_at_create": "data"}}, + ) + + cube2 = Cube( + dimension_columns=["A"], + partition_columns=["P"], + uuid_prefix="cube", + seed_dataset="source", + ) + df_2 = pd.DataFrame({"A": range(10, 20), "P": 1, "indexed": 2, "not-indexed": 1}) + driver( + data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None + ) + + dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset) + dm = DatasetMetadata.load_from_store( + dataset_uuid, function_store(), load_all_indices=True + ) + obs_values = dm.indices["indexed"].observed_values() + + assert sorted(obs_values) == [1, 2] + + cube2 = Cube( + dimension_columns=["A"], + partition_columns=["P"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["not-indexed"], + ) + with pytest.raises( + ValueError, + match='ExplicitSecondaryIndex or PartitionIndex "not-indexed" is missing in dataset', + ): + driver( + data={"source": df_2}, + cube=cube2, + store=function_store, + remove_conditions=None, + ) + + +def test_cube_blacklist_dimension_index(function_store, driver): + + cube1 = Cube( + dimension_columns=["A", "B"], + partition_columns=["P"], + uuid_prefix="cube", + seed_dataset="source", + ) + df_1 = pd.DataFrame({"A": range(10), "P": 1, "B": 1, "payload": ""}) + build_cube( + data={"source": df_1}, + cube=cube1, + store=function_store, + metadata={"source": {"meta_at_create": "data"}}, + ) + + cube2 = Cube( + dimension_columns=["A", "B"], + partition_columns=["P"], + uuid_prefix="cube", + seed_dataset="source", + suppress_index_on=["B"], + ) + df_2 = pd.DataFrame({"A": range(10), "P": 1, "B": 2, "payload": ""}) + driver( + data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None + ) + + dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset) + dm = DatasetMetadata.load_from_store( + dataset_uuid, function_store(), load_all_indices=True + ) + obs_values = dm.indices["B"].observed_values() + + assert sorted(obs_values) == [1, 2] diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index b394ae4f..372cc79e 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -42,6 +42,12 @@ filter_df_from_predicates, ) +try: + from typing_extensions import Literal # type: ignore +except ImportError: + from typing import Literal # type: ignore + + LOGGER = logging.getLogger(__name__) SINGLE_TABLE = "table" @@ -1647,7 +1653,9 @@ def 
partition_labels_from_mps(mps): def parse_input_to_metapartition( - obj, metadata_version=None, expected_secondary_indices=False + obj: Optional[Union[Dict, pd.DataFrame, Sequence, MetaPartition]], + metadata_version: Optional[int] = None, + expected_secondary_indices: Optional[Union[Literal[False], Sequence[str]]] = False, ) -> MetaPartition: """ Parses given user input and returns a MetaPartition diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py index c6c8218f..75f94fe3 100644 --- a/kartothek/io_components/utils.py +++ b/kartothek/io_components/utils.py @@ -4,13 +4,13 @@ import collections import inspect import logging -from typing import List, Optional, TypeVar, Union, overload +from typing import List, Optional, Sequence, TypeVar, Union, overload import decorator import pandas as pd from kartothek.core.dataset import DatasetMetadata -from kartothek.core.factory import _ensure_factory +from kartothek.core.factory import DatasetFactory, _ensure_factory from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store, lazy_store @@ -110,11 +110,16 @@ def _combine_metadata(dataset_metadata, append_to_list): return InvalidObject() -def _ensure_compatible_indices(dataset, secondary_indices): +def _ensure_compatible_indices( + dataset: Optional[Union[DatasetMetadata, DatasetFactory]], + secondary_indices: Optional[Sequence[str]], +) -> Union[Literal[False], List[str]]: if dataset: ds_secondary_indices = list(dataset.secondary_indices.keys()) - if secondary_indices and set(ds_secondary_indices) != set(secondary_indices): + if secondary_indices and not set(secondary_indices).issubset( + ds_secondary_indices + ): raise ValueError( f"Incorrect indices provided for dataset.\n" f"Expected: {ds_secondary_indices}\n" @@ -125,7 +130,10 @@ def _ensure_compatible_indices(dataset, secondary_indices): # We return `False` if there is no dataset in storage and `secondary_indices` is undefined # (`secondary_indices` is normalized to `[]` by default). # In consequence, `parse_input_to_metapartition` will not check indices at the partition level. 
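
For reference, the intended semantics of this helper can be summarised in a small, self-contained sketch. It is not the production code: it works on a plain list of existing index names instead of a dataset object, and it assumes (as the surrounding changes and the new tests imply) that the existing-dataset branch hands back the dataset's full index list so that every index is kept up to date:

    from typing import Iterable, List, Optional, Union


    def _ensure_compatible_indices_sketch(
        existing_indices: Optional[List[str]],
        requested_indices: Optional[Iterable[str]],
    ) -> Union[bool, List[str]]:
        # ``existing_indices`` plays the role of ``dataset.secondary_indices.keys()``
        # for a dataset that is already in storage; ``None`` means "no dataset yet".
        if existing_indices is not None:
            # Requested indices must be a subset of what already exists, otherwise
            # the update would silently change the index structure of the dataset.
            if requested_indices and not set(requested_indices).issubset(existing_indices):
                raise ValueError("Incorrect indices provided for dataset.")
            # Hand back the *full* existing set so every index stays current,
            # even if the caller specified only a subset or nothing at all.
            return list(existing_indices)
        # No dataset in storage: pass the request through, or return the ``False``
        # sentinel so that partition-level index checks are skipped entirely.
        return list(requested_indices) if requested_indices else False


    # A dataset indexed on "i1" and "i2": a subset request is accepted and still
    # yields the full index list; with no dataset and no request, ``False`` is returned.
    assert _ensure_compatible_indices_sketch(["i1", "i2"], ["i1"]) == ["i1", "i2"]
    assert _ensure_compatible_indices_sketch(None, None) is False

The ``False`` sentinel is deliberately distinct from an empty list: it signals to ``parse_input_to_metapartition`` that no index checks should happen at the partition level.
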
- return secondary_indices or False + if secondary_indices: + return list(secondary_indices) + else: + return False def _ensure_valid_indices(mp_indices, secondary_indices=None, data=None): diff --git a/kartothek/io_components/write.py b/kartothek/io_components/write.py index 50dc155d..23258503 100644 --- a/kartothek/io_components/write.py +++ b/kartothek/io_components/write.py @@ -29,12 +29,18 @@ ) from kartothek.serialization import DataFrameSerializer +try: + from typing_extensions import Literal # type: ignore +except ImportError: + from typing import Literal # type: ignore + + SINGLE_CATEGORY = SINGLE_TABLE def write_partition( partition_df: Any, # TODO: Establish typing for parse_input_to_metapartition - secondary_indices: Optional[Union[str, Sequence[str]]], + secondary_indices: Optional[Union[Literal[False], Sequence[str]]], sort_partitions_by: Optional[Union[str, Sequence[str]]], dataset_uuid: str, partition_on: Optional[Union[str, Sequence[str]]], From 85224ebdc02204543d030241f5ba084cb3dc9e66 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 3 Feb 2021 16:01:18 +0100 Subject: [PATCH 2/8] Factor out inferred indices type --- kartothek/io/dask/_shuffle.py | 46 +++++++++++------------- kartothek/io/dask/common_cube.py | 21 ++++++++--- kartothek/io/dask/dataframe.py | 8 ++--- kartothek/io_components/metapartition.py | 17 +++++---- kartothek/io_components/utils.py | 13 +++---- kartothek/io_components/write.py | 15 ++++---- 6 files changed, 61 insertions(+), 59 deletions(-) diff --git a/kartothek/io/dask/_shuffle.py b/kartothek/io/dask/_shuffle.py index c639efd6..afa8cef3 100644 --- a/kartothek/io/dask/_shuffle.py +++ b/kartothek/io/dask/_shuffle.py @@ -1,5 +1,5 @@ from functools import partial -from typing import List, Optional, Sequence, Union +from typing import List, Optional, Sequence, cast import dask.array as da import dask.dataframe as dd @@ -9,15 +9,10 @@ from kartothek.core.typing import StoreFactory from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas from kartothek.io_components.metapartition import MetaPartition +from kartothek.io_components.utils import INFERRED_INDICES from kartothek.io_components.write import write_partition from kartothek.serialization import DataFrameSerializer -try: - from typing_extensions import Literal # type: ignore -except ImportError: - from typing import Literal # type: ignore - - _KTK_HASH_BUCKET = "__KTK_HASH_BUCKET" @@ -41,7 +36,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets: def shuffle_store_dask_partitions( ddf: dd.DataFrame, table: str, - secondary_indices: Optional[Union[Literal[False], Sequence[str]]], + secondary_indices: Optional[INFERRED_INDICES], metadata_version: int, partition_on: List[str], store_factory: StoreFactory, @@ -115,28 +110,29 @@ def shuffle_store_dask_partitions( unpacked_meta = ddf._meta ddf = pack_payload(ddf, group_key=group_cols) - ddf = ddf.groupby(by=group_cols) - ddf = ddf.apply( - partial( - _unpack_store_partition, - secondary_indices=secondary_indices, - sort_partitions_by=sort_partitions_by, - table=table, - dataset_uuid=dataset_uuid, - partition_on=partition_on, - store_factory=store_factory, - df_serializer=df_serializer, - metadata_version=metadata_version, - unpacked_meta=unpacked_meta, - ), - meta=("MetaPartition", "object"), + ddf_grouped = ddf.groupby(by=group_cols) + + unpack = partial( + _unpack_store_partition, + secondary_indices=secondary_indices, + sort_partitions_by=sort_partitions_by, + table=table, + dataset_uuid=dataset_uuid, 
+ partition_on=partition_on, + store_factory=store_factory, + df_serializer=df_serializer, + metadata_version=metadata_version, + unpacked_meta=unpacked_meta, + ) + return cast( + da.Array, # Output type depends on meta but mypy cannot infer this easily. + ddf_grouped.apply(unpack, meta=("MetaPartition", "object")), ) - return ddf def _unpack_store_partition( df: pd.DataFrame, - secondary_indices: List[str], + secondary_indices: Optional[INFERRED_INDICES], sort_partitions_by: List[str], table: str, dataset_uuid: str, diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py index b83a4af2..987589cd 100644 --- a/kartothek/io/dask/common_cube.py +++ b/kartothek/io/dask/common_cube.py @@ -3,6 +3,7 @@ """ from collections import defaultdict from functools import partial +from typing import Dict import dask.bag as db @@ -13,6 +14,8 @@ KTK_CUBE_METADATA_STORAGE_FORMAT, KTK_CUBE_METADATA_VERSION, ) +from kartothek.core.cube.cube import Cube +from kartothek.core.dataset import DatasetMetadataBase from kartothek.io_components.cube.append import check_existing_datasets from kartothek.io_components.cube.common import check_blocksize, check_store_factory from kartothek.io_components.cube.query import load_group, plan_query, quick_concat @@ -50,7 +53,17 @@ ) -def ensure_valid_cube_indices(existing_datasets, cube): +def ensure_valid_cube_indices( + existing_datasets: Dict[str, DatasetMetadataBase], cube: Cube +) -> Cube: + """ + Parse all existing datasets and infer the required set of indices. We do not + allow indices to be removed or added in update steps at the momenent and + need to make sure that existing ones are updated properly. + The returned `Cube` instance will be a copy of the input with + `index_columns` and `suppress_index_on` fields adjusted to reflect the + existing datasets. 
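
A simplified stand-in for the inference described here, operating on plain sets rather than on ``DatasetMetadata`` and ``Cube`` objects (the function name and arguments below are illustrative only):

    from typing import Dict, Set, Tuple


    def infer_cube_indices_sketch(
        requested_index_columns: Set[str],
        requested_suppress_index_on: Set[str],
        dimension_columns: Set[str],
        existing_indices_per_dataset: Dict[str, Set[str]],
    ) -> Tuple[Set[str], Set[str]]:
        required = set(requested_index_columns)
        suppress = set(requested_suppress_index_on)
        for ds_indices in existing_indices_per_dataset.values():
            # Every index that already exists in storage stays required, so an
            # update can never drop an index and leave the cube in a corrupt state.
            required |= ds_indices
            # An index that already exists on a dimension column also overrides a
            # suppression request for that column.
            suppress -= ds_indices
        # Dimension columns are indexed through a separate mechanism and are
        # therefore removed from the explicit index column list again.
        return required - dimension_columns, suppress


    # A cube originally built with an index on "indexed": a later update that
    # omits the index still ends up with it in the effective cube definition.
    index_columns, suppress_index_on = infer_cube_indices_sketch(
        requested_index_columns=set(),
        requested_suppress_index_on=set(),
        dimension_columns={"A"},
        existing_indices_per_dataset={"source": {"A", "indexed"}},
    )
    assert index_columns == {"indexed"}
    assert suppress_index_on == set()
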
+ """ required_indices = set(cube.index_columns) suppress_index_on = set(cube.suppress_index_on) for ds in existing_datasets.values(): @@ -58,9 +71,9 @@ def ensure_valid_cube_indices(existing_datasets, cube): table_indices = required_indices & dataset_columns compatible_indices = _ensure_compatible_indices(ds, table_indices) if compatible_indices: - compatible_indices = set(compatible_indices) - suppress_index_on -= compatible_indices - required_indices |= compatible_indices + required_indices = set(compatible_indices) + suppress_index_on -= required_indices + required_indices |= required_indices # Need to remove dimension columns since they *are* technically indices but # the cube interface class declares them as not indexed just to add them # later on, assuming it is not blacklisted diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index bb794dc6..3dce0901 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -32,6 +32,7 @@ from kartothek.io_components.read import dispatch_metapartitions_from_factory from kartothek.io_components.update import update_dataset_from_partitions from kartothek.io_components.utils import ( + INFERRED_INDICES, _ensure_compatible_indices, check_single_table_dataset, normalize_arg, @@ -49,11 +50,6 @@ from ._utils import _maybe_get_categoricals_from_index from .delayed import read_table_as_delayed -try: - from typing_extensions import Literal # type: ignore -except ImportError: - from typing import Literal # type: ignore - @default_docs @normalize_args @@ -326,7 +322,7 @@ def _write_dataframe_partitions( store: StoreFactory, dataset_uuid: str, table: str, - secondary_indices: Union[Literal[False], List[str]], + secondary_indices: Optional[INFERRED_INDICES], shuffle: bool, repartition_ratio: Optional[SupportsFloat], num_buckets: int, diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index 372cc79e..6033d788 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -34,7 +34,11 @@ verify_metadata_version, ) from kartothek.core.uuid import gen_uuid -from kartothek.io_components.utils import _ensure_valid_indices, combine_metadata +from kartothek.io_components.utils import ( + INFERRED_INDICES, + _ensure_valid_indices, + combine_metadata, +) from kartothek.serialization import ( DataFrameSerializer, PredicatesType, @@ -42,12 +46,6 @@ filter_df_from_predicates, ) -try: - from typing_extensions import Literal # type: ignore -except ImportError: - from typing import Literal # type: ignore - - LOGGER = logging.getLogger(__name__) SINGLE_TABLE = "table" @@ -67,6 +65,7 @@ } _MULTI_TABLE_DICT_LIST = Dict[str, Iterable[str]] +METAPARTITION_INPUT_TYPE = Union[Dict, pd.DataFrame, Sequence, "MetaPartition"] def _predicates_to_named(predicates): @@ -1653,9 +1652,9 @@ def partition_labels_from_mps(mps): def parse_input_to_metapartition( - obj: Optional[Union[Dict, pd.DataFrame, Sequence, MetaPartition]], + obj: METAPARTITION_INPUT_TYPE, metadata_version: Optional[int] = None, - expected_secondary_indices: Optional[Union[Literal[False], Sequence[str]]] = False, + expected_secondary_indices: Optional[INFERRED_INDICES] = False, ) -> MetaPartition: """ Parses given user input and returns a MetaPartition diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py index 75f94fe3..dd3162c6 100644 --- a/kartothek/io_components/utils.py +++ b/kartothek/io_components/utils.py @@ -4,13 +4,13 @@ import collections import 
inspect import logging -from typing import List, Optional, Sequence, TypeVar, Union, overload +from typing import Iterable, List, Optional, TypeVar, Union, overload import decorator import pandas as pd -from kartothek.core.dataset import DatasetMetadata -from kartothek.core.factory import DatasetFactory, _ensure_factory +from kartothek.core.dataset import DatasetMetadata, DatasetMetadataBase +from kartothek.core.factory import _ensure_factory from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store, lazy_store @@ -19,6 +19,8 @@ except ImportError: from typing import Literal # type: ignore +# Literal false is sentinel, see function body of `_ensure_compatible_indices` for details +INFERRED_INDICES = Union[Literal[False], List[str]] signature = inspect.signature @@ -111,9 +113,8 @@ def _combine_metadata(dataset_metadata, append_to_list): def _ensure_compatible_indices( - dataset: Optional[Union[DatasetMetadata, DatasetFactory]], - secondary_indices: Optional[Sequence[str]], -) -> Union[Literal[False], List[str]]: + dataset: Optional[DatasetMetadataBase], secondary_indices: Optional[Iterable[str]], +) -> INFERRED_INDICES: if dataset: ds_secondary_indices = list(dataset.secondary_indices.keys()) diff --git a/kartothek/io_components/write.py b/kartothek/io_components/write.py index 23258503..dae99122 100644 --- a/kartothek/io_components/write.py +++ b/kartothek/io_components/write.py @@ -1,6 +1,6 @@ from collections import defaultdict from functools import partial -from typing import Any, Dict, Optional, Sequence, Union, cast +from typing import Dict, Optional, Sequence, Union, cast import pandas as pd @@ -17,30 +17,26 @@ from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store from kartothek.io_components.metapartition import ( + METAPARTITION_INPUT_TYPE, SINGLE_TABLE, MetaPartition, parse_input_to_metapartition, partition_labels_from_mps, ) from kartothek.io_components.utils import ( + INFERRED_INDICES, combine_metadata, extract_duplicates, sort_values_categorical, ) from kartothek.serialization import DataFrameSerializer -try: - from typing_extensions import Literal # type: ignore -except ImportError: - from typing import Literal # type: ignore - - SINGLE_CATEGORY = SINGLE_TABLE def write_partition( - partition_df: Any, # TODO: Establish typing for parse_input_to_metapartition - secondary_indices: Optional[Union[Literal[False], Sequence[str]]], + partition_df: METAPARTITION_INPUT_TYPE, + secondary_indices: Optional[INFERRED_INDICES], sort_partitions_by: Optional[Union[str, Sequence[str]]], dataset_uuid: str, partition_on: Optional[Union[str, Sequence[str]]], @@ -54,6 +50,7 @@ def write_partition( like partitioning, bucketing (NotImplemented), indexing, etc. in the correct order. 
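
The order referred to here is the same one visible in the eager path earlier in this series. Schematically, and with ``write_update_partition_sketch`` as a purely illustrative name (the individual calls mirror the real call sites, but this is a sketch rather than the actual implementation):

    from kartothek.io_components.metapartition import parse_input_to_metapartition


    def write_update_partition_sketch(
        partition_df,
        table,
        store,
        dataset_uuid,
        metadata_version,
        inferred_indices,
        partition_on=None,
        df_serializer=None,
    ):
        # Parse the raw input into a MetaPartition, checking any explicitly
        # requested indices against the inferred ones.
        mp = parse_input_to_metapartition(
            [{"data": {table: partition_df}}],
            metadata_version=metadata_version,
            expected_secondary_indices=inferred_indices,
        )
        # Partitioning must happen before index creation so that the index
        # entries point at the final partition labels.
        if partition_on:
            mp = mp.partition_on(partition_on)
        if inferred_indices:
            mp = mp.build_indices(inferred_indices)
        # Finally the dataframes are serialized into the store.
        return mp.store_dataframes(
            store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
        )
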
""" store = ensure_store(store_factory) + parse_input: METAPARTITION_INPUT_TYPE if isinstance(partition_df, pd.DataFrame) and dataset_table_name: parse_input = [{"data": {dataset_table_name: partition_df}}] else: From 29d4255c12688cb787194e3aa2a84f394432f44f Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 3 Feb 2021 17:06:12 +0100 Subject: [PATCH 3/8] Patch test_update --- kartothek/io_components/metapartition.py | 4 +++- tests/io/dask/dataframe/test_update.py | 8 ++++++-- tests/io/dask/delayed/test_update.py | 2 ++ tests/io/iter/test_update.py | 3 +++ 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index 6033d788..16f71dc1 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -1800,6 +1800,8 @@ def parse_input_to_metapartition( elif isinstance(obj, MetaPartition): return obj else: - raise ValueError("Unexpected type: {}".format(type(obj))) + raise ValueError( + f"Unexpected type during parsing encountered: ({type(obj)}, {obj})" + ) return mp diff --git a/tests/io/dask/dataframe/test_update.py b/tests/io/dask/dataframe/test_update.py index 474db25f..4d884ad6 100644 --- a/tests/io/dask/dataframe/test_update.py +++ b/tests/io/dask/dataframe/test_update.py @@ -2,6 +2,7 @@ import dask import dask.dataframe as dd +import pandas as pd import pytest from kartothek.io.dask.dataframe import update_dataset_from_ddf @@ -18,8 +19,11 @@ def _unwrap_partition(part): def _update_dataset(partitions, *args, **kwargs): - # TODO: fix the parsing below to adapt for all supported formats (see: parse_input_to_metapartition) - if any(partitions): + # TODO: Simplify once parse_input_to_metapartition is removed / obsolete + if isinstance(partitions, pd.DataFrame): + table_name = "core" + partitions = dd.from_pandas(partitions, npartitions=1) + elif any(partitions): table_name = next(iter(dict(partitions[0]["data"]).keys())) delayed_partitions = [ dask.delayed(_unwrap_partition)(part) for part in partitions diff --git a/tests/io/dask/delayed/test_update.py b/tests/io/dask/delayed/test_update.py index c2ce4bb3..ba4ba7c9 100644 --- a/tests/io/dask/delayed/test_update.py +++ b/tests/io/dask/delayed/test_update.py @@ -18,6 +18,8 @@ def _unwrap_partition(part): def _update_dataset(partitions, *args, **kwargs): + if not isinstance(partitions, list): + partitions = [partitions] tasks = update_dataset_from_delayed(partitions, *args, **kwargs) s = pickle.dumps(tasks, pickle.HIGHEST_PROTOCOL) diff --git a/tests/io/iter/test_update.py b/tests/io/iter/test_update.py index 609143b7..0f0e56ad 100644 --- a/tests/io/iter/test_update.py +++ b/tests/io/iter/test_update.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import pandas as pd import pytest from kartothek.io.iter import update_dataset_from_dataframes__iter @@ -12,5 +13,7 @@ def bound_update_dataset(): def _update_dataset(df_list, *args, **kwargs): + if isinstance(df_list, pd.DataFrame): + df_list = [df_list] df_generator = (x for x in df_list) return update_dataset_from_dataframes__iter(df_generator, *args, **kwargs) From 6a4f0ad254dbb4176a634b61ed4ad503556c238e Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 4 Feb 2021 09:17:37 +0100 Subject: [PATCH 4/8] use camelcase for defined types --- kartothek/io/dask/_shuffle.py | 6 +++--- kartothek/io/dask/dataframe.py | 4 ++-- kartothek/io_components/metapartition.py | 8 ++++---- kartothek/io_components/utils.py | 4 ++-- kartothek/io_components/write.py | 10 +++++----- 5 files 
changed, 16 insertions(+), 16 deletions(-) diff --git a/kartothek/io/dask/_shuffle.py b/kartothek/io/dask/_shuffle.py index afa8cef3..78d025c2 100644 --- a/kartothek/io/dask/_shuffle.py +++ b/kartothek/io/dask/_shuffle.py @@ -9,7 +9,7 @@ from kartothek.core.typing import StoreFactory from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas from kartothek.io_components.metapartition import MetaPartition -from kartothek.io_components.utils import INFERRED_INDICES +from kartothek.io_components.utils import InferredIndices from kartothek.io_components.write import write_partition from kartothek.serialization import DataFrameSerializer @@ -36,7 +36,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets: def shuffle_store_dask_partitions( ddf: dd.DataFrame, table: str, - secondary_indices: Optional[INFERRED_INDICES], + secondary_indices: Optional[InferredIndices], metadata_version: int, partition_on: List[str], store_factory: StoreFactory, @@ -132,7 +132,7 @@ def shuffle_store_dask_partitions( def _unpack_store_partition( df: pd.DataFrame, - secondary_indices: Optional[INFERRED_INDICES], + secondary_indices: Optional[InferredIndices], sort_partitions_by: List[str], table: str, dataset_uuid: str, diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index 3dce0901..d6f24966 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -32,7 +32,7 @@ from kartothek.io_components.read import dispatch_metapartitions_from_factory from kartothek.io_components.update import update_dataset_from_partitions from kartothek.io_components.utils import ( - INFERRED_INDICES, + InferredIndices, _ensure_compatible_indices, check_single_table_dataset, normalize_arg, @@ -322,7 +322,7 @@ def _write_dataframe_partitions( store: StoreFactory, dataset_uuid: str, table: str, - secondary_indices: Optional[INFERRED_INDICES], + secondary_indices: Optional[InferredIndices], shuffle: bool, repartition_ratio: Optional[SupportsFloat], num_buckets: int, diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index 16f71dc1..ea3819ac 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -35,7 +35,7 @@ ) from kartothek.core.uuid import gen_uuid from kartothek.io_components.utils import ( - INFERRED_INDICES, + InferredIndices, _ensure_valid_indices, combine_metadata, ) @@ -65,7 +65,7 @@ } _MULTI_TABLE_DICT_LIST = Dict[str, Iterable[str]] -METAPARTITION_INPUT_TYPE = Union[Dict, pd.DataFrame, Sequence, "MetaPartition"] +MetaPartitionInput = Union[Dict, pd.DataFrame, Sequence, "MetaPartition"] def _predicates_to_named(predicates): @@ -1652,9 +1652,9 @@ def partition_labels_from_mps(mps): def parse_input_to_metapartition( - obj: METAPARTITION_INPUT_TYPE, + obj: MetaPartitionInput, metadata_version: Optional[int] = None, - expected_secondary_indices: Optional[INFERRED_INDICES] = False, + expected_secondary_indices: Optional[InferredIndices] = False, ) -> MetaPartition: """ Parses given user input and returns a MetaPartition diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py index dd3162c6..12cd207a 100644 --- a/kartothek/io_components/utils.py +++ b/kartothek/io_components/utils.py @@ -20,7 +20,7 @@ from typing import Literal # type: ignore # Literal false is sentinel, see function body of `_ensure_compatible_indices` for details -INFERRED_INDICES = Union[Literal[False], List[str]] +InferredIndices = Union[Literal[False], 
List[str]] signature = inspect.signature @@ -114,7 +114,7 @@ def _combine_metadata(dataset_metadata, append_to_list): def _ensure_compatible_indices( dataset: Optional[DatasetMetadataBase], secondary_indices: Optional[Iterable[str]], -) -> INFERRED_INDICES: +) -> InferredIndices: if dataset: ds_secondary_indices = list(dataset.secondary_indices.keys()) diff --git a/kartothek/io_components/write.py b/kartothek/io_components/write.py index dae99122..8aebdc0d 100644 --- a/kartothek/io_components/write.py +++ b/kartothek/io_components/write.py @@ -17,14 +17,14 @@ from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store from kartothek.io_components.metapartition import ( - METAPARTITION_INPUT_TYPE, SINGLE_TABLE, MetaPartition, + MetaPartitionInput, parse_input_to_metapartition, partition_labels_from_mps, ) from kartothek.io_components.utils import ( - INFERRED_INDICES, + InferredIndices, combine_metadata, extract_duplicates, sort_values_categorical, @@ -35,8 +35,8 @@ def write_partition( - partition_df: METAPARTITION_INPUT_TYPE, - secondary_indices: Optional[INFERRED_INDICES], + partition_df: MetaPartitionInput, + secondary_indices: Optional[InferredIndices], sort_partitions_by: Optional[Union[str, Sequence[str]]], dataset_uuid: str, partition_on: Optional[Union[str, Sequence[str]]], @@ -50,7 +50,7 @@ def write_partition( like partitioning, bucketing (NotImplemented), indexing, etc. in the correct order. """ store = ensure_store(store_factory) - parse_input: METAPARTITION_INPUT_TYPE + parse_input: MetaPartitionInput if isinstance(partition_df, pd.DataFrame) and dataset_table_name: parse_input = [{"data": {dataset_table_name: partition_df}}] else: From 92253b3f9ef27dd1bf18918895f860d0b62aa7a3 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 4 Feb 2021 10:38:14 +0100 Subject: [PATCH 5/8] Fix bug introduced when improving typing --- kartothek/io/dask/common_cube.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py index 987589cd..0c2a9f20 100644 --- a/kartothek/io/dask/common_cube.py +++ b/kartothek/io/dask/common_cube.py @@ -71,9 +71,9 @@ def ensure_valid_cube_indices( table_indices = required_indices & dataset_columns compatible_indices = _ensure_compatible_indices(ds, table_indices) if compatible_indices: - required_indices = set(compatible_indices) - suppress_index_on -= required_indices - required_indices |= required_indices + dataset_indices = set(compatible_indices) + suppress_index_on -= dataset_indices + required_indices |= dataset_indices # Need to remove dimension columns since they *are* technically indices but # the cube interface class declares them as not indexed just to add them # later on, assuming it is not blacklisted From ef41e610199062157c295d376d11312a44baf020 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 4 Feb 2021 16:01:06 +0100 Subject: [PATCH 6/8] Handle the case where cube tries to extend a multi tabled dataset --- kartothek/io/dask/common_cube.py | 16 ++++++++-------- kartothek/io/testing/build_cube.py | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py index 0c2a9f20..121a62c1 100644 --- a/kartothek/io/dask/common_cube.py +++ b/kartothek/io/dask/common_cube.py @@ -33,7 +33,6 @@ prepare_ktk_partition_on, ) from kartothek.io_components.metapartition import ( - SINGLE_TABLE, MetaPartition, parse_input_to_metapartition, ) @@ -67,13 +66,14 @@ def 
ensure_valid_cube_indices( required_indices = set(cube.index_columns) suppress_index_on = set(cube.suppress_index_on) for ds in existing_datasets.values(): - dataset_columns = set(ds.table_meta[SINGLE_TABLE].names) - table_indices = required_indices & dataset_columns - compatible_indices = _ensure_compatible_indices(ds, table_indices) - if compatible_indices: - dataset_indices = set(compatible_indices) - suppress_index_on -= dataset_indices - required_indices |= dataset_indices + for internal_table in ds.table_meta: + dataset_columns = set(ds.table_meta[internal_table].names) + table_indices = required_indices & dataset_columns + compatible_indices = _ensure_compatible_indices(ds, table_indices) + if compatible_indices: + dataset_indices = set(compatible_indices) + suppress_index_on -= dataset_indices + required_indices |= dataset_indices # Need to remove dimension columns since they *are* technically indices but # the cube interface class declares them as not indexed just to add them # later on, assuming it is not blacklisted diff --git a/kartothek/io/testing/build_cube.py b/kartothek/io/testing/build_cube.py index 92b48d7c..b178deb3 100644 --- a/kartothek/io/testing/build_cube.py +++ b/kartothek/io/testing/build_cube.py @@ -1119,6 +1119,7 @@ def test_overwrite_rollback_ktk(driver, function_store): store=function_store, dataset_uuid=cube.ktk_dataset_uuid(cube.seed_dataset), metadata_version=KTK_CUBE_METADATA_VERSION, + secondary_indices=["i1", "i2"], ) df_source2 = pd.DataFrame( From 4cbd840affd24dd6d86f20dad77b2aeaa753b94b Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 8 Feb 2021 10:53:31 +0100 Subject: [PATCH 7/8] Add doc section about mutating indexed datasets --- docs/guide/mutating_datasets.rst | 47 +++++++++++++++++++++++++++++++- kartothek/io/dask/bag_cube.py | 4 +++ kartothek/io/dask/dataframe.py | 4 +++ kartothek/io/dask/delayed.py | 4 +++ kartothek/io/eager.py | 4 +++ kartothek/io/iter.py | 4 +++ 6 files changed, 66 insertions(+), 1 deletion(-) diff --git a/docs/guide/mutating_datasets.rst b/docs/guide/mutating_datasets.rst index 515882aa..8e5612b8 100644 --- a/docs/guide/mutating_datasets.rst +++ b/docs/guide/mutating_datasets.rst @@ -1,4 +1,4 @@ - +.. _mutating_datasets: Mutating Datasets ================= @@ -286,6 +286,8 @@ consists of two rows corresponding to ``B=2013-01-02`` (from ``df``) and four ro Thus, the original partition with the two rows corresponding to ``B=2013-01-03`` from ``df`` has been completely replaced. + + Garbage collection ------------------ @@ -324,3 +326,46 @@ When garbage collection is called, the files are removed. files_before.difference(store.keys()) # Show files removed .. _storefact: https://github.com/blue-yonder/storefact + + +Mutating indexed datasets +------------------------- + +If the to-be-updated dataset was created with an index, every update on this dataset will update the index automatically and ensure the dataset will never end up in an inconsistent state. This even holds true in case the update function does not specify any or only partially the indices. Consider the following example + +.. 
ipython:: python + + df = pd.DataFrame({"payload": range(10), "i1": 0, "i2": ["a"] * 5 + ["b"] * 5}) + dm = store_dataframes_as_dataset( + store_url, "indexed_dataset", [df], secondary_indices=["i1", "i2"] + ) + dm = dm.load_all_indices(store_url) + dm.indices["i1"].observed_values() + dm.indices["i2"].observed_values() + + new_df = pd.DataFrame({"payload": range(10), "i1": 1, "i2": "c"}) + +If we do not specify anything, kartothek will infer the indices and update them correctly + +.. ipython:: python + + dm = update_dataset_from_dataframes([new_df], store=store_url, dataset_uuid=dm.uuid) + + dm = dm.load_all_indices(store_url) + dm.indices["i1"].observed_values() + dm.indices["i2"].observed_values() + + +This is even true if only a subset is given + +.. ipython:: python + + new_df = pd.DataFrame({"payload": range(10), "i1": 2, "i2": "d"}) + dm = update_dataset_from_dataframes( + [new_df], store=store_url, dataset_uuid=dm.uuid, secondary_indices="i1" + ) + + dm = dm.load_all_indices(store_url) + dm.indices["i1"].observed_values() + dm.indices["i2"].observed_values() + diff --git a/kartothek/io/dask/bag_cube.py b/kartothek/io/dask/bag_cube.py index 15dd177e..9e9a2c0d 100644 --- a/kartothek/io/dask/bag_cube.py +++ b/kartothek/io/dask/bag_cube.py @@ -442,6 +442,10 @@ def update_cube_from_bag( metadata_dict: dask.bag.Bag A dask bag object containing the compute graph to append to the cube returning the dict of dataset metadata objects. The bag has a single partition with a single element. + + See Also + -------- + :ref:`mutating_datasets` """ return append_to_cube_from_bag_internal( data=data, diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index d6f24966..465e7c77 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -398,6 +398,10 @@ def update_dataset_from_ddf( ): """ Update a dataset from a dask.dataframe. + + See Also + -------- + :ref:`mutating_datasets` """ partition_on = normalize_arg("partition_on", partition_on) secondary_indices = normalize_arg("secondary_indices", secondary_indices) diff --git a/kartothek/io/dask/delayed.py b/kartothek/io/dask/delayed.py index a3b26d63..92f86c42 100644 --- a/kartothek/io/dask/delayed.py +++ b/kartothek/io/dask/delayed.py @@ -464,6 +464,10 @@ def update_dataset_from_delayed( Parameters ---------- + + See Also + -------- + :ref:`mutating_datasets` """ partition_on = normalize_arg("partition_on", partition_on) store = normalize_arg("store", store) diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py index f7e49365..805792c5 100644 --- a/kartothek/io/eager.py +++ b/kartothek/io/eager.py @@ -728,6 +728,10 @@ def update_dataset_from_dataframes( Returns ------- The dataset metadata object (:class:`~kartothek.core.dataset.DatasetMetadata`). + + See Also + -------- + :ref:`mutating_datasets` """ if load_dynamic_metadata is not True: warnings.warn( diff --git a/kartothek/io/iter.py b/kartothek/io/iter.py index 79d08c5a..7072f140 100644 --- a/kartothek/io/iter.py +++ b/kartothek/io/iter.py @@ -220,6 +220,10 @@ def update_dataset_from_dataframes__iter( Returns ------- The dataset metadata object (:class:`~kartothek.core.dataset.DatasetMetadata`). 
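
The complementary failure mode to the behaviour documented above is also covered by the new tests: requesting an index that the stored dataset never had is rejected instead of being created on the fly. A condensed sketch, reusing the ``store_url`` fixture from the documentation examples above:

    import pandas as pd

    from kartothek.io.eager import (
        store_dataframes_as_dataset,
        update_dataset_from_dataframes,
    )

    df = pd.DataFrame({"A": range(10), "indexed": 1})
    store_dataframes_as_dataset(
        store_url, "subset_example", [df], secondary_indices=["indexed"]
    )

    # Omitting ``secondary_indices`` is fine: the existing index on "indexed" is
    # inferred and kept up to date by the update.
    update_dataset_from_dataframes(
        [pd.DataFrame({"A": range(10), "indexed": 2})],
        store=store_url,
        dataset_uuid="subset_example",
    )

    # Requesting an index that was never created raises
    # ``ValueError: Incorrect indices provided for dataset``.
    update_dataset_from_dataframes(
        [pd.DataFrame({"A": range(10), "indexed": 3})],
        store=store_url,
        dataset_uuid="subset_example",
        secondary_indices="A",
    )
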
+ + See Also + -------- + :ref:`mutating_datasets` """ if load_dynamic_metadata is not True: warnings.warn( From aa50ab37fa2df90e67ea4837ba7b505b7adbccf0 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Mon, 8 Feb 2021 11:36:21 +0100 Subject: [PATCH 8/8] Update docs/guide/mutating_datasets.rst Co-authored-by: jochen-ott-by --- docs/guide/mutating_datasets.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/guide/mutating_datasets.rst b/docs/guide/mutating_datasets.rst index 8e5612b8..4698506f 100644 --- a/docs/guide/mutating_datasets.rst +++ b/docs/guide/mutating_datasets.rst @@ -331,7 +331,7 @@ When garbage collection is called, the files are removed. Mutating indexed datasets ------------------------- -If the to-be-updated dataset was created with an index, every update on this dataset will update the index automatically and ensure the dataset will never end up in an inconsistent state. This even holds true in case the update function does not specify any or only partially the indices. Consider the following example +The mutating operation will update all indices that currently exist for the dataset. This even holds true in case the update function does not specify any or only partially the indices. Consider the following example .. ipython:: python @@ -368,4 +368,3 @@ This is even true if only a subset is given dm = dm.load_all_indices(store_url) dm.indices["i1"].observed_values() dm.indices["i2"].observed_values() -
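
Finally, the cube-level guarantee added by this series, condensed from the tests above into a single helper. ``update_driver`` stands for whichever cube update backend is exercised (e.g. the bag-based driver as wrapped in the test suite), ``function_store`` for a store factory, and the eager ``build_cube`` import is assumed to live in ``kartothek.io.eager_cube``:

    import pandas as pd

    from kartothek.core.cube.cube import Cube
    from kartothek.core.dataset import DatasetMetadata
    from kartothek.io.eager_cube import build_cube


    def check_index_survives_update(update_driver, function_store):
        # Build the cube with an explicit secondary index on "indexed".
        cube = Cube(
            dimension_columns=["A"],
            partition_columns=["P"],
            uuid_prefix="cube",
            seed_dataset="source",
            index_columns=["indexed"],
        )
        build_cube(
            data={"source": pd.DataFrame({"A": range(10), "P": 1, "indexed": 1})},
            cube=cube,
            store=function_store,
        )

        # Update through a cube object that does *not* mention the index at all.
        cube_without_index = Cube(
            dimension_columns=["A"],
            partition_columns=["P"],
            uuid_prefix="cube",
            seed_dataset="source",
        )
        update_driver(
            data={"source": pd.DataFrame({"A": range(10, 20), "P": 1, "indexed": 2})},
            cube=cube_without_index,
            store=function_store,
            remove_conditions=None,
        )

        # The stored index still covers both the old and the new values.
        dm = DatasetMetadata.load_from_store(
            cube.ktk_dataset_uuid(cube.seed_dataset),
            function_store(),
            load_all_indices=True,
        )
        assert sorted(dm.indices["indexed"].observed_values()) == [1, 2]
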