diff --git a/merlin/loader/loader_base.py b/merlin/loader/loader_base.py
index f61c3e9c..1d829853 100644
--- a/merlin/loader/loader_base.py
+++ b/merlin/loader/loader_base.py
@@ -19,6 +19,7 @@
 import threading
 import warnings
 from collections import OrderedDict
+from typing import List
 
 import numpy as np
 
@@ -36,6 +37,8 @@
     make_df,
     pull_apart_list,
 )
+from merlin.dag import BaseOperator, ColumnSelector, DictArray, Graph, Node
+from merlin.dag.executors import LocalExecutor
 from merlin.io import shuffle_df
 from merlin.schema import Tags
 
@@ -59,6 +62,7 @@ def __init__(
         global_size=None,
         global_rank=None,
         drop_last=False,
+        transforms=None,
     ):
         self.dataset = dataset
         self.batch_size = batch_size
@@ -79,6 +83,7 @@ def __init__(
             )
             dataset.schema = dataset.infer_schema()
 
+        self.schema = dataset.schema
         self.sparse_names = []
         self.sparse_max = {}
         self.sparse_as_dense = set()
@@ -126,6 +131,32 @@ def __init__(
         self._batch_itr = None
         self._workers = None
 
+        if transforms is not None:
+            if isinstance(transforms, List):
+                carry_node = Node(ColumnSelector("*"))
+                for transform in transforms:
+                    # every transform must be a merlin.dag operator
+                    if not isinstance(transform, BaseOperator):
+                        raise TypeError(f"Detected invalid transform, {type(transform)}")
+                    carry_node = carry_node >> transform
+                transform_graph = Graph(carry_node)
+            elif isinstance(transforms, Graph):
+                transform_graph = transforms
+            else:
+                raise TypeError(
+                    f"transforms must be a list of operators or a Graph, got {type(transforms)}"
+                )
+            self.transforms = transform_graph.construct_schema(self.schema).output_node
+            self.schema = self.transforms.output_schema
+            # TODO: decide whether a single LocalExecutor held on the dataloader
+            # is sufficient, whether it should instead be built per batch, and
+            # whether there is a reason to expose it to (or let it be swapped
+            # out by) the user.
+            self.executor = LocalExecutor()
+        else:
+            # set both explicitly to None so the no-transform case is unambiguous
+            self.transforms = None
+            self.executor = None
 
     @property
     def _buff(self):
         if self.__buff is None:
@@ -551,8 +580,33 @@ def _handle_tensors(self, tensors):
         labels = None
         if len(self.label_names) > 0:
             labels = X.pop(self.label_names[0])
+
+        # With all of the tensors collected into a single dictionary, apply
+        # the transform graph against them before returning the batch.
+        #
+        # TODO: some of these entries are list columns represented as
+        # (values, offsets) tuples, so every operator currently has to know
+        # about that representation; an object that encapsulates both list
+        # and scalar tensor types (such as DictArray) would simplify the
+        # operator contract.
+        if self.transforms:
+            X = self.executor.transform(DictArray(X), [self.transforms])
+
         return X, labels
 
+    def _pack(self, gdf):
+        if isinstance(gdf, np.ndarray):
+            return gdf
+        elif hasattr(gdf, "to_dlpack") and callable(getattr(gdf, "to_dlpack")):
+            return gdf.to_dlpack()
+        elif hasattr(gdf, "to_numpy") and callable(getattr(gdf, "to_numpy")):
+            gdf = gdf.to_numpy()
+            if isinstance(gdf[0], list):
+                gdf = np.stack(gdf)
+            return gdf
+        return gdf.toDlpack()
+
 
 class ChunkQueue:
     """This class takes partitions (parts) from a merlin.io.Dataset
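For reviewers, here is a minimal usage sketch (not part of this diff) of the new `transforms` argument; the parquet path and the random embedding table are illustrative placeholders, and the pattern mirrors the tests added later in this PR:

```python
import numpy as np

from merlin.io import Dataset
from merlin.loader.tensorflow import Loader
from merlin.loader.ops.embeddings import Numpy_TFEmbeddingOperator

# placeholder inputs: a parquet file with an "id" column whose values are
# valid row positions into a 1000 x 16 embedding table
embeddings = np.random.rand(1000, 16)
dataset = Dataset("ids.parquet")

loader = Loader(
    dataset,
    batch_size=1024,
    transforms=[Numpy_TFEmbeddingOperator(embeddings)],  # list of BaseOperators
    shuffle=False,
)

for features, labels in loader:
    # the transform graph has added an "embeddings" entry to every batch
    print(features["embeddings"].shape)
    break
```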
diff --git a/merlin/loader/ops/__init__.py b/merlin/loader/ops/__init__.py
new file mode 100644
index 00000000..5d9909de
--- /dev/null
+++ b/merlin/loader/ops/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/merlin/loader/ops/embeddings/__init__.py b/merlin/loader/ops/embeddings/__init__.py
new file mode 100644
index 00000000..04b26169
--- /dev/null
+++ b/merlin/loader/ops/embeddings/__init__.py
@@ -0,0 +1,27 @@
+#
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# flake8: noqa
+from merlin.loader.ops.embeddings.tf_embedding_op import (
+    Numpy_Mmap_TFEmbedding,
+    Numpy_TFEmbeddingOperator,
+    TFEmbeddingOperator,
+)
+from merlin.loader.ops.embeddings.torch_embedding_op import (
+    Numpy_Mmap_TorchEmbedding,
+    Numpy_TorchEmbeddingOperator,
+    TorchEmbeddingOperator,
+)
diff --git a/merlin/loader/ops/embeddings/tf_embedding_op.py b/merlin/loader/ops/embeddings/tf_embedding_op.py
new file mode 100644
index 00000000..1537f414
--- /dev/null
+++ b/merlin/loader/ops/embeddings/tf_embedding_op.py
@@ -0,0 +1,248 @@
+import numpy as np
+import tensorflow as tf
+
+from merlin.core.protocols import Transformable
+from merlin.dag import BaseOperator
+from merlin.dag.selector import ColumnSelector
+from merlin.schema import ColumnSchema, Schema, Tags
+
+
+class TFEmbeddingOperator(BaseOperator):
+    """Create an operator that will apply a tf embedding table to supplied indices.
+    An id lookup table can be supplied, via ``id_lookup_table``, for cases where
+    the incoming indices are ids rather than row positions in the embedding
+    table. The embedding table is stored in host memory.
+
+    Parameters
+    ----------
+    embeddings : np.ndarray
+        numpy ndarray representing embedding values
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    id_lookup_table : np.array, optional
+        numpy array of values that represent embedding indices, by default None
+    """
+
+    def __init__(
+        self,
+        embeddings: np.ndarray,
+        lookup_key: str = "id",
+        embedding_name: str = "embeddings",
+        id_lookup_table=None,
+    ):
+        self.embeddings = (
+            embeddings if isinstance(embeddings, tf.Tensor) else tf.convert_to_tensor(embeddings)
+        )
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        self.id_lookup_table = id_lookup_table
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        indices = transformable[self.lookup_key]
+        if self.id_lookup_table is not None:
+            # np.in1d yields a boolean mask over the lookup table; convert it
+            # to integer row positions, since tf.nn.embedding_lookup expects ids
+            indices = np.nonzero(np.in1d(self.id_lookup_table, indices))[0]
+        embeddings = tf.nn.embedding_lookup(self.embeddings, indices)
+        transformable[self.embedding_name] = embeddings
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+        """
+        col_schemas = []
+        for _, col_schema in input_schema.column_schemas.items():
+            col_schemas.append(col_schema)
+        col_schemas.append(
+            ColumnSchema(
+                name=self.embedding_name,
+                tags=[Tags.CONTINUOUS],
+                dtype=self.embeddings.dtype.as_numpy_dtype,
+                is_list=True,
+                is_ragged=False,
+            )
+        )
+
+        return Schema(col_schemas)
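To make the `id_lookup_table` path above concrete: `np.in1d` yields a boolean mask over the lookup table, and `np.nonzero` turns that mask into the integer row positions that `tf.nn.embedding_lookup` expects. A small self-contained sketch (values are made up):

```python
import numpy as np

id_lookup_table = np.array([100, 200, 300, 400])  # row i holds the embedding for this id
batch_ids = np.array([200, 400])

mask = np.in1d(id_lookup_table, batch_ids)  # array([False, True, False, True])
positions = np.nonzero(mask)[0]             # array([1, 3]) -> usable as embedding rows
```

Note that this returns rows in table order and collapses duplicate ids, so the batch-order semantics of the lookup path may still need revisiting.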
+
+
+class Numpy_TFEmbeddingOperator(BaseOperator):
+    """Create an embedding table from supplied embeddings to add embedding entry
+    to records based on supplied indices. Support for an indices lookup table is
+    available. The embedding table is stored in host memory.
+
+    Parameters
+    ----------
+    embeddings : np.ndarray
+        numpy ndarray representing embedding values
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    id_lookup_table : np.array, optional
+        numpy array of values that represent embedding indices, by default None
+    """
+
+    def __init__(
+        self,
+        embeddings: np.ndarray,
+        lookup_key: str = "id",
+        embedding_name: str = "embeddings",
+        id_lookup_table=None,
+    ):
+        self.embeddings = embeddings
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        self.id_lookup_table = id_lookup_table
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        indices = transformable[self.lookup_key]
+        if self.id_lookup_table is not None:
+            # boolean masks are valid numpy indices, so the in1d mask can be
+            # used directly to select the matching embedding rows
+            indices = np.in1d(self.id_lookup_table, indices)
+        embeddings = self.embeddings[indices]
+        transformable[self.embedding_name] = tf.convert_to_tensor(embeddings)
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+        """
+        col_schemas = []
+        for _, col_schema in input_schema.column_schemas.items():
+            col_schemas.append(col_schema)
+        col_schemas.append(
+            ColumnSchema(
+                name=self.embedding_name,
+                tags=[Tags.CONTINUOUS],
+                dtype=self.embeddings.dtype,
+                is_list=True,
+                is_ragged=False,
+            )
+        )
+
+        return Schema(col_schemas)
+
+
+class Numpy_Mmap_TFEmbedding(BaseOperator):
+    """Operator that loads a numpy embedding table from file using a memory map,
+    to be used to create tensorflow embedding representations. This allows
+    embedding tables larger than host memory to be used for embedding lookups.
+    The only limit on table size is what fits in storage; an SSD is the
+    preferred storage device for faster lookups.
+
+    Parameters
+    ----------
+    embedding_npz : numpy ndarray file
+        file holding numpy ndarray representing embedding table
+    ids_lookup_npz : numpy array file, optional
+        file holding numpy array of values that represent embedding indices, by default None
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    transform_function : Callable, optional
+        function used to convert the numpy embeddings into framework tensors,
+        by default ``tf.convert_to_tensor``
+    """
+
+    def __init__(
+        self,
+        embedding_npz,
+        ids_lookup_npz=None,
+        lookup_key="id",
+        embedding_name="embeddings",
+        transform_function=None,
+    ):
+        self.embeddings = np.load(embedding_npz, mmap_mode="r")
+        self.id_lookup = np.load(ids_lookup_npz) if ids_lookup_npz else None
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        # fall back to tf.convert_to_tensor rather than discarding a
+        # user-supplied transform_function
+        self.transform_function = transform_function or tf.convert_to_tensor
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        ids_tensor = transformable[self.lookup_key]
+        if self.id_lookup is not None:
+            # the first column of the lookup array holds the ids; in1d yields
+            # a boolean mask over the rows of the embedding table
+            ids_tensor = np.in1d(self.id_lookup[:, 0], ids_tensor)
+        embeddings = self.embeddings[ids_tensor]
+        transformable[self.embedding_name] = self.transform_function(embeddings)
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+        """
+        col_schemas = []
+        for _, col_schema in input_schema.column_schemas.items():
+            col_schemas.append(col_schema)
+        col_schemas.append(
+            ColumnSchema(
+                name=self.embedding_name,
+                tags=[Tags.CONTINUOUS],
+                dtype=self.embeddings.dtype,
+                is_list=True,
+                is_ragged=False,
+            )
+        )
+
+        return Schema(col_schemas)
diff --git a/merlin/loader/ops/embeddings/torch_embedding_op.py b/merlin/loader/ops/embeddings/torch_embedding_op.py
new file mode 100644
index 00000000..699e86d3
--- /dev/null
+++ b/merlin/loader/ops/embeddings/torch_embedding_op.py
@@ -0,0 +1,248 @@
+import numpy as np
+import torch
+from torch.nn import Embedding
+
+from merlin.core.protocols import Transformable
+from merlin.dag import BaseOperator
+from merlin.dag.selector import ColumnSelector
+from merlin.schema import ColumnSchema, Schema, Tags
+
+
+class TorchEmbeddingOperator(BaseOperator):
+    """Create an operator that will apply a torch embedding table to supplied indices.
+    An id lookup table can be supplied, via ``id_lookup_table``, for cases where
+    the incoming indices are ids rather than row positions in the embedding
+    table.
+
+    Parameters
+    ----------
+    embeddings : np.ndarray
+        numpy ndarray representing embedding values
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    id_lookup_table : np.array, optional
+        numpy array of values that represent embedding indices, by default None
+    """
+
+    def __init__(
+        self,
+        embeddings: np.ndarray,
+        lookup_key: str = "id",
+        embedding_name: str = "embeddings",
+        id_lookup_table=None,
+    ):
+        self.embeddings = (
+            embeddings
+            if isinstance(embeddings, Embedding)
+            else Embedding.from_pretrained(torch.FloatTensor(embeddings))
+        )
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        self.id_lookup_table = id_lookup_table
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        indices = transformable[self.lookup_key]
+        device = indices.device
+        if self.id_lookup_table is not None:
+            # np.in1d yields a boolean mask over the lookup table; convert it
+            # to int64 row positions, since nn.Embedding requires long indices
+            indices = torch.from_numpy(
+                np.nonzero(np.in1d(self.id_lookup_table, indices.cpu().numpy()))[0]
+            )
+        embeddings = self.embeddings(indices.cpu())
+        transformable[self.embedding_name] = embeddings.to(device)
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+        """
+        col_schemas = []
+        for _, col_schema in input_schema.column_schemas.items():
+            col_schemas.append(col_schema)
+        col_schemas.append(
+            ColumnSchema(
+                name=self.embedding_name,
+                tags=[Tags.CONTINUOUS],
+                dtype=self.embeddings.weight.numpy().dtype,
+                is_list=True,
+                is_ragged=False,
+            )
+        )
+
+        return Schema(col_schemas)
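As a reminder of the constructor behavior relied on above, `Embedding.from_pretrained` wraps an existing weight matrix in a `torch.nn.Embedding` (frozen by default) that gathers rows for int64 indices; a tiny sketch:

```python
import torch
from torch.nn import Embedding

table = Embedding.from_pretrained(torch.rand(4, 3))  # frozen by default
rows = table(torch.LongTensor([0, 2]))               # gathers rows 0 and 2
print(rows.shape)                                    # torch.Size([2, 3])
```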
+
+
+class Numpy_TorchEmbeddingOperator(BaseOperator):
+    """Create an embedding table from supplied embeddings to add embedding entry
+    to records based on supplied indices. Support for an indices lookup table is
+    available. The embedding table is stored in host memory.
+
+    Parameters
+    ----------
+    embeddings : np.ndarray
+        numpy ndarray representing embedding values
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    id_lookup_table : np.array, optional
+        numpy array of values that represent embedding indices, by default None
+    """
+
+    def __init__(
+        self,
+        embeddings: np.ndarray,
+        lookup_key: str = "id",
+        embedding_name: str = "embeddings",
+        id_lookup_table=None,
+    ):
+        self.embeddings = embeddings
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        self.id_lookup_table = id_lookup_table
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        ids = transformable[self.lookup_key]
+        indices = ids.cpu().numpy()
+        if self.id_lookup_table is not None:
+            # restrict the incoming ids to those present in the lookup table
+            indices = indices[np.in1d(indices, self.id_lookup_table)]
+        # the first column of the embeddings array holds the ids, the
+        # remaining columns hold the embedding vectors
+        embeddings = self.embeddings[np.in1d(self.embeddings[:, 0], indices)]
+        transformable[self.embedding_name] = torch.from_numpy(embeddings[:, 1:]).to(ids.device)
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+        """
+        col_schemas = []
+        for _, col_schema in input_schema.column_schemas.items():
+            col_schemas.append(col_schema)
+        col_schemas.append(
+            ColumnSchema(
+                name=self.embedding_name,
+                tags=[Tags.CONTINUOUS],
+                dtype=self.embeddings.dtype,
+                is_list=True,
+                is_ragged=False,
+            )
+        )
+
+        return Schema(col_schemas)
+
+
+class Numpy_Mmap_TorchEmbedding(BaseOperator):
+    """Operator that loads a numpy embedding table from file using a memory map,
+    to be used to create torch embedding representations. This allows embedding
+    tables larger than host memory to be used for embedding lookups. The only
+    limit on table size is what fits in storage; an SSD is the preferred
+    storage device for faster lookups.
+
+    Parameters
+    ----------
+    embedding_npz : numpy ndarray file
+        file holding numpy ndarray representing embedding table
+    ids_lookup_npz : numpy array file, optional
+        file holding numpy array of values that represent embedding indices, by default None
+    lookup_key : str, optional
+        the name of the column that will be used as indices, by default "id"
+    embedding_name : str, optional
+        name of new column of embeddings, added to output, by default "embeddings"
+    transform_function : Callable, optional
+        function used to convert the numpy embeddings into torch tensors,
+        by default ``torch.from_numpy``
+    """
+
+    def __init__(
+        self,
+        embedding_npz,
+        ids_lookup_npz=None,
+        lookup_key="id",
+        embedding_name="embeddings",
+        transform_function=None,
+    ):
+        self.embeddings = np.load(embedding_npz, mmap_mode="r")
+        self.id_lookup = np.load(ids_lookup_npz) if ids_lookup_npz else None
+        self.lookup_key = lookup_key
+        self.embedding_name = embedding_name
+        self.transform_function = transform_function or torch.from_numpy
+
+    def transform(
+        self, col_selector: ColumnSelector, transformable: Transformable
+    ) -> Transformable:
+        ids = transformable[self.lookup_key]
+        indices = ids.cpu().numpy()
+        if self.id_lookup is not None:
+            # the first column of the lookup array holds the ids; in1d yields
+            # a boolean mask over the rows of the embedding table
+            indices = np.in1d(self.id_lookup[:, 0], indices)
+        embeddings = self.embeddings[indices]
+        transformable[self.embedding_name] = self.transform_function(embeddings).to(ids.device)
+        return transformable
+
+    def compute_output_schema(
+        self,
+        input_schema: Schema,
+        col_selector: ColumnSelector,
+        prev_output_schema: Schema = None,
+    ) -> Schema:
+        """Creates the output schema for this operator.
+
+        Parameters
+        ----------
+        input_schema : Schema
+            schema coming from ancestor nodes
+        col_selector : ColumnSelector
+            subselection of columns to apply to this operator
+        prev_output_schema : Schema, optional
+            the output schema of the previously executed operators, by default None
+
+        Returns
+        -------
+        Schema
+            Schema representing the correct output for this operator.
+ """ + col_schemas = [] + for _, col_schema in input_schema.column_schemas.items(): + col_schemas.append(col_schema) + col_schemas.append( + ColumnSchema( + name=self.embedding_name, + tags=[Tags.CONTINUOUS], + dtype=self.embeddings.dtype, + is_list=True, + is_ragged=False, + ) + ) + + return Schema(col_schemas) diff --git a/merlin/loader/tensorflow.py b/merlin/loader/tensorflow.py index 569c02a9..6c0dbc0d 100644 --- a/merlin/loader/tensorflow.py +++ b/merlin/loader/tensorflow.py @@ -16,8 +16,6 @@ import contextlib import logging -import numpy as np - from merlin.loader.loader_base import LoaderBase from merlin.loader.tf_utils import configure_tensorflow @@ -112,6 +110,7 @@ def __init__( global_size=None, global_rank=None, drop_last=False, + transforms=None, ): LoaderBase.__init__( self, @@ -123,6 +122,7 @@ def __init__( global_size=global_size, global_rank=global_rank, drop_last=drop_last, + transforms=transforms, ) self._map_fns = [] @@ -179,17 +179,13 @@ def _tensor_split(self, tensor, idx, axis=0): """ return tf.split(tensor, idx, axis=axis) - def _pack(self, gdf): - if isinstance(gdf, np.ndarray): - return gdf - elif hasattr(gdf, "to_dlpack") and callable(getattr(gdf, "to_dlpack")): - return gdf.to_dlpack() - elif hasattr(gdf, "to_numpy") and callable(getattr(gdf, "to_numpy")): - gdf = gdf.to_numpy() - if isinstance(gdf[0], list): - gdf = np.stack(gdf) - return gdf - return gdf.toDlpack() + @property + def _LONG_DTYPE(self): + return tf.int64 + + @property + def _FLOAT32_DTYPE(self): + return tf.float32 def _unpack(self, gdf): if hasattr(gdf, "shape"): diff --git a/merlin/loader/torch.py b/merlin/loader/torch.py index 777d9c97..304e6f77 100644 --- a/merlin/loader/torch.py +++ b/merlin/loader/torch.py @@ -29,7 +29,7 @@ np.int64: torch.int64, np.float16: torch.float16, np.float32: torch.float32, - np.float64: torch.float64, + np.float64: torch.double, np.complex64: torch.complex64, np.complex128: torch.complex128, } @@ -77,6 +77,7 @@ def __init__( global_size=None, global_rank=None, drop_last=False, + transforms=None, ): LoaderBase.__init__( self, @@ -88,6 +89,7 @@ def __init__( global_size=global_size, global_rank=global_rank, drop_last=drop_last, + transforms=transforms, ) def __iter__(self): @@ -98,11 +100,6 @@ def _get_device_ctx(self, dev): return torch.device("cpu") return torch.cuda.device("cuda:{}".format(dev)) - def _pack(self, gdf): - if self.device == "cpu": - return gdf - return gdf.to_dlpack() - def _unpack(self, dlpack): if self.device == "cpu": values = dlpack.values diff --git a/merlin/loader/utils/embeddings.py b/merlin/loader/utils/embeddings.py new file mode 100644 index 00000000..5529f39d --- /dev/null +++ b/merlin/loader/utils/embeddings.py @@ -0,0 +1,27 @@ +import gc + +import numpy as np +from npy_append_array import NpyAppendArray + +from merlin.core import dispatch + + +def build_embeddings_from_pq( + df_paths, embedding_filename="embeddings.npy", lookup_filename="lookup_ids" +): + df_lib = dispatch.get_lib() + with NpyAppendArray(embedding_filename) as nf: + with NpyAppendArray(lookup_filename) as lf: + for path in df_paths: + rows = df_lib.read_parquet(path) + numpy_rows = rows.to_numpy() + indices = np.ascontiguousarray(numpy_rows[:, 0]) + vectors = np.ascontiguousarray(numpy_rows[:, 1:]) + lf.append(indices) + nf.append(vectors) + del rows + del numpy_rows + del indices + del vectors + gc.collect() + return embedding_filename, lookup_filename diff --git a/tests/conftest.py b/tests/conftest.py index f5e3bf5f..d566594d 100644 --- a/tests/conftest.py +++ 
diff --git a/tests/conftest.py b/tests/conftest.py
index f5e3bf5f..d566594d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import glob
 import random
 
 import dask
@@ -44,8 +45,9 @@ def assert_eq(a, b, *args, **kwargs):
 
 import pytest
 
-from merlin.core.dispatch import make_df
+from merlin.core.dispatch import concat_columns, make_df
 from merlin.io import Dataset
+from merlin.loader.utils.embeddings import build_embeddings_from_pq
 from merlin.schema import Tags
 
@@ -159,3 +161,62 @@ def multihot_dataset(multihot_data):
     ds = Dataset(make_df(multihot_data))
     ds.schema["Post"] = ds.schema["Post"].with_tags(Tags.TARGET)
     return ds
+
+
+@pytest.fixture(scope="session")
+def num_embedding_ids():
+    return 1
+
+
+@pytest.fixture(scope="session")
+def embeddings_part_size():
+    return 1e5
+
+
+@pytest.fixture(scope="session")
+def embedding_ids(num_embedding_ids, embeddings_part_size):
+    df = make_df({"id": np.arange(num_embedding_ids * embeddings_part_size).astype("int32")})
+    return df
+
+
+@pytest.fixture(scope="session")
+def rev_embedding_ids(embedding_ids, tmpdir_factory):
+    df_rev = embedding_ids[::-1]
+    df_rev.reset_index(inplace=True, drop=True)
+    return df_rev
+
+
+@pytest.fixture(scope="session")
+def embeddings_from_dataframe(embedding_ids, num_embedding_ids, tmpdir_factory):
+    embed_dir = tmpdir_factory.mktemp("embeds")
+    for idx, splt in enumerate(np.array_split(embedding_ids.to_numpy(), num_embedding_ids)):
+        vals = make_df(np.random.rand(splt.shape[0], 1024))
+        ids = make_df({"id": np.squeeze(splt)})
+        full = concat_columns([ids, vals])
+        full.columns = [str(col) for col in full.columns]
+        full.to_parquet(f"{embed_dir}/{idx}.parquet")
+    return embed_dir
+
+
+@pytest.fixture(scope="session")
+def rev_embeddings_from_dataframe(rev_embedding_ids, num_embedding_ids, tmpdir_factory):
+    embed_dir = tmpdir_factory.mktemp("rev_embeds")
+    for idx, splt in enumerate(np.array_split(rev_embedding_ids.to_numpy(), num_embedding_ids)):
+        vals = make_df(np.random.rand(splt.shape[0], 1024))
+        ids = make_df({"id": np.squeeze(splt)})
+        full = concat_columns([ids, vals])
+        full.columns = [str(col) for col in full.columns]
+        full.to_parquet(f"{embed_dir}/{idx}.parquet")
+    return embed_dir
+
+
+@pytest.fixture(scope="session")
+def np_embeddings_from_pq(embeddings_from_dataframe, tmpdir_factory):
+    paths = sorted(glob.glob(f"{embeddings_from_dataframe}/*"))
+    embed_dir = tmpdir_factory.mktemp("np_embeds")
+    embeddings_file = f"{embed_dir}/embeddings.npy"
+    lookup_ids_file = f"{embed_dir}/ids_lookup.npy"
+    npy_filename, lookup_filename = build_embeddings_from_pq(
+        paths, embeddings_file, lookup_ids_file
+    )
+    return npy_filename, lookup_filename
diff --git a/tests/unit/loader/test_tf_embeddings.py b/tests/unit/loader/test_tf_embeddings.py
new file mode 100644
index 00000000..e1092b15
--- /dev/null
+++ b/tests/unit/loader/test_tf_embeddings.py
@@ -0,0 +1,120 @@
+#
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import glob
+
+import pytest
+import rmm
+
+from merlin.io import Dataset
+from merlin.loader.tensorflow import Loader
+from merlin.schema import Tags
+
+tf = pytest.importorskip("tensorflow")
+
+from merlin.loader.ops.embeddings import (  # noqa
+    Numpy_Mmap_TFEmbedding,
+    Numpy_TFEmbeddingOperator,
+    TFEmbeddingOperator,
+)
+
+
+def test_embedding_tf_np_mmap_dl(tmpdir, embedding_ids, np_embeddings_from_pq):
+    embeddings_file, _ = np_embeddings_from_pq
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
+    data_loader = Loader(
+        dataset,
+        batch_size=100000,
+        transforms=[Numpy_Mmap_TFEmbedding(embeddings_file)],
+        shuffle=False,
+    )
+    full_len = 0
+    for batch in data_loader:
+        assert "embeddings" in batch[0]
+        assert batch[0]["embeddings"][0].shape[1] == 1024
+        assert "id" in batch[0]
+        full_len += batch[0]["id"].shape[0]
+    assert full_len == embedding_ids.shape[0]
+
+
+def test_embedding_tf_np_dl(tmpdir, embedding_ids, embeddings_from_dataframe):
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
+    paths = sorted(glob.glob(f"{embeddings_from_dataframe}/*"))
+    embeddings_ds = Dataset(paths)
+    embeddings_np = embeddings_ds.to_ddf().compute().to_numpy()[:, 1:]
+    data_loader = Loader(
+        dataset,
+        batch_size=100000,
+        transforms=[Numpy_TFEmbeddingOperator(embeddings_np)],
+        shuffle=False,
+    )
+    full_len = 0
+    for batch in data_loader:
+        assert "embeddings" in batch[0]
+        assert batch[0]["embeddings"].shape[-1] == 1024
+        assert "id" in batch[0]
+        full_len += batch[0]["id"].shape[0]
+    assert full_len == embedding_ids.shape[0]
+
+
+def test_embedding_tf_dl(tmpdir, embedding_ids, embeddings_from_dataframe):
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
+    paths = sorted(glob.glob(f"{embeddings_from_dataframe}/*"))
+    embeddings_ds = Dataset(paths)
+    np_tensor = embeddings_ds.to_ddf().compute().to_numpy()
+    tf_tensor = tf.convert_to_tensor(np_tensor[:, 1:])
+    data_loader = Loader(
+        dataset,
+        batch_size=100000,
+        transforms=[TFEmbeddingOperator(tf_tensor)],
+        shuffle=False,
+    )
+    full_len = 0
+    for batch in data_loader:
+        assert "embeddings" in batch[0]
+        assert batch[0]["embeddings"][0].shape[1] == 1024
+        assert "id" in batch[0]
+        full_len += batch[0]["id"].shape[0]
+    assert full_len == embedding_ids.shape[0]
diff --git a/tests/unit/loader/test_torch_embeddings.py b/tests/unit/loader/test_torch_embeddings.py
new file mode 100644
index 00000000..e6826628
--- /dev/null
+++ b/tests/unit/loader/test_torch_embeddings.py
@@ -0,0 +1,120 @@
+#
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import glob
+
+import pytest
+import rmm
+
+from merlin.io import Dataset
+from merlin.loader.torch import Loader
+from merlin.schema import Tags
+
+torch = pytest.importorskip("torch")
+
+from merlin.loader.ops.embeddings import (  # noqa
+    Numpy_Mmap_TorchEmbedding,
+    Numpy_TorchEmbeddingOperator,
+    TorchEmbeddingOperator,
+)
+
+
+def test_embedding_torch_np_mmap_dl(tmpdir, embedding_ids, np_embeddings_from_pq):
+    embeddings_file, _ = np_embeddings_from_pq
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
+
+    data_loader = Loader(
+        dataset,
+        batch_size=100000,
+        transforms=[Numpy_Mmap_TorchEmbedding(embeddings_file)],
+        shuffle=False,
+        # device="cpu",
+    )
+    full_len = 0
+    for batch in data_loader:
+        assert "embeddings" in batch[0]
+        assert "id" in batch[0]
+        full_len += batch[0]["embeddings"].shape[0]
+    assert full_len == embedding_ids.shape[0]
+
+
+def test_embedding_torch_np_dl(tmpdir, embedding_ids, embeddings_from_dataframe):
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
+    paths = sorted(glob.glob(f"{embeddings_from_dataframe}/*"))
+    embeddings_ds = Dataset(paths)
+    embeddings_df = embeddings_ds.to_ddf().compute().to_numpy()
+    data_loader = Loader(
+        dataset,
+        batch_size=100000,
+        transforms=[Numpy_TorchEmbeddingOperator(embeddings_df)],
+        shuffle=False,
+        # device="cpu",
+    )
+    full_len = 0
+    for batch in data_loader:
+        assert "embeddings" in batch[0]
+        assert "id" in batch[0]
+        full_len += batch[0]["embeddings"].shape[0]
+    assert full_len == embedding_ids.shape[0]
+
+
+def test_embedding_torch_dl(tmpdir, embedding_ids, embeddings_from_dataframe):
+    rmm.reinitialize(managed_memory=True)
+    cat_names = ["id"]
+
+    pq_path = tmpdir / "id.parquet"
+    embedding_ids.to_parquet(pq_path, row_group_size_bytes=134217728)
+    dataset = Dataset(str(pq_path))
+    dataset = dataset.repartition(10)
+    schema = dataset.schema
+    for col_name in cat_names:
+        schema[col_name] = schema[col_name].with_tags(Tags.CATEGORICAL)
+    dataset.schema = schema
sorted(glob.glob(f"{embeddings_from_dataframe}/*")) + embeddings_ds = Dataset(paths) + np_tensor = embeddings_ds.to_ddf().compute().to_numpy().astype("float32") + torch_tensor = torch.from_numpy(np_tensor) + data_loader = Loader( + dataset, + batch_size=100000, + transforms=[TorchEmbeddingOperator(torch_tensor)], + shuffle=False, + ) + full_len = 0 + for batch in data_loader: + assert "embeddings" in batch[0] + assert "id" in batch[0] + full_len += batch[0]["embeddings"].shape[0] + assert full_len == embedding_ids.shape[0]