Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Adds dataframe prefix to columns
Browse files Browse the repository at this point in the history
That way we know how to map an output back to the function.
We decided this was a better approach for now. We don't want to
lose the linkage between output and function, but for dataframe
flattening, this is the best approach.

It does mean that the columns cease being valid python identifiers,
but that's fine because we're in build_result. If people complain
we can figure out a way to change it.

Upside is that we have less code to check/manage with this approach.
  • Loading branch information
skrawcz committed Feb 20, 2023
1 parent f136d3e commit b02e1a4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 54 deletions.
34 changes: 13 additions & 21 deletions hamilton/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,33 +189,25 @@ def build_dataframe_with_dataframes(outputs: Dict[str, Any]) -> pd.DataFrame:
:param outputs: The outputs to build the dataframe from.
:return: A dataframe with the outputs.
"""

def get_output_name(output_name: str, column_name: str) -> str:
"""Add function prefix to columns.
Note this means that they stop being valid python identifiers due to the `.` in the string.
"""
return f"{output_name}.{column_name}"

flattened_outputs = {}
for name, output in outputs.items():
if isinstance(output, pd.DataFrame):
df_columns = list(output.columns)
column_intersection = [
column for column in df_columns if column in flattened_outputs
]
if column_intersection:
raise ValueError(
f"Dataframe {name} contains columns {column_intersection} that already exist in the output. "
f"Please rename the columns in {name} to avoid this error."
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Unpacking dataframe {name} into dict of series with columns {df_columns}."
)
name_prefix = f'{name.split("_")[0]}_'
if any([column for column in df_columns if not column.startswith(name_prefix)]):
# adding this here to try to force some consistency in the naming of columns and mapping it to the
# originating function.
logger.warning(
f"Friendly warning: you're unpacking the dataframe from function [{name}()] into a dict of "
f"series with columns {df_columns}. This will likely make it hard to map columns to code. "
f"To get rid of this warning, use the prefix of the function name to prefix the column names. "
f"e.g. spend_df() would require columns to start with spend_."
f"Unpacking dataframe {name} into dict of series with columns {list(output.columns)}."
)
df_dict = output.to_dict(orient="series")

df_dict = {
get_output_name(name, col_name): col_value
for col_name, col_value in output.to_dict(orient="series").items()
}
flattened_outputs.update(df_dict)
elif isinstance(output, pd.Series):
if name in flattened_outputs:
Expand Down
54 changes: 21 additions & 33 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,19 @@ def __eq__(self, other: typing.Any) -> bool:
"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}),
"b": pd.DataFrame({"c": [1, 3, 5], "d": [14, 15, 16]}),
},
pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13], "c": [1, 3, 5], "d": [14, 15, 16]}),
pd.DataFrame(
{"a.a": [1, 2, 3], "a.b": [11, 12, 13], "b.c": [1, 3, 5], "b.d": [14, 15, 16]}
),
),
(
{
"a": pd.Series([1, 2, 3]),
"b": pd.Series([11, 12, 13]),
"c": pd.DataFrame({"d": [0, 0, 0]}),
},
pd.DataFrame({"a": pd.Series([1, 2, 3]), "b": pd.Series([11, 12, 13]), "d": [0, 0, 0]}),
pd.DataFrame(
{"a": pd.Series([1, 2, 3]), "b": pd.Series([11, 12, 13]), "c.d": [0, 0, 0]}
),
),
],
ids=[
Expand Down Expand Up @@ -284,7 +288,13 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
"d": [8, 9, 10],
},
pd.DataFrame(
{"a": [1, 2, 3], "z": [0, 0, 0], "b": [4, 5, 6], "c": [7, 7, 7], "d": [8, 9, 10]}
{
"a.a": [1, 2, 3],
"a.z": [0, 0, 0],
"b": [4, 5, 6],
"c": [7, 7, 7],
"d": [8, 9, 10],
}
),
),
(
Expand All @@ -294,10 +304,10 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
},
pd.DataFrame(
{
"a": [1, 2, 3, None, None, None],
"b": [11, 12, 13, None, None, None],
"c": [None, None, None, 1, 3, 5],
"d": [None, None, None, 14, 15, 16],
"a.a": [1, 2, 3, None, None, None],
"a.b": [11, 12, 13, None, None, None],
"b.c": [None, None, None, 1, 3, 5],
"b.d": [None, None, None, 14, 15, 16],
},
index=[0, 1, 2, 3, 4, 5],
),
Expand All @@ -312,11 +322,11 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
pd.DataFrame(
{
"a": [None, 1, 2, 3],
"d": [0, 0, 0, None],
"e": [1, 1, 1, None],
"c.d": [0, 0, 0, None],
"c.e": [1, 1, 1, None],
"b": [11, 12, 13, None],
"g": [None, 2, 2, 2],
"h": [None, 3, 3, 3],
"f.g": [None, 2, 2, 2],
"f.h": [None, 3, 3, 3],
},
index=[0, 1, 2, 3],
),
Expand All @@ -335,28 +345,6 @@ def test_PandasDataFrameResult_build_dataframe_with_dataframes(outputs, expected
pd.testing.assert_frame_equal(actual, expected_result)


@pytest.mark.parametrize(
"outputs",
[
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": pd.Series([4, 5, 6])},
{"b": pd.Series([4, 5, 6]), "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": 7},
{"b": 7, "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
],
ids=[
"test-df-series-duplicate",
"test-series-df-duplicate",
"test-df-scalar-duplicate",
"test-scalar-df-duplicate",
],
)
def test_PandasDataFrameResult_build_dataframe_with_dataframes_error(outputs):
"""Tests build_dataframe_with_dataframes works as expected"""
pdfr = base.PandasDataFrameResult()
with pytest.raises(ValueError):
pdfr.build_dataframe_with_dataframes(outputs)


@pytest.mark.parametrize(
"outputs,expected_result",
[
Expand Down

0 comments on commit b02e1a4

Please sign in to comment.