diff --git a/CHANGES.rst b/CHANGES.rst index e74210b2..8dc011ca 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -18,7 +18,15 @@ This is a major release of kartothek with breaking API changes. * Trying to read a multi-tabled dataset will now cause an exception telling users that this is no longer supported with kartothek 4.0 * The dict schema for :meth:`~kartothek.core.dataset.DatasetMetadataBase.to_dict` and :meth:`~kartothek.core.dataset.DatasetMetadata.from_dict` changed replacing a dictionary in `table_meta` with the simple `schema` * All pipeline arguments which previously accepted a dictionary of sequences to describe a table specific subset of columns now accept plain sequences (e.g. `columns`, `categoricals`) - +* Remove the following list of deprecated arguments for io pipelines + * label_filter + * central_partition_metadata + * load_dynamic_metadata + * load_dataset_metadata + * concat_partitions_on_primary_index +* Remove `output_dataset_uuid` and `df_serializer` from :func:`kartothek.io.eager.commit_dataset` since these arguments didn't have any effect +* Remove `metadata`, `df_serializer`, `overwrite`, `metadata_merger` from :func:`kartothek.io.eager.write_single_partition` +* :func:`~kartothek.io.eager.store_dataframes_as_dataset` now requires a list as an input Version 3.20.0 (2021-03-15) =========================== diff --git a/kartothek/core/docs.py b/kartothek/core/docs.py index eda7c5d8..1c5c39b2 100644 --- a/kartothek/core/docs.py +++ b/kartothek/core/docs.py @@ -110,11 +110,6 @@ "categoricals": """ categoricals Load the provided subset of columns as a :class:`pandas.Categorical`.""", - "label_filter": """ - label_filter: Callable - A callable taking a partition label as a parameter and returns a boolean. The callable will be applied - to the list of partitions during dispatch and will filter out all partitions for which the callable - evaluates to False.""", "dates_as_object": """ dates_as_object: bool Load pyarrow.date{32,64} columns as ``object`` columns in Pandas @@ -167,18 +162,9 @@ "df_generator": """ df_generator: Iterable[Union[pandas.DataFrame, Dict[str, pandas.DataFrame]]] The dataframe(s) to be stored""", - "central_partition_metadata": """ - central_partition_metadata: bool - This has no use and will be removed in future releases""", "default_metadata_version": """ default_metadata_version: int Default metadata version. (Note: Metadata version greater than 3 are only supported)""", - "load_dynamic_metadata": """ - load_dynamic_metadata: bool - The keyword `load_dynamic_metadata` is deprecated and will be removed in the next major release.""", - "concat_partitions_on_primary_index": """ - concat_partitions_on_primary_index: bool - Concatenate partition based on their primary index values.""", "delayed_tasks": """ delayed_tasks A list of delayed objects where each element returns a :class:`pandas.DataFrame`.""", diff --git a/kartothek/core/factory.py b/kartothek/core/factory.py index 62c8d4d0..59752e8d 100644 --- a/kartothek/core/factory.py +++ b/kartothek/core/factory.py @@ -28,7 +28,6 @@ def __init__( store_factory: StoreInput, load_schema: bool = True, load_all_indices: bool = False, - load_dataset_metadata: bool = True, ) -> None: """ A dataset factory object which can be used to cache dataset load operations. This class should be the primary user entry point when @@ -59,8 +58,6 @@ def __init__( Load the schema information immediately. load_all_indices Load all indices immediately. 
- load_dataset_metadata - Keep the user metadata in memory """ self._cache_metadata: Optional[DatasetMetadata] = None self._cache_store = None @@ -70,7 +67,6 @@ def __init__( self.load_schema = load_schema self._ds_callable = None self.is_loaded = False - self.load_dataset_metadata = load_dataset_metadata self.load_all_indices_flag = load_all_indices def __repr__(self): @@ -96,8 +92,6 @@ def _instantiate_metadata_cache(self: T) -> T: load_schema=self.load_schema, load_all_indices=self.load_all_indices_flag, ) - if not self.load_dataset_metadata: - self._cache_metadata.metadata = {} self.is_loaded = True return self @@ -161,7 +155,6 @@ def _ensure_factory( dataset_uuid: Optional[str], store: Optional[StoreInput], factory: Optional[DatasetFactory], - load_dataset_metadata: bool, load_schema: bool = True, ) -> DatasetFactory: @@ -171,7 +164,6 @@ def _ensure_factory( return DatasetFactory( dataset_uuid=dataset_uuid, store_factory=lazy_store(store), - load_dataset_metadata=load_dataset_metadata, load_schema=load_schema, ) diff --git a/kartothek/io/dask/bag.py b/kartothek/io/dask/bag.py index 30a76bec..1d0641fc 100644 --- a/kartothek/io/dask/bag.py +++ b/kartothek/io/dask/bag.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -import warnings from functools import partial from typing import Optional, Sequence @@ -54,17 +52,13 @@ def read_dataset_as_metapartitions_bag( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, - load_dataset_metadata=False, predicates=None, factory=None, dispatch_by=None, partition_size=None, - dispatch_metadata=True, ): """ Retrieve dataset as `dask.bag.Bag` of `MetaPartition` objects. @@ -78,33 +72,16 @@ def read_dataset_as_metapartitions_bag( A dask.bag object containing the metapartions. """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=load_dataset_metadata, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) - if len(ds_factory.tables) > 1: - warnings.warn( - "Trying to read a dataset with multiple internal tables. This functionality will be removed in the next " - "major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube " - "functionality. 
" - "https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html", - DeprecationWarning, - ) - store = ds_factory.store_factory mps = dispatch_metapartitions_from_factory( - dataset_factory=ds_factory, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, - label_filter=label_filter, - predicates=predicates, - dispatch_by=dispatch_by, - dispatch_metadata=dispatch_metadata, + dataset_factory=ds_factory, predicates=predicates, dispatch_by=dispatch_by, ) mps = db.from_sequence(mps, partition_size=partition_size) - if concat_partitions_on_primary_index or dispatch_by is not None: + if dispatch_by is not None: mps = mps.map( _load_and_concat_metapartitions_inner, store=store, @@ -146,10 +123,8 @@ def read_dataset_as_dataframe_bag( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, predicates=None, factory=None, @@ -172,16 +147,12 @@ def read_dataset_as_dataframe_bag( store=store, factory=factory, columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, - load_dataset_metadata=False, predicates=predicates, dispatch_by=dispatch_by, partition_size=partition_size, - dispatch_metadata=False, ) return mps.map(_get_data) @@ -276,10 +247,7 @@ def build_dataset_indices__bag( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) assert ds_factory.schema is not None diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index c1188fe1..8cba11f4 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -1,5 +1,4 @@ import random -import warnings from typing import ( Callable, Iterable, @@ -65,10 +64,8 @@ def read_dataset_as_ddf( store=None, table=SINGLE_TABLE, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals: Optional[Sequence[str]] = None, - label_filter=None, dates_as_object=False, predicates=None, factory=None, @@ -102,21 +99,9 @@ def read_dataset_as_ddf( ) ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) - if len(ds_factory.tables) > 1: - warnings.warn( - "Trying to read a dataset with multiple internal tables. This functionality will be removed in the next " - "major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube " - "functionality. 
" - "https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html", - DeprecationWarning, - ) - if isinstance(columns, dict): columns = columns[table] meta = _get_dask_meta_for_dataset( @@ -130,10 +115,8 @@ def read_dataset_as_ddf( delayed_partitions = read_dataset_as_delayed( factory=ds_factory, columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, predicates=predicates, dispatch_by=dask_index_on if dask_index_on else dispatch_by, @@ -291,9 +274,7 @@ def store_dataset_from_ddf( if table is None: raise TypeError("The parameter `table` is not optional.") - ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, store=store, factory=None, load_dataset_metadata=True - ) + ds_factory = _ensure_factory(dataset_uuid=dataset_uuid, store=store, factory=None) if not overwrite: raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store) @@ -514,10 +495,7 @@ def collect_dataset_metadata( "Please make sure to provide a value larger than 0.0 and smaller than or equal to 1.0 ." ) dataset_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) mps = list( @@ -593,10 +571,7 @@ def hash_dataset( If provided, calculate hash per group instead of per partition """ dataset_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) columns = subset diff --git a/kartothek/io/dask/delayed.py b/kartothek/io/dask/delayed.py index b0b7852e..b0aad8fe 100644 --- a/kartothek/io/dask/delayed.py +++ b/kartothek/io/dask/delayed.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -import warnings from functools import partial from typing import List, Optional, Sequence @@ -77,11 +75,7 @@ def delete_dataset__delayed(dataset_uuid=None, store=None, factory=None): ---------- """ dataset_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_schema=False, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, load_schema=False, ) gc = garbage_collect_dataset__delayed(factory=dataset_factory) @@ -124,10 +118,7 @@ def garbage_collect_dataset__delayed( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) nested_files = dispatch_files_to_gc( @@ -150,23 +141,18 @@ def _load_and_concat_metapartitions(list_of_mps, *args, **kwargs): ) -# FIXME: remove @default_docs @normalize_args def read_dataset_as_delayed_metapartitions( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals: Optional[Sequence[str]] = None, - label_filter=None, dates_as_object=False, - load_dataset_metadata=False, predicates=None, factory=None, dispatch_by=None, - dispatch_metadata=True, ): """ A collection of dask.delayed objects to retrieve a dataset from store where each @@ -181,32 +167,15 @@ def read_dataset_as_delayed_metapartitions( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=load_dataset_metadata, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) - if 
len(ds_factory.tables) > 1: - warnings.warn( - "Trying to read a dataset with multiple internal tables. This functionality will be removed in the next " - "major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube " - "functionality. " - "https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html", - DeprecationWarning, - ) - store = ds_factory.store_factory mps = dispatch_metapartitions_from_factory( - dataset_factory=ds_factory, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, - label_filter=label_filter, - predicates=predicates, - dispatch_by=dispatch_by, - dispatch_metadata=dispatch_metadata, + dataset_factory=ds_factory, predicates=predicates, dispatch_by=dispatch_by, ) - if concat_partitions_on_primary_index or dispatch_by is not None: + if dispatch_by is not None: mps = _load_and_concat_metapartitions( mps, store=store, @@ -253,10 +222,8 @@ def read_dataset_as_delayed( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, predicates=None, factory=None, @@ -274,12 +241,9 @@ def read_dataset_as_delayed( store=store, factory=factory, columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, - load_dataset_metadata=False, predicates=predicates, dispatch_by=dispatch_by, ) diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py index 4306c477..06e7b87c 100644 --- a/kartothek/io/eager.py +++ b/kartothek/io/eager.py @@ -1,4 +1,3 @@ -import warnings from functools import partial from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast @@ -45,7 +44,6 @@ ) from kartothek.io_components.write import raise_if_dataset_exists from kartothek.serialization import DataFrameSerializer -from kartothek.serialization._parquet import ParquetSerializer __all__ = ( "delete_dataset", @@ -72,11 +70,7 @@ def delete_dataset(dataset_uuid=None, store=None, factory=None): """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - load_schema=False, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, load_schema=False, store=store, factory=factory, ) # Remove possibly unreferenced files @@ -101,10 +95,8 @@ def read_dataset_as_dataframes( dataset_uuid: Optional[str] = None, store=None, columns: Dict[str, List[str]] = None, - concat_partitions_on_primary_index: bool = False, predicate_pushdown_to_io: bool = True, categoricals: List[str] = None, - label_filter: Callable = None, dates_as_object: bool = False, predicates: Optional[List[List[Tuple[str, str, Any]]]] = None, factory: Optional[DatasetFactory] = None, @@ -139,23 +131,17 @@ def read_dataset_as_dataframes( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=True, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) mps = read_dataset_as_metapartitions( columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, predicates=predicates, factory=ds_factory, dispatch_by=dispatch_by, - dispatch_metadata=False, ) return [mp.data for mp in mps] @@ -165,15 +151,12 @@ def 
read_dataset_as_metapartitions( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, predicates=None, factory=None, dispatch_by=None, - dispatch_metadata=True, ): """ Read a dataset as a list of :class:`kartothek.io_components.metapartition.MetaPartition`. @@ -204,25 +187,19 @@ def read_dataset_as_metapartitions( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) from .iter import read_dataset_as_metapartitions__iterator ds_iter = read_dataset_as_metapartitions__iterator( columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, predicates=predicates, factory=ds_factory, dispatch_by=dispatch_by, - dispatch_metadata=dispatch_metadata, ) return list(ds_iter) @@ -232,10 +209,8 @@ def read_table( dataset_uuid: Optional[str] = None, store=None, columns: Dict[str, List[str]] = None, - concat_partitions_on_primary_index: bool = False, predicate_pushdown_to_io: bool = True, categoricals: List[str] = None, - label_filter: Callable = None, dates_as_object: bool = False, predicates: Optional[List[List[Tuple[str, str, Any]]]] = None, factory: Optional[DatasetFactory] = None, @@ -268,24 +243,14 @@ def read_table( >>> df = read_table(store, 'dataset_uuid', 'core') """ - if concat_partitions_on_primary_index is not False: - warnings.warn( - "The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major release.", - DeprecationWarning, - ) ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) partitions = read_dataset_as_dataframes( columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, predicates=predicates, factory=ds_factory, @@ -313,10 +278,8 @@ def commit_dataset( store: Optional[StoreInput] = None, dataset_uuid: Optional[str] = None, new_partitions: Optional[Iterable[MetaPartition]] = None, - output_dataset_uuid: Optional[str] = None, delete_scope: Optional[Iterable[Dict[str, Any]]] = None, metadata: Dict = None, - df_serializer: DataFrameSerializer = None, metadata_merger: Callable[[List[Dict]], Dict] = None, default_metadata_version: int = DEFAULT_METADATA_VERSION, partition_on: Optional[Iterable[str]] = None, @@ -394,17 +357,6 @@ def commit_dataset( Input partition to be committed. 
""" - if output_dataset_uuid is not None: - warnings.warn( - "The keyword `output_dataset_uuid` has no use and will be removed in the next major release ", - DeprecationWarning, - ) - - if df_serializer is not None: - warnings.warn( - "The keyword `df_serializer` is deprecated and will be removed in the next major release.", - DeprecationWarning, - ) if not new_partitions and not metadata and not delete_scope: raise ValueError( @@ -482,7 +434,7 @@ def store_dataframes_as_dataset( dfs: List[Union[pd.DataFrame, Dict[str, pd.DataFrame]]], metadata: Optional[Dict[str, Dict[str, Any]]] = None, partition_on: Optional[List[str]] = None, - df_serializer: Optional[ParquetSerializer] = None, + df_serializer: Optional[DataFrameSerializer] = None, overwrite: bool = False, secondary_indices=None, metadata_storage_format=DEFAULT_METADATA_STORAGE_FORMAT, @@ -501,11 +453,8 @@ def store_dataframes_as_dataset( """ if isinstance(dfs, pd.DataFrame): - dfs = [dfs] - warnings.warn( - "Passing a single dataframe instead of an iterable is deprecated and may " - "be removed in the next major release.", - DeprecationWarning, + raise TypeError( + f"Please pass a list of pandas.DataFrame as input. Instead got {type(dfs)}" ) return store_dataframes_as_dataset__iter( @@ -589,10 +538,7 @@ def write_single_partition( store: Optional[KeyValueStore] = None, dataset_uuid: Optional[str] = None, data=None, - metadata: Optional[Dict[str, Dict[str, Any]]] = None, - df_serializer: Optional[ParquetSerializer] = None, - overwrite: bool = False, - metadata_merger=None, + df_serializer: Optional[DataFrameSerializer] = None, metadata_version: int = DEFAULT_METADATA_VERSION, partition_on: Optional[List[str]] = None, factory=None, @@ -628,24 +574,6 @@ def write_single_partition( ------- An empty :class:`~kartothek.io_components.metapartition.MetaPartition` referencing the new files """ - if metadata is not None: - warnings.warn( - "The keyword `metadata` has no use and will be removed in the next major release ", - DeprecationWarning, - ) - - if overwrite is not False: - warnings.warn( - "The keyword `overwrite` has no use and will be removed in the next major release ", - DeprecationWarning, - ) - - if metadata_merger is not None: - warnings.warn( - "The keyword `metadata_merger` has no use and will be removed in the next major release ", - DeprecationWarning, - ) - if data is None: raise TypeError("The parameter `data` is not optional") dataset_factory, ds_metadata_version, partition_on = validate_partition_keys( @@ -686,12 +614,10 @@ def update_dataset_from_dataframes( dataset_uuid: Optional[str] = None, delete_scope=None, metadata=None, - df_serializer: Optional[ParquetSerializer] = None, + df_serializer: Optional[DataFrameSerializer] = None, metadata_merger: Callable = None, - central_partition_metadata: bool = True, default_metadata_version: int = DEFAULT_METADATA_VERSION, partition_on: Optional[List[str]] = None, - load_dynamic_metadata: bool = True, sort_partitions_by: Optional[str] = None, secondary_indices: Optional[List[str]] = None, table_name: str = SINGLE_TABLE, @@ -715,18 +641,6 @@ def update_dataset_from_dataframes( -------- :ref:`mutating_datasets` """ - if load_dynamic_metadata is not True: - warnings.warn( - "The keyword `load_dynamic_metadata` has no use and will be removed in the next major release ", - DeprecationWarning, - ) - - if central_partition_metadata is not True: - warnings.warn( - "The keyword `central_partition_metadata` has no use and will be removed in the next major release ", - DeprecationWarning, - 
) - ds_factory, metadata_version, partition_on = validate_partition_keys( dataset_uuid=dataset_uuid, store=store, @@ -779,10 +693,7 @@ def build_dataset_indices(store, dataset_uuid, columns, factory=None): ---------- """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) cols_to_load = set(columns) & set(ds_factory.schema.names) @@ -814,10 +725,7 @@ def garbage_collect_dataset(dataset_uuid=None, store=None, factory=None): """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) nested_files = dispatch_files_to_gc( diff --git a/kartothek/io/iter.py b/kartothek/io/iter.py index 302e352c..72592463 100644 --- a/kartothek/io/iter.py +++ b/kartothek/io/iter.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- - -import warnings from functools import partial from typing import cast @@ -43,16 +40,12 @@ def read_dataset_as_metapartitions__iterator( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, - load_dataset_metadata=False, predicates=None, factory=None, dispatch_by=None, - dispatch_metadata=True, ): """ @@ -69,24 +62,16 @@ def read_dataset_as_metapartitions__iterator( """ ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=factory, - load_dataset_metadata=load_dataset_metadata, + dataset_uuid=dataset_uuid, store=store, factory=factory, ) store = ds_factory.store mps = dispatch_metapartitions_from_factory( - ds_factory, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, - label_filter=label_filter, - predicates=predicates, - dispatch_by=dispatch_by, - dispatch_metadata=dispatch_metadata, + ds_factory, predicates=predicates, dispatch_by=dispatch_by, ) for mp in mps: - if concat_partitions_on_primary_index or dispatch_by is not None: + if dispatch_by is not None: mp = MetaPartition.concat_metapartitions( [ mp_inner.load_dataframes( @@ -118,10 +103,8 @@ def read_dataset_as_dataframes__iterator( dataset_uuid=None, store=None, columns=None, - concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, - label_filter=None, dates_as_object=False, predicates=None, factory=None, @@ -169,16 +152,12 @@ def read_dataset_as_dataframes__iterator( dataset_uuid=dataset_uuid, store=store, columns=columns, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, predicate_pushdown_to_io=predicate_pushdown_to_io, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, - load_dataset_metadata=False, predicates=predicates, factory=factory, dispatch_by=dispatch_by, - dispatch_metadata=False, ) for mp in mp_iter: yield mp.data @@ -194,10 +173,8 @@ def update_dataset_from_dataframes__iter( metadata=None, df_serializer=None, metadata_merger=None, - central_partition_metadata=True, default_metadata_version=DEFAULT_METADATA_VERSION, partition_on=None, - load_dynamic_metadata=True, sort_partitions_by=None, secondary_indices=None, factory=None, @@ -219,17 +196,7 @@ def update_dataset_from_dataframes__iter( -------- :ref:`mutating_datasets` """ - if load_dynamic_metadata is not True: - warnings.warn( - "The keyword `load_dynamic_metadata` has no use and will be removed soon", - DeprecationWarning, - ) - if 
central_partition_metadata is not True: - warnings.warn( - "The keyword `central_partition_metadata` has no use and will be removed in the next major release ", - DeprecationWarning, - ) ds_factory, metadata_version, partition_on = validate_partition_keys( dataset_uuid=dataset_uuid, store=store, diff --git a/kartothek/io/testing/read.py b/kartothek/io/testing/read.py index afb451ee..dcb048bf 100644 --- a/kartothek/io/testing/read.py +++ b/kartothek/io/testing/read.py @@ -24,7 +24,6 @@ The following fixtures should be present (see tests.read.conftest) * ``use_categoricals`` - Whether or not the call retrievs categorical data. * ``dates_as_object`` - Whether or not the call retrievs date columns as objects. -* ``label_filter`` - a callable to filter partitions by label. """ @@ -52,31 +51,11 @@ def dates_as_object(request): return request.param -@pytest.fixture( - params=[True, False], - ids=["load_dataset_metadata_TRUE", "load_dataset_metadata_FALSE"], -) -def load_dataset_metadata(request): - return request.param - - -@pytest.fixture(params=[None, lambda part_label: "cluster_1" in part_label]) -def label_filter(request): - return request.param - - @pytest.fixture(params=[True, False], ids=["use_factory", "no_factory"]) def use_dataset_factory(request, dates_as_object): return request.param -def _strip_unused_categoricals(df): - for col in df.columns: - if pd.api.types.is_categorical_dtype(df[col]): - df[col] = df[col].cat.remove_unused_categories() - return df - - class NoPickle: def __getstate__(self): raise RuntimeError("do NOT pickle this object!") @@ -159,7 +138,6 @@ def _perform_read_test( execute_read_callable, use_categoricals, output_type, - label_filter, dates_as_object, read_kwargs=None, ds_factory=None, @@ -177,17 +155,11 @@ def _perform_read_test( store=store_factory, factory=ds_factory, categoricals=categoricals, - label_filter=label_filter, dates_as_object=dates_as_object, **read_kwargs, ) - # The filter should allow only a single partition - if not label_filter: - assert len(result) == 2 - else: - # The filter should allow only a single partition - assert len(result) == 1 + assert len(result) == 2 if output_type == "metapartition": for res in result: @@ -490,7 +462,6 @@ def test_read_dataset_as_dataframes( bound_load_dataframes, use_categoricals, output_type, - label_filter, dates_as_object, ): if use_dataset_factory: @@ -509,7 +480,6 @@ def test_read_dataset_as_dataframes( execute_read_callable=bound_load_dataframes, use_categoricals=use_categoricals, output_type=output_type, - label_filter=label_filter, dates_as_object=dates_as_object, ) diff --git a/kartothek/io_components/cube/query/_regroup.py b/kartothek/io_components/cube/query/_regroup.py index ebde6887..7a67a5d2 100644 --- a/kartothek/io_components/cube/query/_regroup.py +++ b/kartothek/io_components/cube/query/_regroup.py @@ -306,8 +306,8 @@ def _map_ktk_mps_to_groups(cube, datasets, label2gp): label2gp_sub = label2gp[ktk_cube_dataset_id] for mp in dispatch_metapartitions_from_factory( dataset_factory=metadata_factory_from_dataset(ds), - concat_partitions_on_primary_index=False, ): + # FIXME: can this be simplified? 
if mp.label not in label2gp_sub: # filtered out by pre-condition continue diff --git a/kartothek/io_components/gc.py b/kartothek/io_components/gc.py index 951d0a14..c0d693dc 100644 --- a/kartothek/io_components/gc.py +++ b/kartothek/io_components/gc.py @@ -7,10 +7,7 @@ def dispatch_files_to_gc(dataset_uuid, store_factory, chunk_size, factory): ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store_factory, - factory=factory, - load_dataset_metadata=False, + dataset_uuid=dataset_uuid, store=store_factory, factory=factory, ) dataset_uuid = dataset_uuid or ds_factory.uuid diff --git a/kartothek/io_components/read.py b/kartothek/io_components/read.py index c865434c..b35090df 100644 --- a/kartothek/io_components/read.py +++ b/kartothek/io_components/read.py @@ -1,10 +1,8 @@ -import warnings -from typing import Callable, Iterator, List, Optional, Set, Union, cast, overload +from typing import Iterator, List, Optional, Set, Union, cast, overload import pandas as pd from kartothek.core.factory import DatasetFactory -from kartothek.core.index import ExplicitSecondaryIndex from kartothek.core.typing import StoreInput from kartothek.io_components.metapartition import MetaPartition from kartothek.io_components.utils import normalize_args @@ -18,23 +16,15 @@ @overload def dispatch_metapartitions_from_factory( dataset_factory: DatasetFactory, - label_filter: Optional[Callable] = None, - concat_partitions_on_primary_index: bool = False, predicates: PredicatesType = None, dispatch_by: None = None, - dispatch_metadata: bool = False, ) -> Iterator[MetaPartition]: ... @overload def dispatch_metapartitions_from_factory( - dataset_factory: DatasetFactory, - label_filter: Optional[Callable], - concat_partitions_on_primary_index: bool, - predicates: PredicatesType, - dispatch_by: List[str], - dispatch_metadata: bool, + dataset_factory: DatasetFactory, predicates: PredicatesType, dispatch_by: List[str], ) -> Iterator[List[MetaPartition]]: ... @@ -42,37 +32,13 @@ def dispatch_metapartitions_from_factory( @normalize_args def dispatch_metapartitions_from_factory( dataset_factory: DatasetFactory, - label_filter: Optional[Callable] = None, - concat_partitions_on_primary_index: bool = False, predicates: PredicatesType = None, dispatch_by: Optional[List[str]] = None, - dispatch_metadata: bool = False, ) -> Union[Iterator[MetaPartition], Iterator[List[MetaPartition]]]: """ :meta private: """ - if dispatch_metadata: - - warnings.warn( - "The dispatch of metadata and index information as part of the MetaPartition instance is deprecated. " - "The future behaviour will be that this metadata is not dispatched. To set the future behaviour, " - "specifiy ``dispatch_metadata=False``", - DeprecationWarning, - ) - - if dispatch_by is not None and concat_partitions_on_primary_index: - raise ValueError( - "Both `dispatch_by` and `concat_partitions_on_primary_index` are provided, " - "`concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. " - "Please only provide the `dispatch_by` argument. " - ) - if concat_partitions_on_primary_index: - warnings.warn( - "The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. 
Use `dispatch_by=dataset_factory.partition_keys` to achieve the same behavior instead.", - DeprecationWarning, - ) - dispatch_by = dataset_factory.partition_keys if dispatch_by is not None and not set(dispatch_by).issubset( set(dataset_factory.index_columns) @@ -99,15 +65,6 @@ def dispatch_metapartitions_from_factory( list(index_cols), predicates=predicates ) - if label_filter: - base_df = base_df[base_df.index.map(label_filter)] - - indices_to_dispatch = { - name: ix.unload() - for name, ix in dataset_factory.indices.items() - if isinstance(ix, ExplicitSecondaryIndex) - } - if dispatch_by is not None: base_df = cast(pd.DataFrame, base_df) @@ -130,7 +87,6 @@ def dispatch_metapartitions_from_factory( mps.append( MetaPartition.from_partition( partition=dataset_factory.partitions[label], - indices=indices_to_dispatch if dispatch_metadata else None, metadata_version=dataset_factory.metadata_version, schema=dataset_factory.schema, partition_keys=dataset_factory.partition_keys, @@ -144,7 +100,6 @@ def dispatch_metapartitions_from_factory( yield MetaPartition.from_partition( partition=part, - indices=indices_to_dispatch if dispatch_metadata else None, metadata_version=dataset_factory.metadata_version, schema=dataset_factory.schema, partition_keys=dataset_factory.partition_keys, @@ -154,25 +109,16 @@ def dispatch_metapartitions_from_factory( def dispatch_metapartitions( dataset_uuid: str, store: StoreInput, - label_filter: Optional[Callable] = None, - concat_partitions_on_primary_index: bool = False, predicates: PredicatesType = None, dispatch_by: Optional[List[str]] = None, - dispatch_metadata: bool = False, ) -> Union[Iterator[MetaPartition], Iterator[List[MetaPartition]]]: dataset_factory = DatasetFactory( dataset_uuid=dataset_uuid, store_factory=store, load_schema=True, load_all_indices=False, - load_dataset_metadata=False, ) return dispatch_metapartitions_from_factory( - dataset_factory=dataset_factory, - label_filter=label_filter, - predicates=predicates, - dispatch_by=dispatch_by, - concat_partitions_on_primary_index=concat_partitions_on_primary_index, - dispatch_metadata=dispatch_metadata, + dataset_factory=dataset_factory, predicates=predicates, dispatch_by=dispatch_by, ) diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py index 12b8a0ba..f4273f0e 100644 --- a/kartothek/io_components/utils.py +++ b/kartothek/io_components/utils.py @@ -138,19 +138,11 @@ def _ensure_compatible_indices( def validate_partition_keys( - dataset_uuid, - store, - ds_factory, - default_metadata_version, - partition_on, - load_dataset_metadata=True, + dataset_uuid, store, ds_factory, default_metadata_version, partition_on, ): if ds_factory or DatasetMetadata.exists(dataset_uuid, ensure_store(store)): ds_factory = _ensure_factory( - dataset_uuid=dataset_uuid, - store=store, - factory=ds_factory, - load_dataset_metadata=load_dataset_metadata, + dataset_uuid=dataset_uuid, store=store, factory=ds_factory, ) ds_metadata_version = ds_factory.metadata_version diff --git a/kartothek/utils/ktk_adapters.py b/kartothek/utils/ktk_adapters.py index 1a5d94d8..38246119 100644 --- a/kartothek/utils/ktk_adapters.py +++ b/kartothek/utils/ktk_adapters.py @@ -130,9 +130,6 @@ def get_physical_partition_stats(metapartitions, store): """ Get statistics for partition. - .. hint:: - To get the metapartitions pre-aligned, use ``concat_partitions_on_primary_index=True`` during dispatch. 
- Parameters ---------- metapartitions: Iterable[kartothek.io_components.metapartition.MetaPartition] diff --git a/tests/conftest.py b/tests/conftest.py index 1dffe261..be0d9c19 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -449,7 +449,6 @@ def dataset_factory(dataset, store_session_factory): store_factory=store_session_factory, load_schema=True, load_all_indices=False, - load_dataset_metadata=True, ) @@ -480,7 +479,6 @@ def dataset_partition_keys_factory(dataset_partition_keys, store_session_factory store_factory=store_session_factory, load_schema=True, load_all_indices=False, - load_dataset_metadata=True, ) @@ -515,7 +513,6 @@ def dataset_with_index_factory(dataset_with_index, store_session_factory): store_factory=store_session_factory, load_schema=True, load_all_indices=False, - load_dataset_metadata=True, ) diff --git a/tests/io/dask/dataframe/test_read.py b/tests/io/dask/dataframe/test_read.py index e8efa05c..176e4b7c 100644 --- a/tests/io/dask/dataframe/test_read.py +++ b/tests/io/dask/dataframe/test_read.py @@ -84,7 +84,6 @@ def test_load_dataframe_categoricals_with_index(dataset_with_index_factory): bound_load_dataframes=func, use_categoricals=True, output_type="table", - label_filter=None, dates_as_object=False, ) diff --git a/tests/io_components/test_read.py b/tests/io_components/test_read.py index 9ea1c548..a33a1a61 100644 --- a/tests/io_components/test_read.py +++ b/tests/io_components/test_read.py @@ -1,6 +1,5 @@ import math import types -from collections import OrderedDict import numpy as np import pandas as pd @@ -24,22 +23,6 @@ def test_dispatch_metapartitions(dataset, store_session): assert mp.table_name == SINGLE_TABLE -def test_dispatch_metapartitions_label_filter(dataset, store_session): - def label_filter(part_label): - return "cluster_1" in part_label - - part_generator = dispatch_metapartitions( - dataset.uuid, store_session, label_filter=label_filter - ) - - assert isinstance(part_generator, types.GeneratorType) - partitions = OrderedDict([(part.label, part) for part in part_generator]) - - assert len(partitions) == 1 - mp = partitions["cluster_1"] - assert isinstance(mp, MetaPartition) - - @pytest.mark.parametrize( "predicates,error_msg", [([], "Empty predicates"), ([[]], "Invalid predicates: Conjunction 0 is empty")], @@ -103,21 +86,9 @@ def test_dispatch_metapartitions_concat_regression(store): partition_on=["p"], ) - mps = list( - dispatch_metapartitions( - dataset.uuid, store, concat_partitions_on_primary_index=False - ) - ) + mps = list(dispatch_metapartitions(dataset.uuid, store)) assert len(mps) == 2 - with pytest.deprecated_call(): - mps = list( - dispatch_metapartitions( - dataset.uuid, store, concat_partitions_on_primary_index=True - ) - ) - assert len(mps) == 1 - mps = list(dispatch_metapartitions(dataset.uuid, store, dispatch_by=["p"])) assert len(mps) == 1
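
Migration sketch for the argument removals above. This is not part of the patch itself: it assumes a storefact-backed store factory and a hypothetical dataset partitioned on a column ``p``, and shows the kartothek 4.0 call patterns described in the CHANGES.rst entries, namely a list input for ``store_dataframes_as_dataset``, ``dispatch_by`` in place of ``concat_partitions_on_primary_index``, and ``predicates`` in place of ``label_filter``.

# Migration sketch (not part of this patch). Store URL, dataset name and column
# names are illustrative assumptions, not values taken from the diff.
from functools import partial

import pandas as pd
from storefact import get_store_from_url

from kartothek.io.eager import (
    read_dataset_as_dataframes,
    store_dataframes_as_dataset,
)

# Assumed local filesystem store; replace the URL with your own storage location.
store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")

df = pd.DataFrame({"p": [1, 1, 2], "x": [10, 20, 30]})

# `dfs` must now be a list; passing a bare DataFrame raises TypeError instead of
# emitting a DeprecationWarning.
store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="my_dataset",
    dfs=[df],
    partition_on=["p"],
)

# `concat_partitions_on_primary_index` is gone; group partitions that share a
# primary index value with `dispatch_by` instead.
grouped = read_dataset_as_dataframes(
    dataset_uuid="my_dataset", store=store_factory, dispatch_by=["p"]
)

# `label_filter` is gone; restrict what is loaded with `predicates`, which filter
# on column/index values rather than on partition labels, so this is a semantic
# change and not a drop-in replacement.
filtered = read_dataset_as_dataframes(
    dataset_uuid="my_dataset",
    store=store_factory,
    predicates=[[("p", "==", 1)]],
)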