Ensure that neither cube nor plain dataset updates corrupt indices #398

Merged: 8 commits, Feb 8, 2021
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,12 @@
Changelog
=========

Version 3.18.1 (2021-02-XY)
===========================

* Fix an issue where updates on cubes or on datasets using dask.dataframe
  might not update all secondary indices, resulting in a corrupt state after
  the update

Version 3.18.0 (2021-01-25)
===========================
46 changes: 45 additions & 1 deletion docs/guide/mutating_datasets.rst
@@ -1,4 +1,4 @@

.. _mutating_datasets:

Mutating Datasets
=================
@@ -286,6 +286,8 @@ consists of two rows corresponding to ``B=2013-01-02`` (from ``df``) and four ro
Thus, the original partition with the two rows corresponding to ``B=2013-01-03`` from ``df``
has been completely replaced.



Garbage collection
------------------

@@ -324,3 +326,45 @@ When garbage collection is called, the files are removed.
files_before.difference(store.keys()) # Show files removed

.. _storefact: https://github.com/blue-yonder/storefact


Mutating indexed datasets
-------------------------

The mutating operation will update all indices that currently exist for the dataset. This holds true even if the update does not specify any indices at all, or specifies only a subset of them. Consider the following example:

.. ipython:: python

df = pd.DataFrame({"payload": range(10), "i1": 0, "i2": ["a"] * 5 + ["b"] * 5})
dm = store_dataframes_as_dataset(
store_url, "indexed_dataset", [df], secondary_indices=["i1", "i2"]
)
dm = dm.load_all_indices(store_url)
dm.indices["i1"].observed_values()
dm.indices["i2"].observed_values()

new_df = pd.DataFrame({"payload": range(10), "i1": 1, "i2": "c"})

If we do not specify any indices, kartothek will infer the existing ones and update them correctly:

.. ipython:: python

dm = update_dataset_from_dataframes([new_df], store=store_url, dataset_uuid=dm.uuid)

dm = dm.load_all_indices(store_url)
dm.indices["i1"].observed_values()
dm.indices["i2"].observed_values()


This holds true even if only a subset of the indices is given:

.. ipython:: python

new_df = pd.DataFrame({"payload": range(10), "i1": 2, "i2": "d"})
dm = update_dataset_from_dataframes(
[new_df], store=store_url, dataset_uuid=dm.uuid, secondary_indices="i1"
)

dm = dm.load_all_indices(store_url)
dm.indices["i1"].observed_values()
dm.indices["i2"].observed_values()
2 changes: 1 addition & 1 deletion kartothek/api/consistency.py
@@ -200,7 +200,7 @@ def _check_indices(datasets: Dict[str, DatasetMetadata], cube: Cube) -> None:
For all datasets the primary indices must be equal to ``ds.partition_keys``. For the seed dataset, secondary
indices for all dimension columns except ``cube.suppress_index_on`` are expected.

Additional indices are accepted and will not bew reported as error.
Additional indices are accepted and will not be reported as error.

Parameters
----------
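
To make the rule above concrete, an illustrative sketch (editorial note, not part of the diff; the field names come from the surrounding code, but the exact ``Cube`` constructor call is an assumption):

    from kartothek.core.cube.cube import Cube

    # A cube over dimensions "x" and "y", partitioned by "p", with the index
    # on "y" suppressed: every dataset must then be partitioned on ["p"], the
    # seed dataset must carry a secondary index on "x", and any additional
    # secondary index is accepted without error.
    cube = Cube(
        dimension_columns=["x", "y"],
        partition_columns=["p"],
        uuid_prefix="my_cube",
        suppress_index_on=["y"],
    )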
40 changes: 21 additions & 19 deletions kartothek/io/dask/_shuffle.py
@@ -1,5 +1,5 @@
from functools import partial
from typing import List, Optional, Sequence, Union
from typing import List, Optional, Sequence, cast

import dask.array as da
import dask.dataframe as dd
@@ -9,6 +9,7 @@
from kartothek.core.typing import StoreFactory
from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas
from kartothek.io_components.metapartition import MetaPartition
from kartothek.io_components.utils import InferredIndices
from kartothek.io_components.write import write_partition
from kartothek.serialization import DataFrameSerializer

@@ -35,7 +36,7 @@ def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets:
def shuffle_store_dask_partitions(
ddf: dd.DataFrame,
table: str,
secondary_indices: Optional[Union[str, Sequence[str]]],
secondary_indices: Optional[InferredIndices],
metadata_version: int,
partition_on: List[str],
store_factory: StoreFactory,
@@ -109,28 +110,29 @@ def shuffle_store_dask_partitions(
unpacked_meta = ddf._meta

ddf = pack_payload(ddf, group_key=group_cols)
ddf = ddf.groupby(by=group_cols)
ddf = ddf.apply(
partial(
_unpack_store_partition,
secondary_indices=secondary_indices,
sort_partitions_by=sort_partitions_by,
table=table,
dataset_uuid=dataset_uuid,
partition_on=partition_on,
store_factory=store_factory,
df_serializer=df_serializer,
metadata_version=metadata_version,
unpacked_meta=unpacked_meta,
),
meta=("MetaPartition", "object"),
ddf_grouped = ddf.groupby(by=group_cols)

unpack = partial(
_unpack_store_partition,
secondary_indices=secondary_indices,
sort_partitions_by=sort_partitions_by,
table=table,
dataset_uuid=dataset_uuid,
partition_on=partition_on,
store_factory=store_factory,
df_serializer=df_serializer,
metadata_version=metadata_version,
unpacked_meta=unpacked_meta,
)
return cast(
da.Array, # Output type depends on meta but mypy cannot infer this easily.
ddf_grouped.apply(unpack, meta=("MetaPartition", "object")),
)
return ddf


def _unpack_store_partition(
df: pd.DataFrame,
secondary_indices: List[str],
secondary_indices: Optional[InferredIndices],
sort_partitions_by: List[str],
table: str,
dataset_uuid: str,
4 changes: 4 additions & 0 deletions kartothek/io/dask/bag_cube.py
@@ -442,6 +442,10 @@ def update_cube_from_bag(
metadata_dict: dask.bag.Bag
A dask bag object containing the compute graph to append to the cube returning the dict of dataset metadata
objects. The bag has a single partition with a single element.

See Also
--------
:ref:`mutating_datasets`
"""
return append_to_cube_from_bag_internal(
data=data,
38 changes: 38 additions & 0 deletions kartothek/io/dask/common_cube.py
@@ -3,6 +3,7 @@
"""
from collections import defaultdict
from functools import partial
from typing import Dict

import dask.bag as db

@@ -13,6 +14,8 @@
KTK_CUBE_METADATA_STORAGE_FORMAT,
KTK_CUBE_METADATA_VERSION,
)
from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadataBase
from kartothek.io_components.cube.append import check_existing_datasets
from kartothek.io_components.cube.common import check_blocksize, check_store_factory
from kartothek.io_components.cube.query import load_group, plan_query, quick_concat
@@ -34,6 +37,7 @@
parse_input_to_metapartition,
)
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import _ensure_compatible_indices
from kartothek.io_components.write import (
raise_if_dataset_exists,
store_dataset_from_partitions,
@@ -48,6 +52,37 @@
)


def ensure_valid_cube_indices(
existing_datasets: Dict[str, DatasetMetadataBase], cube: Cube
) -> Cube:
"""
Parse all existing datasets and infer the required set of indices. We do not
allow indices to be removed or added in update steps at the moment and
need to make sure that existing ones are updated properly.
The returned `Cube` instance will be a copy of the input with
`index_columns` and `suppress_index_on` fields adjusted to reflect the
existing datasets.
"""
required_indices = set(cube.index_columns)
suppress_index_on = set(cube.suppress_index_on)
for ds in existing_datasets.values():
for internal_table in ds.table_meta:
dataset_columns = set(ds.table_meta[internal_table].names)
table_indices = required_indices & dataset_columns
compatible_indices = _ensure_compatible_indices(ds, table_indices)
if compatible_indices:
dataset_indices = set(compatible_indices)
suppress_index_on -= dataset_indices
required_indices |= dataset_indices
# Need to remove the dimension columns since they technically *are* indices,
# but the cube interface class declares them as not indexed and only adds
# them later on, provided they are not suppressed.
return cube.copy(
index_columns=required_indices - set(cube.dimension_columns),
suppress_index_on=suppress_index_on,
)
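
A minimal usage sketch of the new helper (editorial note; ``my_cube`` and ``my_store`` are hypothetical placeholders, the call pattern mirrors the one added to ``build_cube_from_bag_internal`` below):

    existing_datasets = discover_datasets_unchecked(my_cube.uuid_prefix, my_store)
    my_cube = ensure_valid_cube_indices(existing_datasets, my_cube)
    # my_cube now declares every secondary index that already exists on disk,
    # so the subsequent update cannot silently drop any of them.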


def build_cube_from_bag_internal(
data, cube, store, ktk_cube_dataset_ids, metadata, overwrite, partition_on
):
@@ -90,6 +125,7 @@ def build_cube_from_bag_internal(
existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store)
check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets)
partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)
cube = ensure_valid_cube_indices(existing_datasets, cube)

data = (
data.map(multiplex_user_input, cube=cube)
@@ -167,6 +203,7 @@ def extend_cube_from_bag_internal(
partition_on = prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on)

existing_datasets = discover_datasets(cube, store)
cube = ensure_valid_cube_indices(existing_datasets, cube)
if overwrite:
existing_datasets_cut = {
ktk_cube_dataset_id: ds
@@ -337,6 +374,7 @@ def append_to_cube_from_bag_internal(
metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids)

existing_datasets = discover_datasets(cube, store)
cube = ensure_valid_cube_indices(existing_datasets, cube)
# existing_payload is set to empty because we're not checking against any existing payload. ktk will account for the
# compat check within 1 dataset
existing_payload = set()
12 changes: 9 additions & 3 deletions kartothek/io/dask/dataframe.py
@@ -32,6 +32,7 @@
from kartothek.io_components.read import dispatch_metapartitions_from_factory
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import (
InferredIndices,
_ensure_compatible_indices,
check_single_table_dataset,
normalize_arg,
@@ -321,7 +322,7 @@ def _write_dataframe_partitions(
store: StoreFactory,
dataset_uuid: str,
table: str,
secondary_indices: List[str],
secondary_indices: Optional[InferredIndices],
shuffle: bool,
repartition_ratio: Optional[SupportsFloat],
num_buckets: int,
@@ -397,6 +398,10 @@ def update_dataset_from_ddf(
):
"""
Update a dataset from a dask.dataframe.

See Also
--------
:ref:`mutating_datasets`
"""
partition_on = normalize_arg("partition_on", partition_on)
secondary_indices = normalize_arg("secondary_indices", secondary_indices)
@@ -415,7 +420,8 @@
ds_factory=factory,
)

_ensure_compatible_indices(ds_factory, secondary_indices)
inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
del secondary_indices

if ds_factory is not None:
check_single_table_dataset(ds_factory, table)
@@ -425,7 +431,7 @@
store=store,
dataset_uuid=dataset_uuid or ds_factory.dataset_uuid,
table=table,
secondary_indices=secondary_indices,
secondary_indices=inferred_indices,
shuffle=shuffle,
repartition_ratio=repartition_ratio,
num_buckets=num_buckets,
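
For context, a hedged sketch of the code path this change affects (editorial note): updating an indexed dataset from a ``dask.dataframe`` without re-declaring its indices should now refresh all existing secondary indices. The dataset name and ``store_url`` reuse the documentation example above; passing a store URL directly and calling ``.compute()`` on the returned object are assumptions about the API surface:

    import dask.dataframe as dd
    import pandas as pd

    from kartothek.io.dask.dataframe import update_dataset_from_ddf

    new_df = pd.DataFrame({"payload": range(10), "i1": 3, "i2": "e"})
    ddf = dd.from_pandas(new_df, npartitions=1)

    # No secondary_indices are passed; the indices already stored for "i1" and
    # "i2" are inferred from the existing dataset and updated during the write.
    update_dataset_from_ddf(
        ddf, store=store_url, dataset_uuid="indexed_dataset", table="table"
    ).compute()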
4 changes: 4 additions & 0 deletions kartothek/io/dask/delayed.py
@@ -464,6 +464,10 @@ def update_dataset_from_delayed(

Parameters
----------

See Also
--------
:ref:`mutating_datasets`
"""
partition_on = normalize_arg("partition_on", partition_on)
store = normalize_arg("store", store)
13 changes: 9 additions & 4 deletions kartothek/io/eager.py
@@ -728,6 +728,10 @@ def update_dataset_from_dataframes(
Returns
-------
The dataset metadata object (:class:`~kartothek.core.dataset.DatasetMetadata`).

See Also
--------
:ref:`mutating_datasets`
"""
if load_dynamic_metadata is not True:
warnings.warn(
@@ -749,12 +753,13 @@
partition_on=partition_on,
)

secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
del secondary_indices

mp = parse_input_to_metapartition(
df_list,
metadata_version=metadata_version,
expected_secondary_indices=secondary_indices,
expected_secondary_indices=inferred_indices,
)

if sort_partitions_by:
@@ -763,8 +768,8 @@
if partition_on:
mp = mp.partition_on(partition_on)

if secondary_indices:
mp = mp.build_indices(secondary_indices)
if inferred_indices:
mp = mp.build_indices(inferred_indices)

mp = mp.store_dataframes(
store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
4 changes: 4 additions & 0 deletions kartothek/io/iter.py
@@ -220,6 +220,10 @@ def update_dataset_from_dataframes__iter(
Returns
-------
The dataset metadata object (:class:`~kartothek.core.dataset.DatasetMetadata`).

See Also
--------
:ref:`mutating_datasets`
"""
if load_dynamic_metadata is not True:
warnings.warn(
1 change: 1 addition & 0 deletions kartothek/io/testing/build_cube.py
@@ -1119,6 +1119,7 @@ def test_overwrite_rollback_ktk(driver, function_store):
store=function_store,
dataset_uuid=cube.ktk_dataset_uuid(cube.seed_dataset),
metadata_version=KTK_CUBE_METADATA_VERSION,
secondary_indices=["i1", "i2"],
)

df_source2 = pd.DataFrame(