Remove deprecated kwargs #434

Merged 1 commit on Mar 16, 2021
10 changes: 9 additions & 1 deletion CHANGES.rst
@@ -18,7 +18,15 @@ This is a major release of kartothek with breaking API changes.
* Trying to read a multi-tabled dataset will now cause an exception telling users that this is no longer supported with kartothek 4.0
* The dict schema for :meth:`~kartothek.core.dataset.DatasetMetadataBase.to_dict` and :meth:`~kartothek.core.dataset.DatasetMetadata.from_dict` changed replacing a dictionary in `table_meta` with the simple `schema`
* All pipeline arguments which previously accepted a dictionary of sequences to describe a table specific subset of columns now accept plain sequences (e.g. `columns`, `categoricals`)

* Remove the following list of deprecated arguments for io pipelines
* label_filter
* central_partition_metadata
* load_dynamic_metadata
* load_dataset_metadata
* concat_partitions_on_primary_index
* Remove `output_dataset_uuid` and `df_serializer` from :func:`kartothek.io.eager.commit_dataset` since these arguments didn't have any effect
* Remove `metadata`, `df_serializer`, `overwrite`, `metadata_merger` from :func:`kartothek.io.eager.write_single_partition`
* :func:`~kartothek.io.eager.store_dataframes_as_dataset` now requires a list as an input

Version 3.20.0 (2021-03-15)
===========================
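The changelog entries above drop the deprecated read-path arguments (label_filter, central_partition_metadata, load_dynamic_metadata, load_dataset_metadata, concat_partitions_on_primary_index) and tighten the eager write API. A minimal migration sketch against the 4.0 interface, assuming a storefact-style store factory; the store URL, dataset uuid, and column names are illustrative:

from functools import partial

import pandas as pd
from storefact import get_store_from_url

from kartothek.io.eager import read_table, store_dataframes_as_dataset

# Illustrative local store factory; any simplekv-compatible factory works.
store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

df = pd.DataFrame({"part": ["A", "A", "B"], "value": [1, 2, 3]})

# `dfs` must now be a plain list of dataframes, even for a single frame.
store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="example_dataset",
    dfs=[df],
    partition_on=["part"],
)

# `label_filter` is gone; restrict what is read with `predicates` instead,
# which also benefits from predicate pushdown into the Parquet files.
df_a = read_table(
    dataset_uuid="example_dataset",
    store=store_factory,
    predicates=[[("part", "==", "A")]],
)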
14 changes: 0 additions & 14 deletions kartothek/core/docs.py
@@ -110,11 +110,6 @@
"categoricals": """
categoricals
Load the provided subset of columns as a :class:`pandas.Categorical`.""",
"label_filter": """
label_filter: Callable
A callable taking a partition label as a parameter and returns a boolean. The callable will be applied
to the list of partitions during dispatch and will filter out all partitions for which the callable
evaluates to False.""",
"dates_as_object": """
dates_as_object: bool
Load pyarrow.date{32,64} columns as ``object`` columns in Pandas
@@ -167,18 +162,9 @@
"df_generator": """
df_generator: Iterable[Union[pandas.DataFrame, Dict[str, pandas.DataFrame]]]
The dataframe(s) to be stored""",
"central_partition_metadata": """
central_partition_metadata: bool
This has no use and will be removed in future releases""",
"default_metadata_version": """
default_metadata_version: int
Default metadata version. (Note: Metadata version greater than 3 are only supported)""",
"load_dynamic_metadata": """
load_dynamic_metadata: bool
The keyword `load_dynamic_metadata` is deprecated and will be removed in the next major release.""",
"concat_partitions_on_primary_index": """
concat_partitions_on_primary_index: bool
Concatenate partition based on their primary index values.""",
"delayed_tasks": """
delayed_tasks
A list of delayed objects where each element returns a :class:`pandas.DataFrame`.""",
8 changes: 0 additions & 8 deletions kartothek/core/factory.py
@@ -28,7 +28,6 @@ def __init__(
store_factory: StoreInput,
load_schema: bool = True,
load_all_indices: bool = False,
load_dataset_metadata: bool = True,
) -> None:
"""
A dataset factory object which can be used to cache dataset load operations. This class should be the primary user entry point when
@@ -59,8 +58,6 @@ def __init__(
Load the schema information immediately.
load_all_indices
Load all indices immediately.
load_dataset_metadata
Keep the user metadata in memory
"""
self._cache_metadata: Optional[DatasetMetadata] = None
self._cache_store = None
@@ -70,7 +67,6 @@ def __init__(
self.load_schema = load_schema
self._ds_callable = None
self.is_loaded = False
self.load_dataset_metadata = load_dataset_metadata
self.load_all_indices_flag = load_all_indices

def __repr__(self):
@@ -96,8 +92,6 @@ def _instantiate_metadata_cache(self: T) -> T:
load_schema=self.load_schema,
load_all_indices=self.load_all_indices_flag,
)
if not self.load_dataset_metadata:
self._cache_metadata.metadata = {}
self.is_loaded = True
return self

@@ -161,7 +155,6 @@ def _ensure_factory(
dataset_uuid: Optional[str],
store: Optional[StoreInput],
factory: Optional[DatasetFactory],
load_dataset_metadata: bool,
load_schema: bool = True,
) -> DatasetFactory:

@@ -171,7 +164,6 @@
return DatasetFactory(
dataset_uuid=dataset_uuid,
store_factory=lazy_store(store),
load_dataset_metadata=load_dataset_metadata,
load_schema=load_schema,
)

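With load_dataset_metadata removed from DatasetFactory and _ensure_factory, a factory is configured only through load_schema and load_all_indices; user metadata is simply kept with the rest of the loaded dataset metadata. A short sketch of the new constructor, reusing the illustrative store factory from the sketch above:

from functools import partial

from storefact import get_store_from_url

from kartothek.core.factory import DatasetFactory

store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

# Only schema and index loading remain configurable on the factory.
factory = DatasetFactory(
    dataset_uuid="example_dataset",
    store_factory=store_factory,
    load_schema=True,
    load_all_indices=False,
)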
40 changes: 4 additions & 36 deletions kartothek/io/dask/bag.py
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import warnings
from functools import partial
from typing import Optional, Sequence

@@ -54,17 +52,13 @@ def read_dataset_as_metapartitions_bag(
dataset_uuid=None,
store=None,
columns=None,
concat_partitions_on_primary_index=False,
predicate_pushdown_to_io=True,
categoricals=None,
label_filter=None,
dates_as_object=False,
load_dataset_metadata=False,
predicates=None,
factory=None,
dispatch_by=None,
partition_size=None,
dispatch_metadata=True,
):
"""
Retrieve dataset as `dask.bag.Bag` of `MetaPartition` objects.
@@ -78,33 +72,16 @@
A dask.bag object containing the metapartions.
"""
ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=load_dataset_metadata,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

if len(ds_factory.tables) > 1:
warnings.warn(
"Trying to read a dataset with multiple internal tables. This functionality will be removed in the next "
"major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube "
"functionality. "
"https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html",
DeprecationWarning,
)

store = ds_factory.store_factory
mps = dispatch_metapartitions_from_factory(
dataset_factory=ds_factory,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
label_filter=label_filter,
predicates=predicates,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
dataset_factory=ds_factory, predicates=predicates, dispatch_by=dispatch_by,
)
mps = db.from_sequence(mps, partition_size=partition_size)

if concat_partitions_on_primary_index or dispatch_by is not None:
if dispatch_by is not None:
mps = mps.map(
_load_and_concat_metapartitions_inner,
store=store,
@@ -146,10 +123,8 @@ def read_dataset_as_dataframe_bag(
dataset_uuid=None,
store=None,
columns=None,
concat_partitions_on_primary_index=False,
predicate_pushdown_to_io=True,
categoricals=None,
label_filter=None,
dates_as_object=False,
predicates=None,
factory=None,
@@ -172,16 +147,12 @@
store=store,
factory=factory,
columns=columns,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
predicate_pushdown_to_io=predicate_pushdown_to_io,
categoricals=categoricals,
label_filter=label_filter,
dates_as_object=dates_as_object,
load_dataset_metadata=False,
predicates=predicates,
dispatch_by=dispatch_by,
partition_size=partition_size,
dispatch_metadata=False,
)
return mps.map(_get_data)

@@ -276,10 +247,7 @@ def build_dataset_indices__bag(

"""
ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

assert ds_factory.schema is not None
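The bag readers above lose concat_partitions_on_primary_index, label_filter, load_dataset_metadata, and dispatch_metadata; subsetting and grouping of partitions now go through predicates and dispatch_by only. A hedged usage sketch with illustrative names (dispatch_by assumes the column is indexed, e.g. because the dataset was partitioned on it):

from functools import partial

from storefact import get_store_from_url

from kartothek.io.dask.bag import read_dataset_as_dataframe_bag

store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

bag = read_dataset_as_dataframe_bag(
    dataset_uuid="example_dataset",
    store=store_factory,
    columns=["part", "value"],           # plain sequence, no per-table dict
    predicates=[[("part", "==", "A")]],  # replaces the removed label_filter
    dispatch_by=["part"],                # replaces concat_partitions_on_primary_index
    partition_size=1,
)
frames = bag.compute()  # list of pandas.DataFrame objects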
33 changes: 4 additions & 29 deletions kartothek/io/dask/dataframe.py
@@ -1,5 +1,4 @@
import random
import warnings
from typing import (
Callable,
Iterable,
@@ -65,10 +64,8 @@ def read_dataset_as_ddf(
store=None,
table=SINGLE_TABLE,
columns=None,
concat_partitions_on_primary_index=False,
predicate_pushdown_to_io=True,
categoricals: Optional[Sequence[str]] = None,
label_filter=None,
dates_as_object=False,
predicates=None,
factory=None,
@@ -102,21 +99,9 @@
)

ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

if len(ds_factory.tables) > 1:
warnings.warn(
"Trying to read a dataset with multiple internal tables. This functionality will be removed in the next "
"major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube "
"functionality. "
"https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html",
DeprecationWarning,
)

if isinstance(columns, dict):
columns = columns[table]
meta = _get_dask_meta_for_dataset(
@@ -130,10 +115,8 @@
delayed_partitions = read_dataset_as_delayed(
factory=ds_factory,
columns=columns,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
predicate_pushdown_to_io=predicate_pushdown_to_io,
categoricals=categoricals,
label_filter=label_filter,
dates_as_object=dates_as_object,
predicates=predicates,
dispatch_by=dask_index_on if dask_index_on else dispatch_by,
@@ -291,9 +274,7 @@ def store_dataset_from_ddf(
if table is None:
raise TypeError("The parameter `table` is not optional.")

ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid, store=store, factory=None, load_dataset_metadata=True
)
ds_factory = _ensure_factory(dataset_uuid=dataset_uuid, store=store, factory=None)

if not overwrite:
raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store)
@@ -514,10 +495,7 @@ def collect_dataset_metadata(
"Please make sure to provide a value larger than 0.0 and smaller than or equal to 1.0 ."
)
dataset_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

mps = list(
@@ -593,10 +571,7 @@ def hash_dataset(
If provided, calculate hash per group instead of per partition
"""
dataset_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

columns = subset
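read_dataset_as_ddf keeps the dask-facing knobs (table, columns, categoricals, predicates, dispatch_by/dask_index_on) while the deprecated ones disappear from the signature. A sketch of a 4.0-style call, with illustrative names and assuming the dataset from the earlier sketches:

from functools import partial

from storefact import get_store_from_url

from kartothek.io.dask.dataframe import read_dataset_as_ddf

store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

ddf = read_dataset_as_ddf(
    dataset_uuid="example_dataset",
    store=store_factory,
    columns=["part", "value"],
    predicates=[[("value", ">", 1)]],
    dask_index_on="part",  # assumes "part" is an indexed column
)
result = ddf.compute()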
46 changes: 5 additions & 41 deletions kartothek/io/dask/delayed.py
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import warnings
from functools import partial
from typing import List, Optional, Sequence

@@ -77,11 +75,7 @@ def delete_dataset__delayed(dataset_uuid=None, store=None, factory=None):
----------
"""
dataset_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_schema=False,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory, load_schema=False,
)

gc = garbage_collect_dataset__delayed(factory=dataset_factory)
@@ -124,10 +118,7 @@ def garbage_collect_dataset__delayed(
"""

ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=False,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

nested_files = dispatch_files_to_gc(
@@ -150,23 +141,18 @@ def _load_and_concat_metapartitions(list_of_mps, *args, **kwargs):
)


# FIXME: remove
@default_docs
@normalize_args
def read_dataset_as_delayed_metapartitions(
dataset_uuid=None,
store=None,
columns=None,
concat_partitions_on_primary_index=False,
predicate_pushdown_to_io=True,
categoricals: Optional[Sequence[str]] = None,
label_filter=None,
dates_as_object=False,
load_dataset_metadata=False,
predicates=None,
factory=None,
dispatch_by=None,
dispatch_metadata=True,
):
"""
A collection of dask.delayed objects to retrieve a dataset from store where each
@@ -181,32 +167,15 @@ def read_dataset_as_delayed_metapartitions(

"""
ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
factory=factory,
load_dataset_metadata=load_dataset_metadata,
dataset_uuid=dataset_uuid, store=store, factory=factory,
)

if len(ds_factory.tables) > 1:
warnings.warn(
"Trying to read a dataset with multiple internal tables. This functionality will be removed in the next "
"major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube "
"functionality. "
"https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html",
DeprecationWarning,
)

store = ds_factory.store_factory
mps = dispatch_metapartitions_from_factory(
dataset_factory=ds_factory,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
label_filter=label_filter,
predicates=predicates,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
dataset_factory=ds_factory, predicates=predicates, dispatch_by=dispatch_by,
)

if concat_partitions_on_primary_index or dispatch_by is not None:
if dispatch_by is not None:
mps = _load_and_concat_metapartitions(
mps,
store=store,
@@ -253,10 +222,8 @@ def read_dataset_as_delayed(
dataset_uuid=None,
store=None,
columns=None,
concat_partitions_on_primary_index=False,
predicate_pushdown_to_io=True,
categoricals=None,
label_filter=None,
dates_as_object=False,
predicates=None,
factory=None,
@@ -274,12 +241,9 @@
store=store,
factory=factory,
columns=columns,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
predicate_pushdown_to_io=predicate_pushdown_to_io,
categoricals=categoricals,
label_filter=label_filter,
dates_as_object=dates_as_object,
load_dataset_metadata=False,
predicates=predicates,
dispatch_by=dispatch_by,
)
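The delayed reader follows the same pattern: no label_filter, concat_partitions_on_primary_index, load_dataset_metadata, or dispatch_metadata, and each returned object is a dask.delayed wrapping one pandas.DataFrame. An illustrative sketch:

from functools import partial

import dask
from storefact import get_store_from_url

from kartothek.io.dask.delayed import read_dataset_as_delayed

store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

delayed_frames = read_dataset_as_delayed(
    dataset_uuid="example_dataset",
    store=store_factory,
    predicates=[[("part", "==", "B")]],
)
frames = dask.compute(*delayed_frames)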