update_dataset_from_ddf corrupts datasets if table names diverge on create -> update #452

Open
stephan-hesselmann-by (Collaborator) opened this issue Apr 12, 2021 · 0 comments

This is a follow-up issue to #445 and #451. Datasets can still be corrupted if the table name passed to update_dataset_from_ddf differs between the creation step and the update step, for example when it is given on create and omitted on update.

Snippet:

import dask
import pandas as pd
import dask.dataframe as dd
from datetime import date
from storefact import get_store_from_url
from kartothek.io.dask.dataframe import update_dataset_from_ddf, read_dataset_as_ddf

dask.config.set(scheduler="synchronous")
store_url = "hfs://testdata"
store = get_store_from_url(store_url)
dataset_uuid = "testdata"


def create():
    df = pd.DataFrame(
        {"date": [date(2021, 1, x) for x in range(1, 6)], "value": range(5)}
    )
    ddf = dask.dataframe.from_pandas(df, npartitions=1)
    delayed = update_dataset_from_ddf(
        ddf, store, dataset_uuid, table="predictions", partition_on=["date"]
    )
    res = delayed.compute()


def update():
    df = pd.DataFrame(
        {"date": [date(2021, 1, x) for x in range(6, 11)], "value": range(5)}
    )
    ddf = dask.dataframe.from_pandas(df, npartitions=1)
    delayed = update_dataset_from_ddf(ddf, store, dataset_uuid, partition_on=["date"])
    res = delayed.compute()


def validate():
    ddf = read_dataset_as_ddf(dataset_uuid, store, "predictions")
    df = ddf.compute()
    print(df)


if __name__ == "__main__":
    create()
    update()
    validate()

The Kartothek<4 behavior of this code is to raise a TypeError:

TypeError: Unexpected table in dataset:
Found:	['predictions']
Expected:	table

Files:

├──  testdata
│  ├──  testdata
│  │  └──  predictions
│  │     ├──  _common_metadata
│  │     ├──  date=2021-01-01
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-02
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-03
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-04
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     └──  date=2021-01-05
│  │        └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  └──  testdata.by-dataset-metadata.json

The Kartothek>=4 behavior of this code is to corrupt the dataset:

Traceback (most recent call last):
  File "test_kartothek_create_update.py", line 46, in <module>
    validate()
  File "test_kartothek_create_update.py", line 35, in validate
    ddf = read_dataset_as_ddf(dataset_uuid, store, "predictions")
  File "<decorator-gen-7>", line 2, in read_dataset_as_ddf
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 277, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 275, in _wrapper
    return function(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/dataframe.py", line 113, in read_dataset_as_ddf
    delayed_partitions = read_dataset_as_delayed(
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/delayed.py", line 239, in read_dataset_as_delayed
    mps = read_dataset_as_delayed_metapartitions(
  File "<decorator-gen-5>", line 2, in read_dataset_as_delayed_metapartitions
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 277, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 275, in _wrapper
    return function(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/delayed.py", line 217, in read_dataset_as_delayed_metapartitions
    return list(mps)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/read.py", line 102, in dispatch_metapartitions_from_factory
    yield MetaPartition.from_partition(
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/metapartition.py", line 426, in from_partition
    file=partition.files[table_name],
KeyError: 'predictions'

Files:

├──  testdata
│  ├──  testdata
│  │  ├──  predictions
│  │  │  ├──  _common_metadata
│  │  │  ├──  date=2021-01-01
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-02
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-03
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-04
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  └──  date=2021-01-05
│  │  │     └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  └──  table
│  │     ├──  date=2021-01-06
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-07
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-08
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-09
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     └──  date=2021-01-10
│  │        └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  └──  testdata.by-dataset-metadata.json
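
A quick way to spot this divergence is to group the store keys by the path component that follows the dataset UUID. The sketch below is plain Python and does not call Kartothek; `tables_in_keys` is a hypothetical helper, and it assumes keys follow the `<dataset_uuid>/<table>/...` layout visible in the listing above.

```python
from typing import Iterable, Set


def tables_in_keys(keys: Iterable[str], dataset_uuid: str) -> Set[str]:
    """Return the table directory names found under ``dataset_uuid``.

    Hypothetical helper: assumes keys look like
    ``<dataset_uuid>/<table>/<partition>/<file>.parquet``.
    """
    prefix = dataset_uuid + "/"
    tables = set()
    for key in keys:
        if not key.startswith(prefix):
            # Skips the metadata file, e.g. "testdata.by-dataset-metadata.json"
            continue
        table, sep, _ = key[len(prefix):].partition("/")
        if sep:  # only count components that are directories
            tables.add(table)
    return tables


# Keys abbreviated from the corrupted listing above.
keys = [
    "testdata.by-dataset-metadata.json",
    "testdata/predictions/_common_metadata",
    "testdata/predictions/date=2021-01-01/f9ba700a1c8b40ff875772654652fe2e.parquet",
    "testdata/table/date=2021-01-06/5a826fc386704a78ad40cac49039bb8b.parquet",
]
found = tables_in_keys(keys, "testdata")
if len(found) > 1:
    print("diverging table names:", sorted(found))
```

More than one name in the result indicates that create and update wrote under different table names.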

Expected Behavior:
Based on the following changelog entries for Kartothek 4.0, I would expect Kartothek to infer the correct table name from the existing dataset if the argument is left out:

All read pipelines will now automatically infer the table to read such that it is no longer necessary to provide table or table_name as an input argument
All writing pipelines which previously supported a complex user input type now expose an argument table_name which can be used to continue usage of legacy datasets (i.e. datasets with an intrinsic, non-trivial table name). This usage is discouraged and we recommend users to migrate to a default table name (i.e. leave it None / table)

However, replicating the Kartothek<4 behavior (raising a TypeError on update) would also be acceptable to me.
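
The two acceptable outcomes could be combined in a single check before writing. This is only a sketch of the idea, not Kartothek's implementation: `resolve_table_name` and `DEFAULT_TABLE` are hypothetical names, and the list of existing tables is assumed to be available from the dataset metadata.

```python
from typing import Optional, Sequence

# Assumption: "table" is the default name, as in the Kartothek<4 error above.
DEFAULT_TABLE = "table"


def resolve_table_name(
    existing_tables: Sequence[str], requested: Optional[str]
) -> str:
    """Pick the table name to write to, or fail before anything is written."""
    if not existing_tables:
        # New dataset: use the requested name or fall back to the default.
        return requested or DEFAULT_TABLE
    if len(existing_tables) != 1:
        raise TypeError(f"Dataset has multiple tables: {list(existing_tables)}")
    (existing,) = existing_tables
    if requested is None:
        # Infer the name from the existing dataset instead of using the default.
        return existing
    if requested != existing:
        # Mirror the Kartothek<4 error for an explicit, diverging name.
        raise TypeError(
            "Unexpected table in dataset:\n"
            f"Found:\t{list(existing_tables)}\n"
            f"Expected:\t{requested}"
        )
    return existing


# The update() call from the snippet omits the table name.
print(resolve_table_name(["predictions"], None))
```

With this check, the update in the snippet would write to predictions, while an explicit, diverging name would fail before any file is written.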
