Commit
feat: allow encoders to be used in append mode
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
martinbomio committed Feb 7, 2025
1 parent 6037e33 commit a65073d
Showing 4 changed files with 281 additions and 51 deletions.
18 changes: 17 additions & 1 deletion python/ray/data/preprocessor.py
@@ -4,7 +4,7 @@
import pickle
import warnings
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Union
from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional

from ray.air.util.data_batch_conversion import BatchFormat
from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -277,6 +277,22 @@ def _transform_batch(self, data: "DataBatchType") -> "DataBatchType":
elif transform_type == BatchFormat.NUMPY:
return self._transform_numpy(_convert_batch_type_to_numpy(data))

def _derive_and_validate_output_columns(
self, columns: List[str], output_columns: Optional[List[str]]
) -> List[str]:
"""
Returns the output columns, using them if they are explicitly set and otherwise
defaulting to the input columns. Raises an error when the length of the output
columns does not match the length of the input columns.
"""

if output_columns and len(columns) != len(output_columns):
raise ValueError(
"Invalid output_columns: Got len(columns) != len(output_columns)."
"The length of columns and output_columns must match."
)
return output_columns or columns

@DeveloperAPI
def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
"""Run the transformation on a data batch in a Pandas DataFrame format."""
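The new helper's behavior can be summarized with a minimal standalone sketch (the function below mirrors the helper outside of the Preprocessor class; the name is illustrative):

from typing import List, Optional

def derive_output_columns(
    columns: List[str], output_columns: Optional[List[str]]
) -> List[str]:
    # No output columns given: transform in place, overwriting the input columns.
    # Output columns given: require a one-to-one mapping with the input columns.
    if output_columns and len(columns) != len(output_columns):
        raise ValueError(
            "Invalid output_columns: Got len(columns) != len(output_columns). "
            "The length of columns and output_columns must match."
        )
    return output_columns or columns

print(derive_output_columns(["sex", "level"], None))                      # ['sex', 'level']
print(derive_output_columns(["sex", "level"], ["sex_enc", "level_enc"]))  # ['sex_enc', 'level_enc']
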
156 changes: 132 additions & 24 deletions python/ray/data/preprocessors/encoder.py
@@ -44,6 +44,18 @@ class OrdinalEncoder(Preprocessor):
2 1 0
3 0 1
:class:`OrdinalEncoder` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the encoded values.
>>> encoder = OrdinalEncoder(columns=["sex", "level"], output_columns=["sex_encoded", "level_encoded"])
>>> encoder.fit_transform(ds).to_pandas()
sex level sex_encoded level_encoded
0 male L4 1 1
1 female L5 0 2
2 male L3 1 0
3 female L4 0 1
If you transform a value not present in the original dataset, then the value
is encoded as ``float("nan")``.
@@ -76,17 +88,30 @@ class OrdinalEncoder(Preprocessor):
encode_lists: If ``True``, encode list elements. If ``False``, encode
whole lists (i.e., replace each list with an integer). ``True``
by default.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
:class:`OneHotEncoder`
Another preprocessor that encodes categorical data.
"""

def __init__(self, columns: List[str], *, encode_lists: bool = True):
def __init__(
self,
columns: List[str],
*,
encode_lists: bool = True,
output_columns: Optional[List[str]] = None,
):
# TODO: allow user to specify order of values within each column.
self.columns = columns
self.encode_lists = encode_lists
self.output_columns = self._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
self.stats_ = _get_unique_value_indices(
@@ -116,13 +141,14 @@ def list_as_category(element):
s_values = self.stats_[f"unique_values({s.name})"]
return s.map(s_values)

df[self.columns] = df[self.columns].apply(column_ordinal_encoder)
df[self.output_columns] = df[self.columns].apply(column_ordinal_encoder)
return df

def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"encode_lists={self.encode_lists!r})"
f"encode_lists={self.encode_lists!r}, "
f"output_columns={self.output_columns!r})"
)


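Append mode for these encoders ultimately comes down to which pandas columns the encoded values are written to. A standalone pandas sketch of the idea, with illustrative data and hand-written mappings (not fitted stats):

import pandas as pd

df = pd.DataFrame({"sex": ["male", "female"], "level": ["L4", "L5"]})
codes = {"sex": {"female": 0, "male": 1}, "level": {"L3": 0, "L4": 1, "L5": 2}}

for src, dst in zip(["sex", "level"], ["sex_encoded", "level_encoded"]):
    # Append mode: write the encoded values to a new column and keep `src` intact.
    # In-place mode is the special case dst == src, which overwrites the raw values.
    df[dst] = df[src].map(codes[src])

print(df)
#       sex level  sex_encoded  level_encoded
# 0    male    L4            1              1
# 1  female    L5            0              2
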
@@ -153,13 +179,26 @@ class OneHotEncoder(Preprocessor):
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OneHotEncoder(columns=["color"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color_blue color_green color_red
0 0 0 1
1 0 1 0
2 0 0 1
3 0 0 1
4 1 0 0
5 0 1 0
color
0 [0, 0, 1]
1 [0, 1, 0]
2 [0, 0, 1]
3 [0, 0, 1]
4 [1, 0, 0]
5 [0, 1, 0]
:class:`OneHotEncoder` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the encoded values.
>>> encoder = OneHotEncoder(columns=["color"], output_columns=["color_encoded"])
>>> encoder.fit_transform(ds).to_pandas()
color color_encoded
0 red [0, 0, 1]
1 green [0, 1, 0]
2 red [0, 0, 1]
3 red [0, 0, 1]
4 blue [1, 0, 0]
5 green [0, 1, 0]
If you one-hot encode a value that isn't in the fitted dataset, then the
value is encoded with zeros.
@@ -188,6 +227,10 @@ class OneHotEncoder(Preprocessor):
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every category in that column.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
@@ -201,11 +244,18 @@ class OneHotEncoder(Preprocessor):
""" # noqa: E501

def __init__(
self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories
self.output_columns = self._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
self.stats_ = _get_unique_value_indices(
@@ -220,7 +270,7 @@ def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)

# Compute new one-hot encoded columns
for column in self.columns:
for column, output_column in zip(self.columns, self.output_columns):
column_values = self.stats_[f"unique_values({column})"]
if _is_series_composed_of_lists(df[column]):
df[column] = df[column].map(lambda x: tuple(x))
@@ -236,13 +286,14 @@ def _transform_pandas(self, df: pd.DataFrame):
df = df.drop(columns=value_columns)
# Use a Pandas Series for column assignment to get more consistent
# behavior across Pandas versions.
df.loc[:, column] = pd.Series(list(concatenated))
df.loc[:, output_column] = pd.Series(list(concatenated))
return df

def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r})"
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns!r})"
)


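To make the list-valued output above concrete: once the encoder has fit a category-to-index mapping, each value becomes a fixed-length 0/1 vector, and unseen values encode as all zeros. A standalone sketch (the mapping below is illustrative, not the fitted ``stats_``):

unique_values = {"blue": 0, "green": 1, "red": 2}

def one_hot(value, mapping):
    # Build a vector with a single 1 at the category's index;
    # values missing from the mapping stay all zeros.
    vector = [0] * len(mapping)
    index = mapping.get(value)
    if index is not None:
        vector[index] = 1
    return vector

print(one_hot("red", unique_values))     # [0, 0, 1]
print(one_hot("yellow", unique_values))  # [0, 0, 0]
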
@@ -285,6 +336,16 @@ class MultiHotEncoder(Preprocessor):
1 Moana [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [0, 0, 0, 1, 0]
:class:`MultiHotEncoder` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the encoded values.
>>> encoder = MultiHotEncoder(columns=["genre"], output_columns=["genre_encoded"])
>>> encoder.fit_transform(ds).to_pandas()
name genre genre_encoded
0 Shaolin Soccer [comedy, action, sports] [1, 0, 1, 0, 1]
1 Moana [animation, comedy, action] [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [documentary] [0, 0, 0, 1, 0]
If you specify ``max_categories``, then :class:`MultiHotEncoder`
creates features for only the most frequent categories.
@@ -302,6 +363,10 @@
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every unique category in that column.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
@@ -315,11 +380,18 @@
"""

def __init__(
self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories
self.output_columns = self._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
self.stats_ = _get_unique_value_indices(
@@ -342,15 +414,16 @@ def encode_list(element: list, *, name: str):
counter = Counter(element)
return [counter.get(x, 0) for x in stats]

for column in self.columns:
df[column] = df[column].map(partial(encode_list, name=column))
for column, output_column in zip(self.columns, self.output_columns):
df[output_column] = df[column].map(partial(encode_list, name=column))

return df

def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r})"
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns})"
)


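The multi-hot vector is a per-category count over the fitted vocabulary, which is what the ``Counter``-based ``encode_list`` above computes. A standalone sketch with an illustrative vocabulary:

from collections import Counter

vocabulary = ["comedy", "animation", "action", "documentary", "sports"]

def multi_hot(values, vocab):
    # Count how often each fitted category appears in the input list;
    # categories outside the vocabulary are ignored.
    counts = Counter(values)
    return [counts.get(category, 0) for category in vocab]

print(multi_hot(["comedy", "action", "sports"], vocabulary))     # [1, 0, 1, 0, 1]
print(multi_hot(["animation", "comedy", "action"], vocabulary))  # [1, 1, 1, 0, 0]
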
@@ -383,6 +456,17 @@ class LabelEncoder(Preprocessor):
2 4.9 3.0 0
3 6.2 3.4 2
You can also provide the name of the output column that should hold the encoded
labels if you want to use :class:`LabelEncoder` in append mode.
>>> encoder = LabelEncoder(label_column="species", output_column="species_encoded")
>>> encoder.fit_transform(ds).to_pandas()
sepal_width sepal_height species species_encoded
0 5.1 3.5 setosa 0
1 7.0 3.2 versicolor 1
2 4.9 3.0 setosa 0
3 6.2 3.4 virginica 2
If you transform a label not present in the original dataset, then the new
label is encoded as ``float("nan")``.
@@ -398,6 +482,9 @@
Args:
label_column: A column containing labels that you want to encode.
output_column: The name of the column that will contain the encoded
labels. If None, the output column will have the same name as the
input column.
.. seealso::
@@ -406,8 +493,9 @@
:class:`LabelEncoder`.
"""

def __init__(self, label_column: str):
def __init__(self, label_column: str, *, output_column: Optional[str] = None):
self.label_column = label_column
self.output_column = output_column or label_column

def _fit(self, dataset: Dataset) -> Preprocessor:
self.stats_ = _get_unique_value_indices(dataset, [self.label_column])
@@ -420,7 +508,7 @@ def column_label_encoder(s: pd.Series):
s_values = self.stats_[f"unique_values({s.name})"]
return s.map(s_values)

df[self.label_column] = df[self.label_column].transform(column_label_encoder)
df[self.output_column] = df[self.label_column].transform(column_label_encoder)
return df

def inverse_transform(self, ds: "Dataset") -> "Dataset":
@@ -462,11 +550,11 @@ def column_label_decoder(s: pd.Series):
}
return s.map(inverse_values)

df[self.label_column] = df[self.label_column].transform(column_label_decoder)
df[self.label_column] = df[self.output_column].transform(column_label_decoder)
return df

def __repr__(self):
return f"{self.__class__.__name__}(label_column={self.label_column!r})"
return f"{self.__class__.__name__}(label_column={self.label_column!r}, output_column={self.output_column!r})"


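With an ``output_column`` set, the integer codes live alongside the raw labels, and ``inverse_transform`` reads the appended column to write decoded labels back into ``label_column``. A usage sketch assuming the API added in this commit (data is illustrative):

import pandas as pd
import ray
from ray.data.preprocessors import LabelEncoder

df = pd.DataFrame({"sepal_width": [5.1, 7.0], "species": ["setosa", "versicolor"]})
ds = ray.data.from_pandas(df)

encoder = LabelEncoder(label_column="species", output_column="species_encoded")
encoded = encoder.fit_transform(ds)           # appends integer codes in "species_encoded"
decoded = encoder.inverse_transform(encoded)  # decodes them back into "species"
print(decoded.to_pandas())
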
@PublicAPI(stability="alpha")
@@ -497,6 +585,17 @@ class Categorizer(Preprocessor):
>>> categorizer.fit_transform(ds).schema().types # doctest: +SKIP
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]
:class:`Categorizer` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the categorized values.
>>> categorizer = Categorizer(columns=["sex", "level"], output_columns=["sex_cat", "level_cat"])
>>> categorizer.fit_transform(ds).to_pandas()
sex level sex_cat level_cat
0 male L4 male L4
1 female L5 female L5
2 male L3 male L3
3 female L4 female L4
If you know the categories in advance, you can specify the categories with the
``dtypes`` parameter.
@@ -512,18 +611,27 @@ class Categorizer(Preprocessor):
dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
objects. If you don't include a column in ``dtypes``, the categories
are inferred.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
""" # noqa: E501

def __init__(
self,
columns: List[str],
dtypes: Optional[Dict[str, pd.CategoricalDtype]] = None,
output_columns: Optional[List[str]] = None,
):
if not dtypes:
dtypes = {}

self.columns = columns
self.dtypes = dtypes
self.output_columns = self._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
columns_to_get = [
@@ -544,13 +652,13 @@ def _fit(self, dataset: Dataset) -> Preprocessor:
return self

def _transform_pandas(self, df: pd.DataFrame):
df = df.astype(self.stats_)
df[self.output_columns] = df[self.columns].astype(self.stats_)
return df

def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"dtypes={self.dtypes!r})"
f"dtypes={self.dtypes!r}, output_columns={self.output_columns!r})"
)


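In append mode the categorized copies are written next to the raw columns via a plain pandas ``astype`` to the fitted ``pd.CategoricalDtype`` objects. A standalone pandas sketch (the dtypes below are written out by hand rather than fitted):

import pandas as pd

df = pd.DataFrame({"sex": ["male", "female"], "level": ["L4", "L5"]})
dtypes = {
    "sex": pd.CategoricalDtype(["female", "male"]),
    "level": pd.CategoricalDtype(["L3", "L4", "L5"]),
}

for src, dst in zip(["sex", "level"], ["sex_cat", "level_cat"]):
    # Keep the raw object-dtype column and append a categorical-typed copy.
    df[dst] = df[src].astype(dtypes[src])

print(df.dtypes)
# sex            object
# level          object
# sex_cat      category
# level_cat    category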