Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Adds code to make concatenating dataframes columnwise in the result builder default #321

Merged
merged 3 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion hamilton/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,61 @@ def build_result(**outputs: Dict[str, Any]) -> pd.DataFrame:
# If we're dealing with all values that don't have any "index" that could be created
# (i.e. scalars, objects) coerce the output to a single-row, multi-column dataframe.
return pd.DataFrame([outputs])
#
contains_df = any(isinstance(value, pd.DataFrame) for value in outputs.values())
if contains_df:
# build the dataframe from the outputs
return PandasDataFrameResult.build_dataframe_with_dataframes(outputs)
# don't do anything special if dataframes aren't in the output.
return pd.DataFrame(outputs) # this does an implicit outer join based on index.
elijahbenizzy marked this conversation as resolved.
Show resolved Hide resolved

return pd.DataFrame(outputs)
@staticmethod
def build_dataframe_with_dataframes(outputs: Dict[str, Any]) -> pd.DataFrame:
"""Builds a dataframe from the outputs in an "outer join" manner based on index.

The behavior of pd.Dataframe(outputs) is that it will do an outer join based on indexes of the Series passed in.
To handle dataframes, we unpack the dataframe into a dict of series, check to ensure that no columns are
redefined in a rolling fashion going in order of the outputs requested. This then results in an "enlarged"
outputs dict that is then passed to pd.Dataframe(outputs) to get the final dataframe.

:param outputs: The outputs to build the dataframe from.
:return: A dataframe with the outputs.
"""
flattened_outputs = {}
for name, output in outputs.items():
if isinstance(output, pd.DataFrame):
df_columns = list(output.columns)
column_intersection = [
column for column in df_columns if column in flattened_outputs
]
if column_intersection:
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
f"Dataframe {name} contains columns {column_intersection} that already exist in the output. "
f"Please rename the columns in {name} to avoid this error."
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Unpacking dataframe {name} into dict of series with columns {df_columns}."
)
df_dict = output.to_dict(orient="series")
flattened_outputs.update(df_dict)
elif isinstance(output, pd.Series):
if name in flattened_outputs:
raise ValueError(
f"Series {name} already exists in the output. "
f"Please rename the series to avoid this error, or determine from where the initial series is "
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output
else:
if name in flattened_outputs:
raise ValueError(
f"Non series output {name} already exists in the output. "
f"Please rename this output to avoid this error, or determine from where the initial value is "
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output
return pd.DataFrame(flattened_outputs)


class StrictIndexTypePandasDataFrameResult(PandasDataFrameResult):
Expand Down
116 changes: 101 additions & 15 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,21 @@ def __eq__(self, other: typing.Any) -> bool:
{"a": pd.Series([1, 2, 3]), "b": pd.Series([4, 5, 6])},
pd.DataFrame({"a": pd.Series([1, 2, 3]), "b": pd.Series([4, 5, 6])}),
),
(
{
"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}),
"b": pd.DataFrame({"c": [1, 3, 5], "d": [14, 15, 16]}),
},
pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13], "c": [1, 3, 5], "d": [14, 15, 16]}),
),
(
{
"a": pd.Series([1, 2, 3]),
"b": pd.Series([11, 12, 13]),
"c": pd.DataFrame({"d": [0, 0, 0]}),
},
pd.DataFrame({"a": pd.Series([1, 2, 3]), "b": pd.Series([11, 12, 13]), "d": [0, 0, 0]}),
),
],
ids=[
"test-single-scalar",
Expand All @@ -225,6 +240,8 @@ def __eq__(self, other: typing.Any) -> bool:
"test-scalar-and-list",
"test-scalar-and-dict",
"test-series-and-list",
"test-multiple-dataframes",
"test-multiple-series-with-dataframe",
],
)
def test_PandasDataFrameResult_build_result(outputs, expected_result):
Expand All @@ -237,27 +254,12 @@ def test_PandasDataFrameResult_build_result(outputs, expected_result):
@pytest.mark.parametrize(
"outputs",
[
(
{
"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}),
"b": pd.DataFrame({"c": [1, 3, 5], "d": [14, 15, 16]}),
}
),
(
{
"a": pd.Series([1, 2, 3]),
"b": pd.Series([11, 12, 13]),
"c": pd.DataFrame({"d": [0, 0, 0]}),
}
),
({"a": [1, 2], "b": {"foo": "bar"}}),
({"a": [1, 2], "b": [3, 4, 5]}),
({"a": np.array([1, 2]), "b": np.array([3, 4, 5])}),
({"a": _gen_ints(3), "b": _gen_ints(4)}),
],
ids=[
"test-multiple-dataframes",
"test-multiple-series-with-dataframe",
"test-lists-and-dicts",
"test-mismatched-lists",
"test-mismatched-arrays",
Expand All @@ -271,6 +273,90 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
pdfr.build_result(**outputs)


@pytest.mark.parametrize(
"outputs,expected_result",
[
(
{
"a": pd.DataFrame({"a": [1, 2, 3], "z": [0, 0, 0]}),
"b": pd.Series([4, 5, 6]),
"c": 7,
"d": [8, 9, 10],
},
pd.DataFrame(
{"a": [1, 2, 3], "z": [0, 0, 0], "b": [4, 5, 6], "c": [7, 7, 7], "d": [8, 9, 10]}
),
),
(
{
"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}, index=[0, 1, 2]),
"b": pd.DataFrame({"c": [1, 3, 5], "d": [14, 15, 16]}, index=[3, 4, 5]),
},
pd.DataFrame(
{
"a": [1, 2, 3, None, None, None],
"b": [11, 12, 13, None, None, None],
"c": [None, None, None, 1, 3, 5],
"d": [None, None, None, 14, 15, 16],
},
index=[0, 1, 2, 3, 4, 5],
),
),
(
{
"a": pd.Series([1, 2, 3], index=[1, 2, 3]),
"c": pd.DataFrame({"d": [0, 0, 0], "e": [1, 1, 1]}),
"b": pd.Series([11, 12, 13]),
"f": pd.DataFrame({"g": [2, 2, 2], "h": [3, 3, 3]}, index=[1, 2, 3]),
},
pd.DataFrame(
{
"a": [None, 1, 2, 3],
"d": [0, 0, 0, None],
"e": [1, 1, 1, None],
"b": [11, 12, 13, None],
"g": [None, 2, 2, 2],
"h": [None, 3, 3, 3],
},
index=[0, 1, 2, 3],
),
),
],
ids=[
"test-dataframe-scalar-series-list",
"test-two-dataframes",
"test-order-and-outer-join-preserved",
],
)
def test_PandasDataFrameResult_build_dataframe_with_dataframes(outputs, expected_result):
"""Tests build_dataframe_with_dataframes errors as expected"""
pdfr = base.PandasDataFrameResult()
actual = pdfr.build_dataframe_with_dataframes(outputs)
pd.testing.assert_frame_equal(actual, expected_result)


@pytest.mark.parametrize(
"outputs",
[
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": pd.Series([4, 5, 6])},
{"b": pd.Series([4, 5, 6]), "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": 7},
{"b": 7, "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
],
ids=[
"test-df-series-duplicate",
"test-series-df-duplicate",
"test-df-scalar-duplicate",
"test-scalar-df-duplicate",
],
)
def test_PandasDataFrameResult_build_dataframe_with_dataframes_error(outputs):
"""Tests build_dataframe_with_dataframes works as expected"""
pdfr = base.PandasDataFrameResult()
with pytest.raises(ValueError):
pdfr.build_dataframe_with_dataframes(outputs)


@pytest.mark.parametrize(
"outputs,expected_result",
[
Expand Down