[data/preprocessors] feat: allow vectorizers to be executed in append mode (#50847)


## Why are these changes needed?

This is part of #48133.
Continuing the approach taken in #49426, this change makes all the vectorizers work in append mode.
It takes a similar approach to #50632, where the vectorizers also change to output a single list column per input column instead of one column per token or per hash feature. A usage sketch follows below.
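
For reference, here is a minimal sketch of the new behavior (it simply mirrors the doctests added in this PR; the dataset and the `corpus_hashed`/`corpus_counts` output column names are taken from those examples):

```python
import pandas as pd

import ray
from ray.data.preprocessors import CountVectorizer, HashingVectorizer

df = pd.DataFrame({
    "corpus": [
        "Jimmy likes volleyball",
        "Bob likes volleyball too",
        "Bob also likes fruit jerky",
    ]
})
ds = ray.data.from_pandas(df)

# Default mode: each input column is replaced by a single list column
# (previously it was expanded into one column per hash bucket).
hasher = HashingVectorizer(["corpus"], num_features=8)
print(hasher.fit_transform(ds).to_pandas())

# Append mode: pass output_columns to keep "corpus" and append "corpus_hashed".
hasher_append = HashingVectorizer(
    ["corpus"], num_features=8, output_columns=["corpus_hashed"]
)
print(hasher_append.fit_transform(ds).to_pandas())

# CountVectorizer accepts the same output_columns argument.
counter_append = CountVectorizer(["corpus"], output_columns=["corpus_counts"])
print(counter_append.fit_transform(ds).to_pandas())
```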

## Related issue number

#49426

## Checks

- [x] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Martin Bomio <martinbomio@spotify.com>
martinbomio authored Feb 24, 2025
1 parent 77c95df commit fb5aee6
Showing 3 changed files with 181 additions and 98 deletions.
129 changes: 87 additions & 42 deletions python/ray/data/preprocessors/vectorizer.py
@@ -14,10 +14,12 @@ class HashingVectorizer(Preprocessor):
"""Count the frequency of tokens using the
`hashing trick <https://en.wikipedia.org/wiki/Feature_hashing>`_.
This preprocessor creates ``num_features`` columns named like
``hash_{column_name}_{index}``. If ``num_features`` is large enough relative to
the size of your vocabulary, then each column approximately corresponds to the
frequency of a unique token.
This preprocessor creates a list column for each input column. For each row,
the list contains the frequency counts of tokens (for CountVectorizer) or hash values
(for HashingVectorizer). For HashingVectorizer, the list will have length
``num_features``. If ``num_features`` is large enough relative to the size of your
vocabulary, then each index approximately corresponds to the frequency of a unique
token.
:class:`HashingVectorizer` is memory efficient and quick to pickle. However, given a
transformed column, you can't know which tokens correspond to it. This might make it
@@ -74,10 +76,20 @@ class HashingVectorizer(Preprocessor):
>>>
>>> vectorizer = HashingVectorizer(["corpus"], num_features=8)
>>> vectorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
hash_corpus_0 hash_corpus_1 hash_corpus_2 hash_corpus_3 hash_corpus_4 hash_corpus_5 hash_corpus_6 hash_corpus_7
0 1 0 1 0 0 0 0 1
1 1 0 1 0 0 0 1 1
2 0 0 1 1 0 2 1 0
corpus
0 [1, 0, 1, 0, 0, 0, 0, 1]
1 [1, 0, 1, 0, 0, 0, 1, 1]
2 [0, 0, 1, 1, 0, 2, 1, 0]
:class:`HashingVectorizer` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the encoded values.
>>> vectorizer = HashingVectorizer(["corpus"], num_features=8, output_columns=["corpus_hashed"])
>>> vectorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
corpus corpus_hashed
0 Jimmy likes volleyball [1, 0, 1, 0, 0, 0, 0, 1]
1 Bob likes volleyball too [1, 0, 1, 0, 0, 0, 1, 1]
2 Bob also likes fruit jerky [0, 0, 1, 1, 0, 2, 1, 0]
Args:
columns: The columns to separately tokenize and count.
@@ -88,6 +100,10 @@ class HashingVectorizer(Preprocessor):
should accept a string as input and return a list of tokens as
output. If unspecified, the tokenizer uses a function equivalent to
``lambda s: s.split(" ")``.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
@@ -109,36 +125,41 @@ def __init__(
columns: List[str],
num_features: int,
tokenization_fn: Optional[Callable[[str], List[str]]] = None,
*,
output_columns: Optional[List[str]] = None,
):
self.columns = columns
# TODO(matt): Set default number of features.
# This likely requires sparse matrix support to avoid explosion of columns.
self.num_features = num_features
# TODO(matt): Add a more robust default tokenizer.
self.tokenization_fn = tokenization_fn or simple_split_tokenizer
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)

def _transform_pandas(self, df: pd.DataFrame):
# TODO(matt): Use sparse matrix for efficiency.

def hash_count(tokens: List[str]) -> Counter:
hashed_tokens = [simple_hash(token, self.num_features) for token in tokens]
return Counter(hashed_tokens)

for col in self.columns:
for col, output_col in zip(self.columns, self.output_columns):
tokenized = df[col].map(self.tokenization_fn)
hashed = tokenized.map(hash_count)
# Create a list to store the hash columns
hash_columns = []
for i in range(self.num_features):
df[f"hash_{col}_{i}"] = hashed.map(lambda counts: counts[i])
series = hashed.map(lambda counts: counts[i])
series.name = f"hash_{i}"
hash_columns.append(series)
# Concatenate all hash columns into a single list column
df[output_col] = pd.concat(hash_columns, axis=1).values.tolist()

# Drop original columns.
df.drop(columns=self.columns, inplace=True)
return df

def __repr__(self):
fn_name = getattr(self.tokenization_fn, "__name__", self.tokenization_fn)
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"num_features={self.num_features!r}, tokenization_fn={fn_name})"
f"num_features={self.num_features!r}, tokenization_fn={fn_name}, "
f"output_columns={self.output_columns!r})"
)


@@ -154,15 +175,14 @@ class CountVectorizer(Preprocessor):
0 I dislike Python
1 I like Python
This preprocessor creates a column named like ``{column}_{token}`` for each
unique token. These columns represent the frequency of token ``{token}`` in
column ``{column}``. For example:
This preprocessor creates a list column for each input column. Each list contains
the frequency counts of tokens in order of their first appearance. For example:
.. code-block::
corpus_I corpus_Python corpus_dislike corpus_like
0 1 1 1 0
1 1 1 0 1
corpus
0 [1, 1, 1, 0] # Counts for [I, dislike, Python, like]
1 [1, 0, 1, 1] # Counts for [I, dislike, Python, like]
Examples:
>>> import pandas as pd
@@ -180,19 +200,29 @@ class CountVectorizer(Preprocessor):
>>>
>>> vectorizer = CountVectorizer(["corpus"])
>>> vectorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
corpus_likes corpus_volleyball corpus_Bob corpus_Jimmy corpus_too corpus_also corpus_fruit corpus_jerky
0 1 1 0 1 0 0 0 0
1 1 1 1 0 1 0 0 0
2 1 0 1 0 0 1 1 1
corpus
0 [1, 0, 1, 1, 0, 0, 0, 0]
1 [1, 1, 1, 0, 0, 0, 0, 1]
2 [1, 1, 0, 0, 1, 1, 1, 0]
You can limit the number of tokens in the vocabulary with ``max_features``.
>>> vectorizer = CountVectorizer(["corpus"], max_features=3)
>>> vectorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
corpus_likes corpus_volleyball corpus_Bob
0 1 1 0
1 1 1 1
2 1 0 1
corpus
0 [1, 0, 1]
1 [1, 1, 1]
2 [1, 1, 0]
:class:`CountVectorizer` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the encoded values.
>>> vectorizer = CountVectorizer(["corpus"], output_columns=["corpus_counts"])
>>> vectorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
corpus corpus_counts
0 Jimmy likes volleyball [1, 0, 1, 1, 0, 0, 0, 0]
1 Bob likes volleyball too [1, 1, 1, 0, 0, 0, 0, 1]
2 Bob also likes fruit jerky [1, 1, 0, 0, 1, 1, 1, 0]
Args:
columns: The columns to separately tokenize and count.
@@ -202,20 +232,26 @@ class CountVectorizer(Preprocessor):
``lambda s: s.split(" ")``.
max_features: The maximum number of tokens to encode in the transformed
dataset. If specified, only the most frequent tokens are encoded.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
""" # noqa: E501

def __init__(
self,
columns: List[str],
tokenization_fn: Optional[Callable[[str], List[str]]] = None,
max_features: Optional[int] = None,
*,
output_columns: Optional[List[str]] = None,
):
# TODO(matt): Add fit_transform to avoid recomputing tokenization step.
self.columns = columns
# TODO(matt): Add a more robust default tokenizer.
self.tokenization_fn = tokenization_fn or simple_split_tokenizer
self.max_features = max_features
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
def get_pd_value_counts(df: pd.DataFrame) -> List[Counter]:
@@ -248,23 +284,32 @@ def most_common(counter: Counter, n: int):
return self

def _transform_pandas(self, df: pd.DataFrame):

to_concat = []
for col in self.columns:
result_columns = []
for col, output_col in zip(self.columns, self.output_columns):
token_counts = self.stats_[f"token_counts({col})"]
sorted_tokens = [token for (token, count) in token_counts.most_common()]
tokenized = df[col].map(self.tokenization_fn).map(Counter)

# Create a list to store token frequencies
token_columns = []
for token in sorted_tokens:
series = tokenized.map(lambda val: val[token])
series.name = f"{col}_{token}"
to_concat.append(series)
series.name = token
token_columns.append(series)

# Concatenate all token columns into a single list column
if token_columns:
df[output_col] = pd.concat(token_columns, axis=1).values.tolist()
else:
df[output_col] = [[]] * len(df)
result_columns.append(output_col)

df = pd.concat(to_concat, axis=1)
return df

def __repr__(self):
fn_name = getattr(self.tokenization_fn, "__name__", self.tokenization_fn)
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"tokenization_fn={fn_name}, max_features={self.max_features!r})"
f"tokenization_fn={fn_name}, max_features={self.max_features!r}, "
f"output_columns={self.output_columns!r})"
)
36 changes: 1 addition & 35 deletions python/ray/data/tests/preprocessors/test_hasher.py
@@ -2,7 +2,7 @@
import pytest

import ray
from ray.data.preprocessors import FeatureHasher, HashingVectorizer
from ray.data.preprocessors import FeatureHasher


def test_feature_hasher():
@@ -32,40 +32,6 @@ def test_feature_hasher():
assert all(document_term_matrix.iloc[1] <= 1)


def test_hashing_vectorizer():
"""Tests basic HashingVectorizer functionality."""

col_a = ["a b b c c c", "a a a a c"]
col_b = ["apple", "banana banana banana"]
in_df = pd.DataFrame.from_dict({"A": col_a, "B": col_b})
ds = ray.data.from_pandas(in_df)

vectorizer = HashingVectorizer(["A", "B"], num_features=3)

transformed = vectorizer.transform(ds)
out_df = transformed.to_pandas()

processed_col_a_0 = [2, 0]
processed_col_a_1 = [1, 4]
processed_col_a_2 = [3, 1]
processed_col_b_0 = [1, 0]
processed_col_b_1 = [0, 3]
processed_col_b_2 = [0, 0]

expected_df = pd.DataFrame.from_dict(
{
"hash_A_0": processed_col_a_0,
"hash_A_1": processed_col_a_1,
"hash_A_2": processed_col_a_2,
"hash_B_0": processed_col_b_0,
"hash_B_1": processed_col_b_1,
"hash_B_2": processed_col_b_2,
}
)

assert out_df.equals(expected_df)


if __name__ == "__main__":
import sys
