Fix bug when loading few columns of a dataset with many primary indices (JDASoftwareGroup#446)

* Only increase pos if necessary.

* Changelog.

* Empty line in changelog.

* Add test.

* Don't link to private function.
mlondschien authored Apr 14, 2021
1 parent b278503 commit 90ee486
Showing 3 changed files with 26 additions and 3 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,13 @@
 Changelog
 =========
 
+
+Kartothek 4.0.2 (2021-04-xx)
+============================
+
+* Fix a bug in ``MetaPartition._reconstruct_index_columns`` that would raise an ``IndexError`` when loading few columns of a dataset with many primary indices.
+
+
 Kartothek 4.0.1 (2021-04-13)
 ============================
 
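To make the failure mode from the changelog entry concrete, here is a minimal standalone sketch of the pre-fix behaviour. It uses plain pandas, not the actual MetaPartition internals; the variable names and values are illustrative only:

import pandas as pd

# One loaded column, three primary indices, of which only "c" is requested.
df = pd.DataFrame({"x": [0]})
key_indices = [("a", 1), ("b", 2), ("c", 3)]
columns = ["x", "c"]

for pos, (primary_key, value) in enumerate(key_indices):
    if primary_key not in columns:
        continue
    # "a" and "b" are skipped, yet pos has already advanced to 2.
    # df only has one column, so inserting at position 2 is out of
    # bounds and pandas raises an IndexError.
    df.insert(pos, primary_key, value)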
4 changes: 3 additions & 1 deletion kartothek/io_components/metapartition.py
@@ -767,7 +767,8 @@ def _reconstruct_index_columns(
             # indexer call is slow, so only do that if really necessary
             df = df.reindex(columns=cleaned_original_columns, copy=False)
 
-        for pos, (primary_key, value) in enumerate(key_indices):
+        pos = 0
+        for primary_key, value in key_indices:
             # If there are predicates, don't reconstruct the index if it wasn't requested
             if columns is not None and primary_key not in columns:
                 continue
@@ -801,6 +802,7 @@ def _reconstruct_index_columns(
             if convert_to_date:
                 value = pd.Timestamp(value).to_pydatetime().date()
             df.insert(pos, primary_key, value)
+            pos += 1
 
         return df
 
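The patched loop advances pos only when a column is actually inserted. Rerunning the same sketch with the fixed logic (again an illustrative simplification, not the library code itself):

import pandas as pd

df = pd.DataFrame({"x": [0]})
key_indices = [("a", 1), ("b", 2), ("c", 3)]
columns = ["x", "c"]

pos = 0
for primary_key, value in key_indices:
    if primary_key not in columns:
        continue
    df.insert(pos, primary_key, value)  # "c" is inserted at position 0
    pos += 1  # advance only after an actual insert

print(df.columns.tolist())  # ['c', 'x'] -- reconstructed index column first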
18 changes: 16 additions & 2 deletions tests/io_components/test_metapartition.py
@@ -27,7 +27,7 @@ def test_store_single_dataframe_as_partition(store, metadata_version):
     mp = MetaPartition(label="test_label", data=df, metadata_version=metadata_version)
 
     meta_partition = mp.store_dataframes(
-        store=store, df_serializer=ParquetSerializer(), dataset_uuid="dataset_uuid",
+        store=store, df_serializer=ParquetSerializer(), dataset_uuid="dataset_uuid"
     )
 
     assert meta_partition.data is None
@@ -58,7 +58,7 @@ def test_load_dataframe_logical_conjunction(store, metadata_version):
         logical_conjunction=[("P", ">", 4)],
     )
     meta_partition = mp.store_dataframes(
-        store=store, df_serializer=None, dataset_uuid="dataset_uuid",
+        store=store, df_serializer=None, dataset_uuid="dataset_uuid"
     )
     predicates = None
     loaded_mp = meta_partition.load_dataframes(store=store, predicates=predicates)
@@ -1333,6 +1333,20 @@ def test_get_parquet_metadata_row_group_size(store):
     pd.testing.assert_frame_equal(actual, expected)
 
 
+def test__reconstruct_index_columns():
+    df = pd.DataFrame({"x": [0], "a": [-1], "b": [-2], "c": [-3]})
+    mp = MetaPartition(label="test_label", data=df)
+    df_with_index_columns = mp._reconstruct_index_columns(
+        df=df[["x"]],
+        key_indices=[("a", 1), ("b", 2), ("c", 3)],
+        columns=["x", "c"],
+        categories=None,
+        date_as_object=False,
+    )
+    # Index columns first
+    pdt.assert_frame_equal(df_with_index_columns, pd.DataFrame({"c": [3], "x": [0]}))
+
+
 def test_partition_on_keeps_table_name():
     mp = MetaPartition(
         label="label_1",
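The new regression test can be run on its own using standard pytest node-id syntax (the invocation below is an assumption, not part of the commit):

pytest tests/io_components/test_metapartition.py::test__reconstruct_index_columns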
