Skip to content

Commit

Permalink
Add tests for non-default table (JDASoftwareGroup#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Mar 17, 2021
1 parent f7615ec commit 3807a02
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 1 deletion.
16 changes: 16 additions & 0 deletions kartothek/io/testing/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,3 +642,19 @@ def test_extensiondtype_rountrip(store_factory, bound_load_dataframes):
result_dfs = result
result_df = pd.concat(result_dfs).reset_index(drop=True)
pdt.assert_frame_equal(df, result_df)


def test_non_default_table_name_roundtrip(store_factory, bound_load_dataframes):
df = pd.DataFrame({"A": [1]})
store_dataframes_as_dataset(
dfs=[df], store=store_factory, dataset_uuid="dataset_uuid", table_name="foo"
)
result = bound_load_dataframes(dataset_uuid="dataset_uuid", store=store_factory)

probe = result[0]
if isinstance(probe, MetaPartition):
result_dfs = [mp.data for mp in result]
else:
result_dfs = result
result_df = pd.concat(result_dfs).reset_index(drop=True)
pdt.assert_frame_equal(df, result_df)
14 changes: 14 additions & 0 deletions kartothek/io/testing/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from kartothek.core.dataset import DatasetMetadata
from kartothek.core.uuid import gen_uuid
from kartothek.io.eager import read_table
from kartothek.io_components.metapartition import MetaPartition
from kartothek.serialization import DataFrameSerializer

Expand Down Expand Up @@ -621,3 +622,16 @@ def test_secondary_index_on_partition_column(store_factory, bound_store_datafram
bound_store_dataframes(
[df1], store=store_factory, partition_on=["x"], secondary_indices=["x"]
)


def test_non_default_table_name_roundtrip(store_factory, bound_store_dataframes):
df = pd.DataFrame({"A": [1]})
bound_store_dataframes(
[df], store=store_factory, dataset_uuid="dataset_uuid", table_name="foo"
)
for k in store_factory():
if k.endswith(".parquet") and "indices" not in k:
assert "foo" in k
result = read_table(dataset_uuid="dataset_uuid", store=store_factory)

pdt.assert_frame_equal(df, result)
4 changes: 3 additions & 1 deletion kartothek/io_components/metapartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def from_partition(
schema: Optional[SchemaWrapper] = None,
partition_keys: Optional[List[str]] = None,
logical_conjunction: Optional[List[Tuple[Any, str, Any]]] = None,
table_name: str = SINGLE_TABLE,
):
"""
Transform a kartothek :class:`~kartothek.core.partition.Partition` into a
Expand All @@ -422,13 +423,14 @@ def from_partition(
"""
return MetaPartition(
label=partition.label,
file=partition.files[SINGLE_TABLE],
file=partition.files[table_name],
data=data,
indices=indices,
metadata_version=metadata_version,
schema=schema,
partition_keys=partition_keys,
logical_conjunction=logical_conjunction,
table_name=table_name,
)

def add_metapartition(
Expand Down
2 changes: 2 additions & 0 deletions kartothek/io_components/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def dispatch_metapartitions_from_factory(
schema=dataset_factory.schema,
partition_keys=dataset_factory.partition_keys,
logical_conjunction=logical_conjunction,
table_name=dataset_factory.table_name,
)
)
yield mps
Expand All @@ -103,6 +104,7 @@ def dispatch_metapartitions_from_factory(
metadata_version=dataset_factory.metadata_version,
schema=dataset_factory.schema,
partition_keys=dataset_factory.partition_keys,
table_name=dataset_factory.table_name,
)


Expand Down

0 comments on commit 3807a02

Please sign in to comment.