Ensure updates on neither cube nor plain datasets corrupt indices
fjetter committed Feb 2, 2021
1 parent 5f48231 commit 316d176
Showing 11 changed files with 201 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,12 @@
Changelog
=========

Version 3.18.1 (2021-02-XY)
===========================

* Fix an issue where updates on cubes or updates on datasets using
  dask.dataframe might not update all secondary indices, resulting in a corrupt
  state after the update.

Version 3.18.0 (2021-01-25)
===========================
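To make the fixed scenario concrete, here is a minimal sketch using the eager API (not part of the commit; the storefact-based store factory and the temporary path are assumptions for illustration):

```python
from functools import partial

import pandas as pd
from storefact import get_store_from_url

from kartothek.io.eager import (
    store_dataframes_as_dataset,
    update_dataset_from_dataframes,
)

# Hypothetical local store for illustration.
store_factory = partial(get_store_from_url, "hfs:///tmp/ktk-index-example")

# The initial write declares a secondary index on "indexed".
df1 = pd.DataFrame({"A": range(10), "indexed": 1})
store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="dataset_uuid",
    dfs=[df1],
    secondary_indices="indexed",
)

# A later update that omits `secondary_indices` previously skipped building the
# index for the new partitions; with this fix the stored index definition is
# picked up and updated as well.
df2 = pd.DataFrame({"A": range(10), "indexed": 2})
update_dataset_from_dataframes(
    [df2], store=store_factory, dataset_uuid="dataset_uuid"
)
```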
2 changes: 1 addition & 1 deletion kartothek/api/consistency.py
@@ -200,7 +200,7 @@ def _check_indices(datasets: Dict[str, DatasetMetadata], cube: Cube) -> None:
For all datasets the primary indices must be equal to ``ds.partition_keys``. For the seed dataset, secondary
indices for all dimension columns except ``cube.suppress_index_on`` are expected.
Additional indices are accepted and will not bew reported as error.
Additional indices are accepted and will not be reported as error.
Parameters
----------
8 changes: 7 additions & 1 deletion kartothek/io/dask/_shuffle.py
@@ -12,6 +12,12 @@
from kartothek.io_components.write import write_partition
from kartothek.serialization import DataFrameSerializer

try:
from typing_extensions import Literal # type: ignore
except ImportError:
from typing import Literal # type: ignore


_KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"


@@ -35,7 +41,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets:
def shuffle_store_dask_partitions(
ddf: dd.DataFrame,
table: str,
secondary_indices: Optional[Union[str, Sequence[str]]],
secondary_indices: Optional[Union[Literal[False], Sequence[str]]],
metadata_version: int,
partition_on: List[str],
store_factory: StoreFactory,
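The widened annotation distinguishes the `False` sentinel (skip secondary-index handling) from an explicit list of index columns. A small standalone sketch of the same fallback import together with a hypothetical type alias (the alias name is made up for illustration):

```python
from typing import Optional, Sequence, Union

try:  # same fallback as in the commit: prefer typing_extensions if present
    from typing_extensions import Literal
except ImportError:
    from typing import Literal  # Python >= 3.8

# Hypothetical alias for illustration only:
#   False    -> downstream code skips secondary-index handling entirely
#   sequence -> build/validate exactly these index columns
#   None     -> left to be resolved against the stored dataset
SecondaryIndices = Optional[Union[Literal[False], Sequence[str]]]
```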
25 changes: 25 additions & 0 deletions kartothek/io/dask/common_cube.py
@@ -30,10 +30,12 @@
prepare_ktk_partition_on,
)
from kartothek.io_components.metapartition import (
SINGLE_TABLE,
MetaPartition,
parse_input_to_metapartition,
)
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import _ensure_compatible_indices
from kartothek.io_components.write import (
raise_if_dataset_exists,
store_dataset_from_partitions,
@@ -48,6 +50,26 @@
)


def ensure_valid_cube_indices(existing_datasets, cube):
required_indices = set(cube.index_columns)
suppress_index_on = set(cube.suppress_index_on)
for ds in existing_datasets.values():
dataset_columns = set(ds.table_meta[SINGLE_TABLE].names)
table_indices = required_indices & dataset_columns
compatible_indices = _ensure_compatible_indices(ds, table_indices)
if compatible_indices:
compatible_indices = set(compatible_indices)
suppress_index_on -= compatible_indices
required_indices |= compatible_indices
# Need to remove dimension columns since they *are* technically indices but
# the cube interface class declares them as not indexed just to add them
# later on, assuming it is not blacklisted
return cube.copy(
index_columns=required_indices - set(cube.dimension_columns),
suppress_index_on=suppress_index_on,
)


def build_cube_from_bag_internal(
data, cube, store, ktk_cube_dataset_ids, metadata, overwrite, partition_on
):
@@ -90,6 +112,7 @@ def build_cube_from_bag_internal(
existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store)
check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets)
partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)
cube = ensure_valid_cube_indices(existing_datasets, cube)

data = (
data.map(multiplex_user_input, cube=cube)
@@ -167,6 +190,7 @@ def extend_cube_from_bag_internal(
partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)

existing_datasets = discover_datasets(cube, store)
cube = ensure_valid_cube_indices(existing_datasets, cube)
if overwrite:
existing_datasets_cut = {
ktk_cube_dataset_id: ds
@@ -337,6 +361,7 @@ def append_to_cube_from_bag_internal(
metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids)

existing_datasets = discover_datasets(cube, store)
cube = ensure_valid_cube_indices(existing_datasets, cube)
# existing_payload is set to empty because we're not checking against any existing payload. ktk will account for the
# compat check within 1 dataset
existing_payload = set()
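A dependency-free sketch of the reconciliation that `ensure_valid_cube_indices` performs (simplified: it operates on plain column-name sets instead of `DatasetMetadata`/`Cube` objects and assumes the requested indices are compatible with what is stored):

```python
from typing import Iterable, Set, Tuple

def reconcile_cube_indices(
    index_columns: Iterable[str],
    suppress_index_on: Iterable[str],
    dimension_columns: Iterable[str],
    stored_index_columns_per_dataset: Iterable[Set[str]],
) -> Tuple[Set[str], Set[str]]:
    """Merge indices already present in storage into the requested cube spec."""
    required = set(index_columns)
    suppressed = set(suppress_index_on)
    for stored in stored_index_columns_per_dataset:
        # Indices that already exist in storage stay declared even if the
        # caller's cube spec omitted them ...
        required |= stored
        # ... and may no longer be suppressed.
        suppressed -= stored
    # Dimension columns are indexed implicitly by the cube machinery, so they
    # are stripped from the explicit index list again.
    return required - set(dimension_columns), suppressed

# Storage already indexes "indexed"; the new cube spec omits it:
index_cols, suppress = reconcile_cube_indices(
    index_columns=[],
    suppress_index_on=[],
    dimension_columns=["A"],
    stored_index_columns_per_dataset=[{"A", "indexed"}],
)
assert index_cols == {"indexed"} and suppress == set()
```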
12 changes: 9 additions & 3 deletions kartothek/io/dask/dataframe.py
@@ -49,6 +49,11 @@
from ._utils import _maybe_get_categoricals_from_index
from .delayed import read_table_as_delayed

try:
from typing_extensions import Literal # type: ignore
except ImportError:
from typing import Literal # type: ignore


@default_docs
@normalize_args
@@ -321,7 +326,7 @@ def _write_dataframe_partitions(
store: StoreFactory,
dataset_uuid: str,
table: str,
secondary_indices: List[str],
secondary_indices: Union[Literal[False], List[str]],
shuffle: bool,
repartition_ratio: Optional[SupportsFloat],
num_buckets: int,
@@ -415,7 +420,8 @@ def update_dataset_from_ddf(
ds_factory=factory,
)

_ensure_compatible_indices(ds_factory, secondary_indices)
inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
del secondary_indices

if ds_factory is not None:
check_single_table_dataset(ds_factory, table)
@@ -425,7 +431,7 @@
store=store,
dataset_uuid=dataset_uuid or ds_factory.dataset_uuid,
table=table,
secondary_indices=secondary_indices,
secondary_indices=inferred_indices,
shuffle=shuffle,
repartition_ratio=repartition_ratio,
num_buckets=num_buckets,
9 changes: 5 additions & 4 deletions kartothek/io/eager.py
@@ -749,12 +749,13 @@ def update_dataset_from_dataframes(
partition_on=partition_on,
)

secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
del secondary_indices

mp = parse_input_to_metapartition(
df_list,
metadata_version=metadata_version,
expected_secondary_indices=secondary_indices,
expected_secondary_indices=inferred_indices,
)

if sort_partitions_by:
@@ -763,8 +764,8 @@
if partition_on:
mp = mp.partition_on(partition_on)

if secondary_indices:
mp = mp.build_indices(secondary_indices)
if inferred_indices:
mp = mp.build_indices(inferred_indices)

mp = mp.store_dataframes(
store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
27 changes: 27 additions & 0 deletions kartothek/io/testing/update.py
@@ -585,6 +585,8 @@ def test_raises_on_invalid_input(store_factory, bound_update_dataset):
def test_raises_on_new_index_creation(
backend_identifier, store_factory, bound_update_dataset, define_indices_on_partition
):
# This test can be removed once the variable index input is removed in
# favour of the test `test_update_secondary_indices_subset`
if backend_identifier == "dask.dataframe" and define_indices_on_partition:
pytest.skip() # Constructs a dataframe which ignores index information passed as dict

@@ -626,6 +628,31 @@ def test_raises_on_new_index_creation(
)


def test_update_secondary_indices_subset(store_factory, bound_update_dataset):
df1 = pd.DataFrame({"A": range(10), "indexed": 1})
dataset_uuid = "dataset_uuid"
bound_update_dataset(
df1, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="indexed"
)

df2 = pd.DataFrame({"A": range(10), "indexed": 2})
# secondary index is omitted. Kartothek should pick it up regardless
bound_update_dataset(df2, dataset_uuid=dataset_uuid, store=store_factory)

dm = DatasetMetadata.load_from_store(
dataset_uuid, store_factory(), load_all_indices=True
)
obs_values = dm.indices["indexed"].observed_values()

assert sorted(obs_values) == [1, 2]

with pytest.raises(ValueError, match="Incorrect indices provided"):
# secondary index is omitted. Kartothek should pick it up regardless
bound_update_dataset(
df2, dataset_uuid=dataset_uuid, store=store_factory, secondary_indices="A"
)


@pytest.mark.parametrize("define_indices_on_partition", (False, True))
def test_update_first_time_with_secondary_indices(
store_factory, bound_update_dataset, define_indices_on_partition
92 changes: 92 additions & 0 deletions kartothek/io/testing/update_cube.py
@@ -158,3 +158,95 @@ def test_update_respects_ktk_cube_dataset_ids(
assert set(df_ex_read["p"]) == {1}
else:
assert set(df_ex_read["p"]) == {0, 1}


def test_cube_update_secondary_indices_subset(function_store, driver):

cube1 = Cube(
dimension_columns=["A"],
partition_columns=["P"],
uuid_prefix="cube",
seed_dataset="source",
index_columns=["indexed"],
)
df_1 = pd.DataFrame({"A": range(10), "P": 1, "indexed": 1, "not-indexed": 1})
build_cube(
data={"source": df_1},
cube=cube1,
store=function_store,
metadata={"source": {"meta_at_create": "data"}},
)

cube2 = Cube(
dimension_columns=["A"],
partition_columns=["P"],
uuid_prefix="cube",
seed_dataset="source",
)
df_2 = pd.DataFrame({"A": range(10, 20), "P": 1, "indexed": 2, "not-indexed": 1})
driver(
data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None
)

dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset)
dm = DatasetMetadata.load_from_store(
dataset_uuid, function_store(), load_all_indices=True
)
obs_values = dm.indices["indexed"].observed_values()

assert sorted(obs_values) == [1, 2]

cube2 = Cube(
dimension_columns=["A"],
partition_columns=["P"],
uuid_prefix="cube",
seed_dataset="source",
index_columns=["not-indexed"],
)
with pytest.raises(
ValueError,
match='ExplicitSecondaryIndex or PartitionIndex "not-indexed" is missing in dataset',
):
driver(
data={"source": df_2},
cube=cube2,
store=function_store,
remove_conditions=None,
)


def test_cube_blacklist_dimension_index(function_store, driver):

cube1 = Cube(
dimension_columns=["A", "B"],
partition_columns=["P"],
uuid_prefix="cube",
seed_dataset="source",
)
df_1 = pd.DataFrame({"A": range(10), "P": 1, "B": 1, "payload": ""})
build_cube(
data={"source": df_1},
cube=cube1,
store=function_store,
metadata={"source": {"meta_at_create": "data"}},
)

cube2 = Cube(
dimension_columns=["A", "B"],
partition_columns=["P"],
uuid_prefix="cube",
seed_dataset="source",
suppress_index_on=["B"],
)
df_2 = pd.DataFrame({"A": range(10), "P": 1, "B": 2, "payload": ""})
driver(
data={"source": df_2}, cube=cube2, store=function_store, remove_conditions=None
)

dataset_uuid = cube2.ktk_dataset_uuid(cube2.seed_dataset)
dm = DatasetMetadata.load_from_store(
dataset_uuid, function_store(), load_all_indices=True
)
obs_values = dm.indices["B"].observed_values()

assert sorted(obs_values) == [1, 2]
10 changes: 9 additions & 1 deletion kartothek/io_components/metapartition.py
@@ -42,6 +42,12 @@
filter_df_from_predicates,
)

try:
from typing_extensions import Literal # type: ignore
except ImportError:
from typing import Literal # type: ignore


LOGGER = logging.getLogger(__name__)

SINGLE_TABLE = "table"
@@ -1647,7 +1653,9 @@ def partition_labels_from_mps(mps):


def parse_input_to_metapartition(
obj, metadata_version=None, expected_secondary_indices=False
obj: Optional[Union[Dict, pd.DataFrame, Sequence, MetaPartition]],
metadata_version: Optional[int] = None,
expected_secondary_indices: Optional[Union[Literal[False], Sequence[str]]] = False,
) -> MetaPartition:
"""
Parses given user input and returns a MetaPartition
18 changes: 13 additions & 5 deletions kartothek/io_components/utils.py
@@ -4,13 +4,13 @@
import collections
import inspect
import logging
from typing import List, Optional, TypeVar, Union, overload
from typing import List, Optional, Sequence, TypeVar, Union, overload

import decorator
import pandas as pd

from kartothek.core.dataset import DatasetMetadata
from kartothek.core.factory import _ensure_factory
from kartothek.core.factory import DatasetFactory, _ensure_factory
from kartothek.core.typing import StoreFactory, StoreInput
from kartothek.core.utils import ensure_store, lazy_store

@@ -110,11 +110,16 @@ def _combine_metadata(dataset_metadata, append_to_list):
return InvalidObject()


def _ensure_compatible_indices(dataset, secondary_indices):
def _ensure_compatible_indices(
dataset: Optional[Union[DatasetMetadata, DatasetFactory]],
secondary_indices: Optional[Sequence[str]],
) -> Union[Literal[False], List[str]]:
if dataset:
ds_secondary_indices = list(dataset.secondary_indices.keys())

if secondary_indices and set(ds_secondary_indices) != set(secondary_indices):
if secondary_indices and not set(secondary_indices).issubset(
ds_secondary_indices
):
raise ValueError(
f"Incorrect indices provided for dataset.\n"
f"Expected: {ds_secondary_indices}\n"
@@ -125,7 +130,10 @@ def _ensure_compatible_indices(dataset, secondary_indices):
# We return `False` if there is no dataset in storage and `secondary_indices` is undefined
# (`secondary_indices` is normalized to `[]` by default).
# In consequence, `parse_input_to_metapartition` will not check indices at the partition level.
return secondary_indices or False
if secondary_indices:
return list(secondary_indices)
else:
return False


def _ensure_valid_indices(mp_indices, secondary_indices=None, data=None):
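A small sketch of the resulting contract of `_ensure_compatible_indices` (the `SimpleNamespace` object is a stand-in for a real `DatasetMetadata`; only a `secondary_indices` mapping is needed for this illustration):

```python
from types import SimpleNamespace

from kartothek.io_components.utils import _ensure_compatible_indices

# Stand-in for a stored dataset whose secondary indices are "indexed" and "A".
dataset = SimpleNamespace(secondary_indices={"indexed": None, "A": None})

# Requesting a subset of the stored indices is now accepted; the stored
# definitions are returned so that every existing index is rebuilt for the
# new partitions, not only the explicitly requested one.
print(_ensure_compatible_indices(dataset, ["indexed"]))

# Requesting an index that is not stored still raises.
try:
    _ensure_compatible_indices(dataset, ["not-indexed"])
except ValueError as exc:
    print(exc)  # "Incorrect indices provided for dataset. ..."

# No dataset in storage and no requested indices -> False, which tells
# parse_input_to_metapartition to skip index checks at the partition level.
assert _ensure_compatible_indices(None, []) is False
```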