Skip to content

Commit

Permalink
Updates Models to support new dataloader format for lists (__values a…
Browse files Browse the repository at this point in the history
…nd __offsets in dict) and scalar (1D) (#999)

* Updates to support new dataloader format for lists and scalar

* Updates on MM to make it support new dataloader output

* Centralizing PrepareFeatures call and fixing tests and API

* Fixed tests that predict last item of the sequence

* Fixed additional tests

* Fixed many tests

* Fixed InBatchNegative tests

* Updating TOX to point to dataloader changes PR and minor fix

* Updated transformers example to fix test

* Fixed tests

* Fixed nested loader training

* Turning Candidate into dataclass and adding 2D ids to it (batch size,1) instead of 1D as before

* Updated transformer block in test

* Updating gpu-ci.yaml to be able to run CI on an edited PR

* Removing the edited pull request option from gpu-ci.yaml, to see if it is necessary

* Fixed test and linting issue

* Changing the order of libraries installing to try and ensuring that the right dataloader is installed

* Changing GitHub Action to install a modified dataloader after installing models. Fixed linting

* Fixed unit test and linting issue

* Updating tox.ini to install the modified the dataloader for horovod and GPU tests

* Updating tox.ini and fixing linting issue

* Fixed failing tests

* Replaced references from value_count to shape

* Fixed tests

* Trying to enforce the horovod multi-gpu tests to use the modified dataloader for CI

* Removed change in horovodrun command that was trying to enforce usage of the modified dataloader installed

* Removing dep install for horovod gpu tests, to try and make it using the modified dataloader

* Trying to enforce horovod GPU tests to use the installed modified dataloader

* Implemented suggestions from Oliver
  • Loading branch information
gabrielspmoreira authored Mar 15, 2023
1 parent ca18dff commit a5e392c
Show file tree
Hide file tree
Showing 53 changed files with 981 additions and 842 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/tensorflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ jobs:
fi
pip install "pandas>=1.2.0,<1.4.0dev0"
pip install "NVTabular@git+https://github.com/NVIDIA-Merlin/NVTabular.git@$branch"
pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch"
#pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch"
pip install merlin-dataloader@git+https://github.com/bschifferer/dataloader.git@change_output
pip install "merlin-core@git+https://github.com/NVIDIA-Merlin/core.git@$branch"
- name: Install dependencies
run: |
python -m pip install "tensorflow${{ matrix.tensorflow-version }}"
python -m pip install .[tensorflow-dev]
pip install merlin-dataloader@git+https://github.com/bschifferer/dataloader.git@change_output
- name: Build
run: |
python setup.py develop
Expand Down Expand Up @@ -98,12 +100,14 @@ jobs:
fi
pip install "pandas>=1.2.0,<1.4.0dev0"
pip install "NVTabular@git+https://github.com/NVIDIA-Merlin/NVTabular.git@$branch"
pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch"
#pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch"
pip install merlin-dataloader@git+https://github.com/bschifferer/dataloader.git@change_output
pip install "merlin-core@git+https://github.com/NVIDIA-Merlin/core.git@$branch"
- name: Install dependencies
run: |
python -m pip install "tensorflow${{ matrix.tensorflow-version }}"
python -m pip install .[tensorflow-dev]
pip install merlin-dataloader@git+https://github.com/bschifferer/dataloader.git@change_output
- name: Build
run: |
python setup.py develop
Expand Down
5 changes: 2 additions & 3 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ Transformation Block Constructors

CategoryEncoding
MapValues
ListToDense
ListToRagged
ListToSparse
PrepareListFeatures
PrepareFeatures
ToSparse
ToDense
ToTarget
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@
}
],
"source": [
"batch = mm.sample_batch(train, batch_size=128, include_targets=False, to_ragged=True)"
"batch = mm.sample_batch(train, batch_size=128, include_targets=False, prepare_features=True)"
]
},
{
Expand Down Expand Up @@ -2093,7 +2093,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3.8.10 ('merlin_22.07_dev')",
"language": "python",
"name": "python3"
},
Expand All @@ -2111,7 +2111,7 @@
},
"vscode": {
"interpreter": {
"hash": "ab403bb43341787581f43b51cdd291d61392c89ddb0f92179de653921d4e05db"
"hash": "67b01b24cb2518309f0749863665ff82dad1ad60adc88cabbb59c99b73117545"
}
}
},
Expand Down
21 changes: 12 additions & 9 deletions examples/usecases/transformers-next-item-prediction.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,11 @@
" >> ops.Rename(name=\"weekday_checkout\")\n",
")\n",
"\n",
"categorical_features = (['city_id', 'booker_country', 'hotel_country'] +\n",
" weekday_checkin + weekday_checkout\n",
" ) >> ops.Categorify(start_index=1) \n",
"\n",
"groupby_features = ['city_id', 'booker_country', 'utrip_id', 'hotel_country', 'checkin'] + weekday_checkin + weekday_checkout >> ops.Groupby(\n",
"groupby_features = categorical_features + ['utrip_id', 'checkin'] >> ops.Groupby(\n",
" groupby_cols=['utrip_id'],\n",
" aggs={\n",
" 'city_id': ['list', 'count'],\n",
Expand All @@ -332,16 +335,16 @@
" sort_cols=\"checkin\"\n",
")\n",
"\n",
"groupby_features_city = groupby_features['city_id_list'] >> ops.Categorify() >> ops.AddTags([Tags.SEQUENCE, Tags.ITEM, Tags.ITEM_ID])\n",
"groupby_features_country = (\n",
" groupby_features['booker_country_list', 'hotel_country_list', 'weekday_checkin_list', 'weekday_checkout_list']\n",
" >> ops.Categorify() >> ops.AddTags([Tags.SEQUENCE, Tags.ITEM])\n",
"list_features = (\n",
" groupby_features['city_id_list', 'booker_country_list', 'hotel_country_list', \n",
" 'weekday_checkin_list', 'weekday_checkout_list'\n",
" ] >> ops.AddTags([Tags.SEQUENCE])\n",
")\n",
"city_id_count = groupby_features['city_id_count'] >> ops.AddTags([Tags.CONTEXT, Tags.ITEM, Tags.CONTINUOUS])\n",
"\n",
"# Filter out sessions with less than 2 interactions \n",
"MINIMUM_SESSION_LENGTH = 2\n",
"filtered_sessions = groupby_features_city + groupby_features_country + city_id_count >> ops.Filter(f=lambda df: df[\"city_id_count\"] >= MINIMUM_SESSION_LENGTH) "
"features = list_features + (groupby_features['city_id_count'] >> ops.AddTags([Tags.CONTINUOUS]))\n",
"filtered_sessions = features >> ops.Filter(f=lambda df: df[\"city_id_count\"] >= MINIMUM_SESSION_LENGTH) "
]
},
{
Expand Down Expand Up @@ -806,7 +809,7 @@
"text": [
"/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].\n",
" warnings.warn(\n",
"/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initalizer instance more than once.\n",
"/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initializer instance more than once.\n",
" warnings.warn(\n",
"2023-02-08 13:17:18.083919: I tensorflow/stream_executor/cuda/cuda_blas.cc:1633] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.\n",
"2023-02-08 13:17:18.254522: I tensorflow/stream_executor/cuda/cuda_dnn.cc:424] Loaded cuDNN version 8700\n"
Expand Down Expand Up @@ -957,7 +960,7 @@
"model.evaluate(\n",
" validation_set_processed,\n",
" batch_size=128,\n",
" pre=mm.SequenceMaskLast(schema=validation_set_processed.schema, target=target),\n",
" pre=mm.SequenceMaskLast(schema=seq_schema, target=target),\n",
" return_dict=True\n",
")"
]
Expand Down
3 changes: 1 addition & 2 deletions merlin/datasets/entertainment/movielens/1m/schema.pbtxt
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ feature {
feature {
name: "genres"
value_count {
min: 1
max: 6
min: 1
}
type: INT
int_domain {
Expand Down
7 changes: 2 additions & 5 deletions merlin/datasets/entertainment/music_streaming/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
},
"annotation": {
"tag": [
"categorical",
"session_id"
]
}
Expand Down Expand Up @@ -63,8 +62,7 @@
{
"name": "item_genres",
"valueCount": {
"min": "1",
"max": "20"
"min": "1"
},
"type": "INT",
"intDomain": {
Expand Down Expand Up @@ -127,8 +125,7 @@
{
"name": "user_genres",
"valueCount": {
"min": "1",
"max": "20"
"min": "1"
},
"type": "INT",
"intDomain": {
Expand Down
75 changes: 25 additions & 50 deletions merlin/datasets/synthetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import pathlib
from pathlib import Path
from random import randint
from typing import Dict, Optional, Sequence, Tuple, Union
from typing import Dict, Sequence, Tuple, Union

import numpy as np

import merlin.io
from merlin.models.utils import schema_utils
from merlin.schema import ColumnSchema, Schema, Tags
from merlin.schema import Schema, Tags
from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

LOG = logging.getLogger("merlin-models")
Expand Down Expand Up @@ -92,9 +92,13 @@ def generate_data(
Example::
train, valid = generate_data(input, 10000, (0.8, 0.2))
min_session_length: int
The minimum number of events in a session.
The minimum number of events in a session. Overrides the
min sequence length information from the shape of list columns
schema (schema[col].shape.dims[1].min)
max_session_length: int
The maximum number of events in a session.
The minimum number of events in a session. Overrides the
max sequence length information from the shape of list columns
schema (schema[col].shape.dims[1].max)
device: str
The device to use for the data generation.
Supported values: {'cpu', 'gpu'}
Expand All @@ -119,23 +123,15 @@ def generate_data(
raise ValueError(f"Unknown input type: {type(input)}")

for col in schema.column_names:
if not schema[col].is_list:
continue
new_properties = schema[col].properties
new_properties["value_count"] = {"min": min_session_length}
if max_session_length:
new_properties["value_count"]["max"] = max_session_length
schema[col] = ColumnSchema(
name=schema[col].name,
tags=schema[col].tags,
properties=new_properties,
dtype=schema[col].dtype,
is_list=True,
)
if schema[col].shape.is_list:
min_session_length = min_session_length or schema[col].shape.dims[1].min
max_session_length = max_session_length or schema[col].shape.dims[1].max
# Overriding min and max session length from schema
schema[col] = schema[col].with_shape(
((0, None), (min_session_length, max_session_length))
)

df = generate_user_item_interactions(
schema, num_rows, min_session_length, max_session_length, device=device
)
df = generate_user_item_interactions(schema, num_rows, device=device)

if list(set_sizes) != [1.0]:
num_rows = df.shape[0]
Expand All @@ -156,8 +152,6 @@ def generate_data(
def generate_user_item_interactions(
schema: Schema,
num_interactions: int,
min_session_length: int = 5,
max_session_length: Optional[int] = None,
device: str = "cpu",
):
"""
Expand All @@ -177,10 +171,6 @@ def generate_user_item_interactions(
schema object describing the columns to generate.
num_interactions: int
number of interaction rows to generate.
max_session_length: Optional[int]
The maximum length of the multi-hot/sequence features
min_session_length: int
The minimum length of the multi-hot/sequence features
device: str
device to use for generating data.
Expand Down Expand Up @@ -215,8 +205,6 @@ def generate_user_item_interactions(
data,
features,
session_id_col,
min_session_length=min_session_length,
max_session_length=max_session_length,
device=device,
)
processed_cols += [f.name for f in features] + [session_id_col.name]
Expand All @@ -235,8 +223,6 @@ def generate_user_item_interactions(
data,
features,
user_id_col,
min_session_length=min_session_length,
max_session_length=max_session_length,
device=device,
)
processed_cols += [f.name for f in features] + [user_id_col.name]
Expand All @@ -247,11 +233,12 @@ def generate_user_item_interactions(
raise ValueError("Item ID column is required")
item_id_col = item_schema.first

is_list_feature = item_id_col.is_list
is_list_feature = item_id_col.shape.is_list
if not is_list_feature:
shape = num_interactions
else:
shape = (num_interactions, max_session_length or min_session_length) # type: ignore
seq_length = item_id_col.shape.dims[1].max or item_id_col.shape.dims[1].min
shape = (num_interactions, seq_length) # type: ignore
tmp = _array.clip(
_array.random.lognormal(3.0, 1.0, shape).astype(_array.int32),
1,
Expand All @@ -262,14 +249,7 @@ def generate_user_item_interactions(
else:
data[item_id_col.name] = list(tmp)
features = list(schema.select_by_tag(Tags.ITEM).remove_by_tag(Tags.ITEM_ID))
data = generate_conditional_features(
data,
features,
item_id_col,
min_session_length=min_session_length,
max_session_length=max_session_length,
device=device,
)
data = generate_conditional_features(data, features, item_id_col, device=device)
processed_cols += [f.name for f in features] + [item_id_col.name]

# Get remaining features
Expand All @@ -284,9 +264,7 @@ def generate_user_item_interactions(
is_int_feature = feature.dtype and np.issubdtype(feature.dtype.to_numpy, np.integer)
is_list_feature = feature.is_list
if is_list_feature:
data[feature.name] = generate_random_list_feature(
feature, num_interactions, min_session_length, max_session_length, device
)
data[feature.name] = generate_random_list_feature(feature, num_interactions, device)

elif is_int_feature:
domain = feature.int_domain
Expand All @@ -311,8 +289,6 @@ def generate_conditional_features(
data,
features,
parent_feature,
min_session_length: int = 5,
max_session_length: Optional[int] = None,
device="cpu",
):
"""
Expand All @@ -331,9 +307,7 @@ def generate_conditional_features(
is_list_feature = feature.is_list

if is_list_feature:
data[feature.name] = generate_random_list_feature(
feature, num_interactions, min_session_length, max_session_length, device
)
data[feature.name] = generate_random_list_feature(feature, num_interactions, device)

elif is_int_feature:
if not feature.int_domain:
Expand Down Expand Up @@ -364,15 +338,16 @@ def generate_conditional_features(
def generate_random_list_feature(
feature,
num_interactions,
min_session_length: int = 5,
max_session_length: Optional[int] = None,
device="cpu",
):
if device == "cpu":
import numpy as _array
else:
import cupy as _array

seq_length_dim = feature.shape.dims[1]
min_session_length, max_session_length = seq_length_dim.min, seq_length_dim.max

is_int_feature = np.issubdtype(feature.dtype.to_numpy, np.integer)
if is_int_feature:
if max_session_length:
Expand Down
Loading

0 comments on commit a5e392c

Please sign in to comment.