Skip to content

Commit

Permalink
[data/preprocessors] feat: allow discretizers to be used in append mo…
Browse files Browse the repository at this point in the history
…de (ray-project#50584)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

This is part of ray-project#48133.
Continuing the approach taken in
ray-project#49426, this change makes all the discretizers
work in append mode.

## Related issue number

ray-project#48133

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Martin Bomio <martinbomio@spotify.com>
  • Loading branch information
martinbomio authored and xsuler committed Mar 4, 2025
1 parent 818711a commit 16112f5
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 2 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def _derive_and_validate_output_columns(

if output_columns and len(columns) != len(output_columns):
raise ValueError(
"Invalid output_columns: Got len(columns) != len(output_columns)."
"Invalid output_columns: Got len(columns) != len(output_columns). "
"The length of columns and output_columns must match."
)
return output_columns or columns
Expand Down
54 changes: 53 additions & 1 deletion python/ray/data/preprocessors/discretizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def bin_values(s: pd.Series) -> pd.Series:
duplicates=self.duplicates,
)

return df.apply(bin_values, axis=0)
binned_df = df.apply(bin_values, axis=0)
df[self.output_columns] = binned_df[self.columns]
return df

def _validate_bins_columns(self):
if isinstance(self.bins, dict) and not all(
Expand Down Expand Up @@ -95,6 +97,23 @@ class CustomKBinsDiscretizer(_AbstractKBinsDiscretizer):
4 2 3
5 1 3
:class:`CustomKBinsDiscretizer` can also be used in append mode by providing the
names of the output_columns that should hold the discretized values.
>>> discretizer = CustomKBinsDiscretizer(
... columns=["value_1", "value_2"],
... bins=[0, 1, 4, 10, 25],
... output_columns=["value_1_discretized", "value_2_discretized"]
... )
>>> discretizer.fit_transform(ds).to_pandas() # doctest: +SKIP
value_1 value_2 value_1_discretized value_2_discretized
0 0.2 10 0 2
1 1.4 15 1 3
2 2.5 13 1 3
3 6.2 12 2 3
4 9.7 23 2 3
5 2.1 25 1 3
You can also specify different bin edges per column.
>>> discretizer = CustomKBinsDiscretizer(
Expand Down Expand Up @@ -128,6 +147,10 @@ class CustomKBinsDiscretizer(_AbstractKBinsDiscretizer):
``pd.CategoricalDtype`` with the categories being mapped to bins.
You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
preserve information about bin order.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
Expand All @@ -150,13 +173,17 @@ def __init__(
dtypes: Optional[
Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
] = None,
output_columns: Optional[List[str]] = None,
):
self.columns = columns
self.bins = bins
self.right = right
self.include_lowest = include_lowest
self.duplicates = duplicates
self.dtypes = dtypes
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)

self._validate_bins_columns()

Expand Down Expand Up @@ -192,6 +219,23 @@ class UniformKBinsDiscretizer(_AbstractKBinsDiscretizer):
4 3 3
5 0 3
:class:`UniformKBinsDiscretizer` can also be used in append mode by providing the
names of the output_columns that should hold the discretized values.
>>> discretizer = UniformKBinsDiscretizer(
... columns=["value_1", "value_2"],
... bins=4,
... output_columns=["value_1_discretized", "value_2_discretized"]
... )
>>> discretizer.fit_transform(ds).to_pandas() # doctest: +SKIP
value_1 value_2 value_1_discretized value_2_discretized
0 0.2 10 0 0
1 1.4 15 0 1
2 2.5 13 0 0
3 6.2 12 2 0
4 9.7 23 3 3
5 2.1 25 0 3
You can also specify different number of bins per column.
>>> discretizer = UniformKBinsDiscretizer(
Expand Down Expand Up @@ -227,6 +271,10 @@ class UniformKBinsDiscretizer(_AbstractKBinsDiscretizer):
``pd.CategoricalDtype`` with the categories being mapped to bins.
You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
preserve information about bin order.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
Expand All @@ -245,13 +293,17 @@ def __init__(
dtypes: Optional[
Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
] = None,
output_columns: Optional[List[str]] = None,
):
self.columns = columns
self.bins = bins
self.right = right
self.include_lowest = include_lowest
self.duplicates = duplicates
self.dtypes = dtypes
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
self._validate_on_fit()
Expand Down
80 changes: 80 additions & 0 deletions python/ray/data/tests/preprocessors/test_discretizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,46 @@ def test_uniform_kbins_discretizer(
# Check that the remaining column was not modified
assert out_df["C"].equals(in_df["C"])

# append mode
expected_message = "The length of columns and output_columns must match."
with pytest.raises(ValueError, match=expected_message):
UniformKBinsDiscretizer(["A", "B"], bins=bins, output_columns=["A_discretized"])

discretizer = UniformKBinsDiscretizer(
["A", "B"],
bins=bins,
dtypes=dtypes,
right=right,
include_lowest=include_lowest,
output_columns=["A_discretized", "B_discretized"],
)

transformed = discretizer.fit_transform(ds)
out_df = transformed.to_pandas()

assert out_df["A_discretized"].equals(
pd.cut(
in_df["A"],
bins_A,
labels=labels_A,
ordered=ordered_A,
right=right,
include_lowest=include_lowest,
)
)
assert out_df["B_discretized"].equals(
pd.cut(
in_df["B"],
bins_B,
labels=labels_B,
ordered=ordered_B,
right=right,
include_lowest=include_lowest,
)
)
# Check that the remaining column was not modified
assert out_df["C"].equals(in_df["C"])


@pytest.mark.parametrize(
"bins", ([3, 4, 6, 9], {"A": [3, 4, 6, 8, 9], "B": [3, 4, 6, 9]})
Expand Down Expand Up @@ -154,6 +194,46 @@ def test_custom_kbins_discretizer(
# Check that the remaining column was not modified
assert out_df["C"].equals(in_df["C"])

# append mode
expected_message = "The length of columns and output_columns must match."
with pytest.raises(ValueError, match=expected_message):
CustomKBinsDiscretizer(["A", "B"], bins=bins, output_columns=["A_discretized"])

discretizer = CustomKBinsDiscretizer(
["A", "B"],
bins=bins,
dtypes=dtypes,
right=right,
include_lowest=include_lowest,
output_columns=["A_discretized", "B_discretized"],
)

transformed = discretizer.fit_transform(ds)
out_df = transformed.to_pandas()

assert out_df["A_discretized"].equals(
pd.cut(
in_df["A"],
bins_A,
labels=labels_A,
ordered=ordered_A,
right=right,
include_lowest=include_lowest,
)
)
assert out_df["B_discretized"].equals(
pd.cut(
in_df["B"],
bins_B,
labels=labels_B,
ordered=ordered_B,
right=right,
include_lowest=include_lowest,
)
)
# Check that the remaining column was not modified
assert out_df["C"].equals(in_df["C"])


if __name__ == "__main__":
import sys
Expand Down

0 comments on commit 16112f5

Please sign in to comment.