diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md
index 78ee2a78eb..e17341c40d 100644
--- a/docs/source/developer_guide/guides/1_simple_python_stage.md
+++ b/docs/source/developer_guide/guides/1_simple_python_stage.md
@@ -25,13 +25,62 @@ Morpheus makes use of the MRC graph-execution framework. Morpheus pipelines are
 
 ## The Pass Through Stage
 
-To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.
+To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed. Morpheus provides two ways of defining a stage: as a stand-alone function or as a class.
 
-Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.
+### Stand-alone Function
+
+The stand-alone function approach is the simplest way to define a stage. The function should accept a single argument, which will be the input message, and return a single value, which will be the output message. The function should be decorated with the `morpheus.pipeline.stage_decorator.stage` decorator.
+
+```python
+import typing
+
+from morpheus.pipeline.stage_decorator import stage
+
+
+@stage
+def pass_thru_stage(message: typing.Any) -> typing.Any:
+    # Return the message for the next stage
+    return message
+```
+
+When using the `stage` decorator, type annotations are required for the function parameters and return type; the stage uses these as its accepted input and emitted output types. In the above example, the `stage` decorator will use the name of the function as the name of the stage. If we wanted to specify a different name for the stage, we could do so by passing the name to the decorator as follows:
+
+```python
+@stage(name="pass-thru")
+def pass_thru_stage(message: typing.Any) -> typing.Any:
+    # Return the message for the next stage
+    return message
+```
+
+We can then add our stage to a pipeline as follows:
+```python
+config = Config()
+pipeline = LinearPipeline(config)
+# ... rest of pipeline setup
+pipeline.add_stage(pass_thru_stage(config))
+```
+
+It is possible to provide additional keyword arguments to the function. Consider the following example:
+```python
+@stage
+def multiplier(message: MessageMeta, *, column: str, value: int | float = 2.0) -> MessageMeta:
+    with message.mutable_dataframe() as df:
+        df[column] = df[column] * value
+
+    return message
+
+# ...
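+# Keyword arguments such as `column` and `value` are never supplied by the
+# pipeline itself; they are bound once, when the stage is constructed below.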
+
+pipe.add_stage(multiplier(config, column='probs', value=5))
+```
+
+### Stage Class
+
+The class-based approach to defining a stage offers a bit more flexibility, specifically the ability to validate constructor arguments and perform any needed setup prior to being invoked in a pipeline. Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.
 
 Stages in Morpheus define what types of data they accept, and the type of data that they emit. In this example we are emitting messages of the same type that is received, this is actually quite common and Morpheus provides a mixin class, `PassThruTypeMixin`, to simplify this.
 
-Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line.
+Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line.
 
 We start our class definition with a few basic imports:
 
@@ -155,17 +204,19 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage):
 ```
 
 ## Testing our new Stage
 
-To start testing our new pass through stage, we are going to construct a simple pipeline and add our new stage to it. This pipeline will do the minimum work necessary to verify our pass through stage. Data will flow through our simple pipeline as follows:
+To start testing both our new function-based and class-based stages, we are going to construct a simple pipeline and add both stages to it. This pipeline will do the minimum work necessary to verify our pass through stages. Data will flow through our simple pipeline as follows:
 
 1. A source stage will produce data and emit it into the pipeline.
 1. This data will be read and processed by our pass through stage, in this case simply forwarding on the data.
 1. A monitoring stage will record the messages from our pass through stage and terminate the pipeline.
 
-First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the `PassThruStage`` in a file named "pass_thru.py" in the same directory.
+First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the class-based `PassThruStage` in a file named "pass_thru.py" and the function-based `pass_thru_stage` in a file named "pass_thru_deco.py", both in the same directory.
+
 ```python
 import logging
 import os
 
 from pass_thru import PassThruStage
+from pass_thru_deco import pass_thru_stage
 
 from morpheus.config import Config
 from morpheus.pipeline import LinearPipeline
@@ -174,6 +225,7 @@ from morpheus.stages.input.file_source_stage import FileSourceStage
 from morpheus.utils.logger import configure_logging
 ```
 
+
 Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:
 ```python
 configure_logging(log_level=logging.DEBUG)
@@ -189,10 +241,19 @@ In this example, we will use the `FileSourceStage` class to read a large file in
 pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
 ```
 
-Next, we will add our new stage to the pipeline as well as a `MonitorStage` which will measure the throughput of our pass through stage:
+Next, we will add both versions of our new stage to the pipeline as well as a `MonitorStage` instance for each to measure the throughput of our stages:
 
 ```python
+# Add the decorated function stage
+pipeline.add_stage(pass_thru_stage(config))
+
+# Add monitor to record the performance of the function based stage
+pipeline.add_stage(MonitorStage(config))
+
+# Add the class based stage
 pipeline.add_stage(PassThruStage(config))
+
+# Add monitor to record the performance of the class based stage
 pipeline.add_stage(MonitorStage(config))
 ```
 
@@ -203,20 +264,30 @@ pipeline.run()
 ```
 
 The output should display:
 ```
-====Registering Pipeline====
-====Registering Pipeline Complete!====
-====Starting Pipeline====
-====Building Pipeline====
-Added source: <from-file-0; FileSourceStage(...)>
+====Pipeline Pre-build====
+====Pre-Building Segment: linear_segment_0====
+====Pre-Building Segment Complete!====
+====Pipeline Pre-build Complete!====
+====Registering Pipeline====
+====Building Pipeline====
+====Building Pipeline Complete!====
+====Registering Pipeline Complete!====
+====Starting Pipeline====
+====Pipeline Started====
+====Building Segment: linear_segment_0====
+Added source: <from-file-0; FileSourceStage(...)>
 └─> morpheus.MessageMeta
-Added stage: <pass-thru-1; PassThruStage(...)>
+Added stage: <pass_thru_stage-1; WrappedFunctionStage(on_data_fn=<function pass_thru_stage at 0x...>, on_data_args=(), accept_type=None, return_type=None, needed_columns=None, on_data_kwargs={})>
+ └─ morpheus.MessageMeta -> morpheus.MessageMeta
+Added stage: <monitor-2; MonitorStage(...)>
 └─ morpheus.MessageMeta -> morpheus.MessageMeta
-Added stage: <monitor-2; MonitorStage(...)>
+Added stage: <pass-thru-3; PassThruStage(...)>
 └─ morpheus.MessageMeta -> morpheus.MessageMeta
-====Building Pipeline Complete!====
-Starting! Time: 1648834587.3092508
-====Pipeline Started====
-Progress[Complete]: 25229messages [00:00, 57695.02messages/s]
+Added stage: <monitor-4; MonitorStage(...)>
+ └─ morpheus.MessageMeta -> morpheus.MessageMeta
+====Building Segment Complete!====
+Progress[Complete]: 100 messages [00:01, 69.97 messages/s]
+Progress[Complete]: 100 messages [00:01, 69.76 messages/s]
 ====Pipeline Complete====
 ```
 
@@ -226,14 +297,15 @@ Note that this code assumes the `MORPHEUS_ROOT` environment variable is set to t
 import logging
 import os
 
+from pass_thru import PassThruStage
+from pass_thru_deco import pass_thru_stage
+
 from morpheus.config import Config
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
 from morpheus.stages.input.file_source_stage import FileSourceStage
 from morpheus.utils.logger import configure_logging
 
-from pass_thru import PassThruStage
-
 
 def run_pipeline():
     # Enable the Morpheus logger
@@ -250,22 +322,31 @@ def run_pipeline():
 
     # Set source stage
     pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
 
-    # Add our own stage
+    # Add the decorated function stage
+    pipeline.add_stage(pass_thru_stage(config))
+
+    # Add monitor to record the performance of the function based stage
+    pipeline.add_stage(MonitorStage(config))
+
+    # Add the class based stage
     pipeline.add_stage(PassThruStage(config))
 
-    # Add monitor to record the performance of our new stage
+    # Add monitor to record the performance of the class based stage
     pipeline.add_stage(MonitorStage(config))
 
     # Run the pipeline
     pipeline.run()
 
+
 if __name__ == "__main__":
     run_pipeline()
 ```
 
 ### Alternate Morpheus CLI example
 
-The above example makes use of the Morpheus Python API. Alternately, we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.
+The above example makes use of the Morpheus Python API. Alternatively, we could test the class-based stage in a pipeline constructed using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.
+
+> **Note**: Registering a stage with the CLI tool is currently only available to class-based stages.
 
 From the root of the Morpheus repo, run:
 ```bash
diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md
index f9e07cedf9..ba16f0b796 100644
--- a/docs/source/developer_guide/guides/2_real_world_phishing.md
+++ b/docs/source/developer_guide/guides/2_real_world_phishing.md
@@ -56,25 +56,6 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
     return message
 ```
 
-In the above example we added five new fields to the DataFrame. Since these fields and their types are known to us ahead of time, as an optimization we can ask Morpheus to pre-allocate these new fields when the DataFrame is first constructed. To do this we populate the `_needed_columns` attribute in our constructor:
-```python
-def __init__(self, config: Config):
-    super().__init__(config)
-
-    # This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
-    # ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
-    # stage defining this attribute.
-    self._needed_columns.update({
-        'to_count': TypeId.INT32,
-        'bcc_count': TypeId.INT32,
-        'cc_count': TypeId.INT32,
-        'total_recipients': TypeId.INT32,
-        'data': TypeId.STRING
-    })
-```
-
-Refer to the [Stage Constructors](#stage-constructors) section for more details.
-
 If instead mutating the DataFrame in place is undesirable, we could make a copy of the DataFrame with the `MessageMeta.copy_dataframe` method and return a new `MessageMeta`. Note, however, that this would come at the cost of performance and increased memory usage. We could do this by changing the `on_data` method to:
 ```python
 def on_data(self, message: MessageMeta) -> MessageMeta:
@@ -95,6 +76,29 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
 
     return MessageMeta(df)
 ```
 
+In the above example we added five new fields to the DataFrame. Since these fields and their types are known to us ahead of time, as an optimization we can ask Morpheus to pre-allocate these new fields when the DataFrame is first constructed. To do this we populate the `_needed_columns` attribute in our constructor:
+```python
+from morpheus.common import TypeId
+
+# ...
+
+def __init__(self, config: Config):
+    super().__init__(config)
+
+    # This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
+    # ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
+    # stage defining this attribute.
+    self._needed_columns.update({
+        'to_count': TypeId.INT32,
+        'bcc_count': TypeId.INT32,
+        'cc_count': TypeId.INT32,
+        'total_recipients': TypeId.INT32,
+        'data': TypeId.STRING
+    })
+```
+
+Refer to the [Stage Constructors](#stage-constructors) section for more details.
+
 Since the purpose of this stage is specifically tied to pre-processing text data for an NLP pipeline, when we register the stage, we will explicitly limit the stage to NLP pipelines:
 ```python
 @register_stage("recipient-features", modes=[PipelineModes.NLP])
@@ -176,6 +180,45 @@ class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):
         return node
 ```
 
+### Stand-alone Function
+
+For this example, we started with the class-based approach. However, we could have just as easily written this as a stand-alone function. The following example is equivalent to the class-based example above:
+
+```python
+from morpheus.common import TypeId
+from morpheus.messages import MessageMeta
+from morpheus.pipeline.stage_decorator import stage
+
+
+@stage(
+    needed_columns={
+        'to_count': TypeId.INT32,
+        'bcc_count': TypeId.INT32,
+        'cc_count': TypeId.INT32,
+        'total_recipients': TypeId.INT32,
+        'data': TypeId.STRING
+    })
+def recipient_features_stage(message: MessageMeta, *, sep_token: str = '[SEP]') -> MessageMeta:
+    # Open the DataFrame from the incoming message for in-place modification
+    with message.mutable_dataframe() as df:
+        df['to_count'] = df['To'].str.count('@')
+        df['bcc_count'] = df['BCC'].str.count('@')
+        df['cc_count'] = df['CC'].str.count('@')
+        df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']
+
+        # Attach features to string data
+        df['data'] = (df['to_count'].astype(str) + sep_token + df['bcc_count'].astype(str) + sep_token +
+                      df['cc_count'].astype(str) + sep_token + df['total_recipients'].astype(str) + sep_token +
+                      df['Message'])
+
+    # Return the message for the next stage
+    return message
+```
+
+In the above example, the `needed_columns` were provided as an argument to the `stage` decorator, and the optional `sep_token` argument is exposed as a keyword argument.
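+
+As with the earlier pass-through example, any keyword arguments are bound when the stage is constructed. For instance, a pipeline could override the separator token as follows (a hypothetical snippet; the surrounding pipeline setup is shown later in this guide):
+
+```python
+# The config is consumed by the decorator itself; sep_token is bound to the
+# wrapped function via functools.partial.
+pipeline.add_stage(recipient_features_stage(config, sep_token=' | '))
+```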
+
+> **Note**: One drawback of the `stage` decorator approach is that we lose the ability to determine the `needed_columns` at runtime based upon constructor arguments.
+
 ## Predicting Fraudulent Emails with Accelerated Machine Learning
 
 Now we'll use the `RecipientFeaturesStage` that we just made in a real-world pipeline to detect fraudulent emails. The pipeline we will be building makes use of the `TritonInferenceStage` which is a pre-defined Morpheus stage designed to support the execution of Natural Language Processing (NLP) models via NVIDIA's [Triton Inference Server](https://developer.nvidia.com/nvidia-triton-inference-server). NVIDIA Triton Inference Server allows for GPU accelerated ML/DL and seamless co-location and execution of a wide variety of model frameworks. For our application, we will be using the `phishing-bert-onnx` model, which is included with Morpheus in the `models/triton-model-repo/` directory.
 
@@ -336,29 +379,9 @@ Output:
 
 From this information, we note that the expected dimensions of the model inputs is `"dims": [128]`.
 
 ### Defining our Pipeline
-Let's set up the paths for our input and output files. For simplicity, we assume that the `MORPHEUS_ROOT` environment variable is set to the root of the Morpheus project repository. In a production deployment, it may be more prudent to replace our usage of environment variables with command-line flags or a dedicated configuration management library.
+This pipeline has several configuration parameters, such as the paths to the input and output files. We will use the [click](https://click.palletsprojects.com/) library to expose and parse these parameters as command-line arguments. We will also expose the choice between the class-based and function-based stage implementations via the `--use_stage_function` command-line flag.
 
-```python
-import os
-
-import morpheus
-from morpheus.utils.logger import configure_logging
-
-
-def run_pipeline():
-    # Enable the default logger
-    configure_logging(log_level=logging.INFO)
-
-    triton_url = os.environ.get('TRITON_URL', 'localhost:8001')
-    root_dir = os.environ['MORPHEUS_ROOT']
-    out_dir = os.environ.get('OUT_DIR', '/tmp')
-
-    labels_file = os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt')
-    vocab_file = os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt')
-
-    input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines')
-    results_file = os.path.join(out_dir, 'detections.jsonlines')
-```
+> **Note**: For simplicity, we assume that the `MORPHEUS_ROOT` environment variable is set to the root of the Morpheus project repository.
 
 To start, we will need to instantiate and set a few attributes of the `Config` class. This object is used for configuration options that are global to the pipeline as a whole. We will provide this object to each stage along with stage-specific configuration parameters.
 
@@ -367,21 +390,31 @@ config = Config()
 config.mode = PipelineModes.NLP
 
 config.num_threads = os.cpu_count()
-config.feature_length = 128
+config.feature_length = model_fea_length
 
 with open(labels_file, encoding='UTF-8') as fh:
     config.class_labels = [x.strip() for x in fh]
 ```
 
-First we set our pipeline mode to NLP. Next, we set the `num_threads` property to match the number of cores in our system.
+First, we set our pipeline mode to `NLP`. Next, we set the `num_threads` property to match the number of cores in our system. 
The `feature_length` property needs to match the dimensions of the model inputs, which we got from Triton in the previous section using the model's `/config` endpoint.
 
 Ground truth classification labels are read from the `morpheus/data/labels_phishing.txt` file included in Morpheus.
 
-Now that our config object is populated, we move on to the pipeline itself. We will be using the same input file from the previous example, and to tokenize the input data we will use Morpheus' `PreprocessNLPStage`.
+Now that our config object is populated, we move on to the pipeline itself. We will be using the same input file from the previous example.
+
+Next, we will add our custom recipient features stage to the pipeline. We imported both implementations of the stage, allowing us to add the appropriate one based on the `use_stage_function` value provided on the command line.
 
-This stage uses the [cudf subword tokenizer](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.subword_tokenizer.SubwordTokenizer.__call__.html) to transform strings into a tensor of numbers to be fed into the neural network model. Rather than split the string by characters or whitespaces, we split them into meaningful subwords based upon the occurrence of the subwords in a large training corpus. You can find more details here: [https://arxiv.org/abs/1810.04805v2](https://arxiv.org/abs/1810.04805v2). All we need to know for now is that the text will be converted to subword token ids based on the vocabulary file that we provide (`vocab_hash_file=vocab file`).
+```python
+# Add our custom stage
+if use_stage_function:
+    pipeline.add_stage(recipient_features_stage(config))
+else:
+    pipeline.add_stage(RecipientFeaturesStage(config))
+```
+
+To tokenize the input data, we will use Morpheus' `PreprocessNLPStage`. This stage uses the [cudf subword tokenizer](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.subword_tokenizer.SubwordTokenizer.__call__.html) to transform strings into a tensor of numbers to be fed into the neural network model. Rather than split the string by characters or whitespaces, we split them into meaningful subwords based upon the occurrence of the subwords in a large training corpus. You can find more details here: [https://arxiv.org/abs/1810.04805v2](https://arxiv.org/abs/1810.04805v2). All we need to know for now is that the text will be converted to subword token ids based on the vocabulary file that we provide (`vocab_hash_file=vocab file`).
 
 Let's go ahead and instantiate our `PreprocessNLPStage` and add it to the pipeline:
 
@@ -408,12 +441,12 @@ At this point, we have a pipeline that reads in a set of records and pre-process
 
 Next we will add a monitor stage to measure the inference rate:
 
 ```python
-# Add a inference stage
+# Add an inference stage
 pipeline.add_stage(
     TritonInferenceStage(
         config,
-        model_name='phishing-bert-onnx',
-        server_url=triton_url,
+        model_name=model_name,
+        server_url=server_url,
         force_convert_inputs=True,
     ))
 
@@ -435,10 +468,10 @@ The `WriteToFileStage` will append message data to the output file as messages a
 
 ```python
 # Write the file to the output
 pipeline.add_stage(SerializeStage(config))
-pipeline.add_stage(WriteToFileStage(config, filename=results_file, overwrite=True))
+pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
 ```
 
-Note that we didn't specify the output format. In our example, the result file contains the extension `.jsonlines`. Morpheus will infer the output format based on the extension. 
At time of writing the extensions that Morpheus will infer are: `.csv`, `.json` & `.jsonlines`. +Note that we didn't specify the output format. In our example, the default output file name contains the extension `.jsonlines`. Morpheus will infer the output format based on the extension. At time of writing the extensions that Morpheus will infer are: `.csv`, `.json` & `.jsonlines`. To explicitly set the output format we could specify the `file_type` argument to the `WriteToFileStage` which is an enumeration defined in `morpheus.common.FileTypes`. Supported values are: * `FileTypes.Auto` @@ -450,8 +483,9 @@ To explicitly set the output format we could specify the `file_type` argument to ```python import logging import os +import tempfile -from recipient_features_stage import RecipientFeaturesStage +import click import morpheus from morpheus.common import FilterSource @@ -467,30 +501,71 @@ from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage from morpheus.utils.logger import configure_logging - - -def run_pipeline(): +from recipient_features_stage import RecipientFeaturesStage +from recipient_features_stage_deco import recipient_features_stage + +MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] + + +@click.command() +@click.option("--use_stage_function", + is_flag=True, + default=False, + help="Use the function based version of the recipient features stage instead of the class") +@click.option( + "--labels_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt'), + help="Specifies a file to read labels from in order to convert class IDs into labels.", +) +@click.option( + "--vocab_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt'), + help="Path to hash file containing vocabulary of words with token-ids.", +) +@click.option( + "--input_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(MORPHEUS_ROOT, 'examples/data/email_with_addresses.jsonlines'), + help="Input filepath.", +) +@click.option( + "--model_fea_length", + default=128, + type=click.IntRange(min=1), + help="Features length to use for the model.", +) +@click.option( + "--model_name", + default="phishing-bert-onnx", + help="The name of the model that is deployed on Tritonserver.", +) +@click.option("--server_url", default='localhost:8001', help="Tritonserver url.") +@click.option( + "--output_file", + default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"), + help="The path to the file where the inference output will be saved.", +) +def run_pipeline(use_stage_function: bool, + labels_file: str, + vocab_file: str, + input_file: str, + model_fea_length: int, + model_name: str, + server_url: str, + output_file: str): """Run the phishing detection pipeline.""" # Enable the default logger configure_logging(log_level=logging.INFO) - triton_url = os.environ.get('TRITON_URL', 'localhost:8001') - root_dir = os.environ['MORPHEUS_ROOT'] - out_dir = os.environ.get('OUT_DIR', '/tmp') - - labels_file = os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt') - vocab_file = os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt') - - input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines') - results_file = os.path.join(out_dir, 'detections.jsonlines') - # It's necessary to configure 
the pipeline for NLP mode config = Config() config.mode = PipelineModes.NLP # Set the thread count to match our cpu count config.num_threads = os.cpu_count() - config.feature_length = 128 + config.feature_length = model_fea_length with open(labels_file, encoding='UTF-8') as fh: config.class_labels = [x.strip() for x in fh] @@ -502,7 +577,10 @@ def run_pipeline(): pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False)) # Add our custom stage - pipeline.add_stage(RecipientFeaturesStage(config)) + if use_stage_function: + pipeline.add_stage(recipient_features_stage(config)) + else: + pipeline.add_stage(RecipientFeaturesStage(config)) # Add a deserialize stage pipeline.add_stage(DeserializeStage(config)) @@ -519,8 +597,8 @@ def run_pipeline(): pipeline.add_stage( TritonInferenceStage( config, - model_name='phishing-bert-onnx', - server_url=triton_url, + model_name=model_name, + server_url=server_url, force_convert_inputs=True, )) @@ -532,7 +610,7 @@ def run_pipeline(): # Write the to the output file pipeline.add_stage(SerializeStage(config)) - pipeline.add_stage(WriteToFileStage(config, filename=results_file, overwrite=True)) + pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) # Run the pipeline pipeline.run() @@ -663,6 +741,8 @@ Options: > **Note**: The code for this guide can be found in the `examples/developer_guide/2_2_rabbitmq` directory of the Morpheus repository. +### Class Based Approach + Creating a new source stage is similar to defining any other stage with a few differences. First, we will be subclassing `SingleOutputSource` including the `PreallocatorMixin`. Second, the required methods are the `name` property, `_build_source`, `compute_schema` and `supports_cpp_node` methods. In this example, we will create a source that reads messages from a [RabbitMQ](https://www.rabbitmq.com/) queue using the [pika](https://pika.readthedocs.io/en/stable/#) client for Python. For simplicity, we will assume that authentication is not required for our RabbitMQ exchange and that the body of the RabbitMQ messages will be JSON formatted. Both authentication and support for other formats could be easily added later. @@ -685,7 +765,7 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: The `source_generator` method is where most of the RabbitMQ-specific code exists. When we have a message that we wish to emit into the pipeline, we simply `yield` it. ```python -def source_generator(self): +def source_generator(self) -> collections.abc.Iterator[MessageMeta]: try: while not self._stop_requested: (method_frame, header_frame, body) = self._channel.basic_get(self._queue_name) @@ -708,9 +788,10 @@ def source_generator(self): Note that we read messages as quickly as we can from the queue. When the queue is empty we call `time.sleep`, allowing for a context switch to occur if needed. We acknowledge the message (by calling `basic_ack`) only once we have successfully emitted the message or failed to deserialize the message. This means that if the pipeline shuts down while consuming the queue, we will not lose any messages. However, in that situation we may end up with a duplicate message (i.e., if the pipeline is shut down after we have yielded the message but before calling `basic_ack`). 
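+
+Using the completed class is the same as for any other source stage. For example (a hypothetical snippet mirroring the `read_simple.py` example in the `examples/developer_guide/2_2_rabbitmq` directory):
+
+```python
+# Attach the source to a pipeline; the host and exchange values are illustrative
+pipeline.set_source(RabbitMQSourceStage(config, host='localhost', exchange='logs'))
+```
+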
-### The Completed Source Stage
+#### The Completed Source Stage
 
 ```python
+import collections.abc
 import logging
 import time
 from io import StringIO
@@ -796,7 +877,7 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource):
     def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
         return builder.make_source(self.unique_name, self.source_generator)
 
-    def source_generator(self):
+    def source_generator(self) -> collections.abc.Iterator[MessageMeta]:
         try:
             while not self._stop_requested:
                 (method_frame, _, body) = self._channel.basic_get(self._queue_name)
@@ -817,6 +898,84 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource):
         self._connection.close()
 ```
 
+### Function Based Approach
+Similar to the `stage` decorator used in previous examples, Morpheus provides a `source` decorator which wraps a generator function to be used as a source stage. In the class-based approach we explicitly added the `PreallocatorMixin`; when using the `source` decorator, the return type annotation will be inspected, and the stage will be created with the `PreallocatorMixin` if the return type is a `DataFrame` type or a message which contains a `DataFrame` (`MessageMeta` and `MultiMessage`).
+
+The code for the function will first perform the same setup as was used in the class constructor, then enter a loop nearly identical to the one in the `source_generator` method.
+
+```python
+import collections.abc
+import logging
+import time
+from io import StringIO
+
+import pandas as pd
+import pika
+
+import cudf
+
+from morpheus.messages.message_meta import MessageMeta
+from morpheus.pipeline.stage_decorator import source
+
+logger = logging.getLogger(__name__)
+
+
+@source(name="from-rabbitmq")
+def rabbitmq_source(host: str,
+                    exchange: str,
+                    exchange_type: str = 'fanout',
+                    queue_name: str = '',
+                    poll_interval: str = '100millis') -> collections.abc.Iterator[MessageMeta]:
+    """
+    Source stage used to load messages from a RabbitMQ queue.
+
+    Parameters
+    ----------
+    host : str
+        Hostname or IP of the RabbitMQ server.
+    exchange : str
+        Name of the RabbitMQ exchange to connect to.
+    exchange_type : str, optional
+        RabbitMQ exchange type; defaults to `fanout`.
+    queue_name : str, optional
+        Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange.
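+        The queue is declared with `exclusive=True`, so RabbitMQ automatically deletes it when the connection closes.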
+    poll_interval : str, optional
+        Amount of time between polling RabbitMQ for new messages.
+    """
+    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
+
+    channel = connection.channel()
+    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
+
+    result = channel.queue_declare(queue=queue_name, exclusive=True)
+
+    # When queue_name='' we will receive a randomly generated queue name
+    queue_name = result.method.queue
+
+    channel.queue_bind(exchange=exchange, queue=queue_name)
+
+    poll_interval = pd.Timedelta(poll_interval)
+
+    try:
+        while True:
+            (method_frame, _, body) = channel.basic_get(queue_name)
+            if method_frame is not None:
+                try:
+                    buffer = StringIO(body.decode("utf-8"))
+                    df = cudf.io.read_json(buffer, orient='records', lines=True)
+                    yield MessageMeta(df=df)
+                except Exception as ex:
+                    logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex)
+                finally:
+                    channel.basic_ack(method_frame.delivery_tag)
+            else:
+                # queue is empty, sleep before polling again
+                time.sleep(poll_interval.total_seconds())
+
+    finally:
+        connection.close()
+```
+
 ## Defining a New Sink Stage
 
 In Morpheus, we define a stage to be a sink if it outputs the results of a pipeline to a destination external to the pipeline. Morpheus includes several sink stages under the `morpheus.stages.output` namespace.
 
@@ -948,4 +1107,4 @@ class WriteToRabbitMQStage(PassThruTypeMixin, SinglePortStage):
         self._connection.close()
 ```
 
-> **Note**: For information about testing the `RabbitMQSourceStage` and `WriteToRabbitMQStage` stages refer to `examples/developer_guide/2_2_rabbitmq/README.md` in the the Morpheus repo.
+> **Note**: For information about testing the `RabbitMQSourceStage`, `rabbitmq_source`, and `WriteToRabbitMQStage` stages, refer to `examples/developer_guide/2_2_rabbitmq/README.md` in the Morpheus repo.
diff --git a/docs/source/developer_guide/guides/3_simple_cpp_stage.md b/docs/source/developer_guide/guides/3_simple_cpp_stage.md
index daf9a89d83..da2ef93624 100644
--- a/docs/source/developer_guide/guides/3_simple_cpp_stage.md
+++ b/docs/source/developer_guide/guides/3_simple_cpp_stage.md
@@ -22,7 +22,7 @@ limitations under the License.
 
 Morpheus offers the choice of writing pipeline stages in either Python or C++. For many use cases, a Python stage is perfectly fine. However, in the event that a Python stage becomes a bottleneck for the pipeline, then writing a C++ implementation for the stage becomes advantageous. The C++ implementations of Morpheus stages and messages utilize the [pybind11](https://pybind11.readthedocs.io/en/stable/index.html) library to provide Python bindings.
 
-So far we have been defining our pipelines in Python. Most of the stages included with Morpheus have both a Python and a C++ implementation, and Morpheus will use the C++ implementations by default. You can explicitly disable the use of C++ stage implementations by calling `morpheus.config.CppConfig.set_should_use_cpp(False)`:
+So far we have been defining our stages in Python; the option of defining a C++ implementation is only available to stages implemented as classes. Many of the stages included with Morpheus have both a Python and a C++ implementation, and Morpheus will use the C++ implementations by default. 
You can explicitly disable the use of C++ stage implementations by calling `morpheus.config.CppConfig.set_should_use_cpp(False)`: ```python from morpheus.config import CppConfig diff --git a/examples/developer_guide/1_simple_python_stage/pass_thru.py b/examples/developer_guide/1_simple_python_stage/pass_thru.py index 5efa22e246..0100284080 100644 --- a/examples/developer_guide/1_simple_python_stage/pass_thru.py +++ b/examples/developer_guide/1_simple_python_stage/pass_thru.py @@ -39,7 +39,7 @@ def accepted_types(self) -> tuple: def supports_cpp_node(self) -> bool: return False - def on_data(self, message: typing.Any): + def on_data(self, message: typing.Any) -> typing.Any: # Return the message for the next stage return message diff --git a/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py b/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py new file mode 100644 index 0000000000..e10a3c75f6 --- /dev/null +++ b/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +from morpheus.pipeline.stage_decorator import stage + + +@stage +def pass_thru_stage(message: typing.Any) -> typing.Any: + # Return the message for the next stage + return message diff --git a/examples/developer_guide/1_simple_python_stage/run.py b/examples/developer_guide/1_simple_python_stage/run.py index 1ffcc556e1..73d9616104 100755 --- a/examples/developer_guide/1_simple_python_stage/run.py +++ b/examples/developer_guide/1_simple_python_stage/run.py @@ -18,6 +18,7 @@ import os from pass_thru import PassThruStage +from pass_thru_deco import pass_thru_stage from morpheus.config import Config from morpheus.pipeline import LinearPipeline @@ -41,10 +42,16 @@ def run_pipeline(): # Set source stage pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False)) - # Add our own stage + # Add the decorated function stage + pipeline.add_stage(pass_thru_stage(config)) + + # Add monitor to record the performance of the function based stage + pipeline.add_stage(MonitorStage(config)) + + # Add the class based stage pipeline.add_stage(PassThruStage(config)) - # Add monitor to record the performance of our new stage + # Add monitor to record the performance of the class based stage pipeline.add_stage(MonitorStage(config)) # Run the pipeline diff --git a/examples/developer_guide/2_1_real_world_phishing/recipient_features_stage_deco.py b/examples/developer_guide/2_1_real_world_phishing/recipient_features_stage_deco.py new file mode 100644 index 0000000000..ec0ab0a415 --- /dev/null +++ b/examples/developer_guide/2_1_real_world_phishing/recipient_features_stage_deco.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from morpheus.common import TypeId +from morpheus.messages import MessageMeta +from morpheus.pipeline.stage_decorator import stage + + +@stage( + needed_columns={ + 'to_count': TypeId.INT32, + 'bcc_count': TypeId.INT32, + 'cc_count': TypeId.INT32, + 'total_recipients': TypeId.INT32, + 'data': TypeId.STRING + }) +def recipient_features_stage(message: MessageMeta, *, sep_token: str = '[SEP]') -> MessageMeta: + # Open the DataFrame from the incoming message for in-place modification + with message.mutable_dataframe() as df: + df['to_count'] = df['To'].str.count('@') + df['bcc_count'] = df['BCC'].str.count('@') + df['cc_count'] = df['CC'].str.count('@') + df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count'] + + # Attach features to string data + df['data'] = (df['to_count'].astype(str) + sep_token + df['bcc_count'].astype(str) + sep_token + + df['cc_count'].astype(str) + sep_token + df['total_recipients'].astype(str) + sep_token + + df['Message']) + + # Return the message for the next stage + return message diff --git a/examples/developer_guide/2_1_real_world_phishing/run.py b/examples/developer_guide/2_1_real_world_phishing/run.py index f133b3d252..3251b9689c 100755 --- a/examples/developer_guide/2_1_real_world_phishing/run.py +++ b/examples/developer_guide/2_1_real_world_phishing/run.py @@ -17,8 +17,11 @@ import logging import os +import tempfile +import click from recipient_features_stage import RecipientFeaturesStage +from recipient_features_stage_deco import recipient_features_stage import morpheus from morpheus.common import FilterSource @@ -35,29 +38,68 @@ from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage from morpheus.utils.logger import configure_logging - -def run_pipeline(): +MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] + + +@click.command() +@click.option("--use_stage_function", + is_flag=True, + default=False, + help="Use the function based version of the recipient features stage instead of the class") +@click.option( + "--labels_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt'), + help="Specifies a file to read labels from in order to convert class IDs into labels.", +) +@click.option( + "--vocab_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt'), + help="Path to hash file containing vocabulary of words with token-ids.", +) +@click.option( + "--input_file", + type=click.Path(exists=True, readable=True), + default=os.path.join(MORPHEUS_ROOT, 'examples/data/email_with_addresses.jsonlines'), + help="Input filepath.", +) +@click.option( + "--model_fea_length", + default=128, + type=click.IntRange(min=1), + help="Features length to use for the model.", +) +@click.option( + "--model_name", + default="phishing-bert-onnx", + help="The name of the model that is deployed on Tritonserver.", +) 
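+# The Triton-related options below replace the TRITON_URL and OUT_DIR environment
+# variables used by the original version of this script.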
+@click.option("--server_url", default='localhost:8001', help="Tritonserver url.")
+@click.option(
+    "--output_file",
+    default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
+    help="The path to the file where the inference output will be saved.",
+)
+def run_pipeline(use_stage_function: bool,
+                 labels_file: str,
+                 vocab_file: str,
+                 input_file: str,
+                 model_fea_length: int,
+                 model_name: str,
+                 server_url: str,
+                 output_file: str):
     """Run the phishing detection pipeline."""
     # Enable the default logger
     configure_logging(log_level=logging.INFO)
 
-    triton_url = os.environ.get('TRITON_URL', 'localhost:8001')
-    root_dir = os.environ['MORPHEUS_ROOT']
-    out_dir = os.environ.get('OUT_DIR', '/tmp')
-
-    labels_file = os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt')
-    vocab_file = os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt')
-
-    input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines')
-    results_file = os.path.join(out_dir, 'detections.jsonlines')
-
     # It's necessary to configure the pipeline for NLP mode
     config = Config()
     config.mode = PipelineModes.NLP
 
     # Set the thread count to match our cpu count
     config.num_threads = os.cpu_count()
-    config.feature_length = 128
+    config.feature_length = model_fea_length
 
     with open(labels_file, encoding='UTF-8') as fh:
         config.class_labels = [x.strip() for x in fh]
@@ -69,7 +111,10 @@ def run_pipeline():
     pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
 
     # Add our custom stage
-    pipeline.add_stage(RecipientFeaturesStage(config))
+    if use_stage_function:
+        pipeline.add_stage(recipient_features_stage(config))
+    else:
+        pipeline.add_stage(RecipientFeaturesStage(config))
 
     # Add a deserialize stage
     pipeline.add_stage(DeserializeStage(config))
@@ -86,8 +131,8 @@
     pipeline.add_stage(
         TritonInferenceStage(
             config,
-            model_name='phishing-bert-onnx',
-            server_url=triton_url,
+            model_name=model_name,
+            server_url=server_url,
             force_convert_inputs=True,
         ))
 
@@ -99,7 +144,7 @@ def run_pipeline():
 
     # Write the to the output file
     pipeline.add_stage(SerializeStage(config))
-    pipeline.add_stage(WriteToFileStage(config, filename=results_file, overwrite=True))
+    pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
 
     # Run the pipeline
     pipeline.run()
diff --git a/examples/developer_guide/2_2_rabbitmq/README.md b/examples/developer_guide/2_2_rabbitmq/README.md
index 6c325a60c8..bff64752a0 100644
--- a/examples/developer_guide/2_2_rabbitmq/README.md
+++ b/examples/developer_guide/2_2_rabbitmq/README.md
@@ -40,7 +40,7 @@ python examples/developer_guide/2_2_rabbitmq/read_simple.py
 
 This will read from a RabbitMQ exchange named 'logs', and write the results to `/tmp/results.json`.
 
-If no exchange named 'logs' exists in RabbitMQ it will be created.
+If no exchange named 'logs' exists in RabbitMQ it will be created. By default, the `read_simple.py` script will use the class-based `RabbitMQSourceStage`; passing the `--use_source_function` flag will use the function-based `rabbitmq_source` stage instead (for example: `python examples/developer_guide/2_2_rabbitmq/read_simple.py --use_source_function`).
## Launch the writer In a third terminal from the root of the Morpheus repo execute: diff --git a/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py index 4208b0b3a0..1479626507 100644 --- a/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py +++ b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections.abc import logging import time from io import StringIO @@ -98,7 +99,7 @@ def stop(self): def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator) - def source_generator(self): + def source_generator(self) -> collections.abc.Iterator[MessageMeta]: try: while not self._stop_requested: (method_frame, _, body) = self._channel.basic_get(self._queue_name) diff --git a/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage_deco.py b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage_deco.py new file mode 100644 index 0000000000..3f7dfab0d0 --- /dev/null +++ b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage_deco.py @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections.abc +import logging +import time +from io import StringIO + +import pandas as pd +import pika + +import cudf + +from morpheus.messages.message_meta import MessageMeta +from morpheus.pipeline.stage_decorator import source + +logger = logging.getLogger(__name__) + + +@source(name="from-rabbitmq") +def rabbitmq_source(host: str, + exchange: str, + exchange_type: str = 'fanout', + queue_name: str = '', + poll_interval: str = '100millis') -> collections.abc.Iterator[MessageMeta]: + """ + Source stage used to load messages from a RabbitMQ queue. + + Parameters + ---------- + host : str + Hostname or IP of the RabbitMQ server. + exchange : str + Name of the RabbitMQ exchange to connect to. + exchange_type : str, optional + RabbitMQ exchange type; defaults to `fanout`. + queue_name : str, optional + Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange. 
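+        The queue is declared with `exclusive=True`, so RabbitMQ automatically deletes it when the connection closes.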
+ poll_interval : str, optional + Amount of time between polling RabbitMQ for new messages + """ + connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) + + channel = connection.channel() + channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) + + result = channel.queue_declare(queue=queue_name, exclusive=True) + + # When queue_name='' we will receive a randomly generated queue name + queue_name = result.method.queue + + channel.queue_bind(exchange=exchange, queue=queue_name) + + poll_interval = pd.Timedelta(poll_interval) + + try: + while True: + (method_frame, _, body) = channel.basic_get(queue_name) + if method_frame is not None: + try: + buffer = StringIO(body.decode("utf-8")) + df = cudf.io.read_json(buffer, orient='records', lines=True) + yield MessageMeta(df=df) + except Exception as ex: + logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex) + finally: + channel.basic_ack(method_frame.delivery_tag) + else: + # queue is empty, sleep before polling again + time.sleep(poll_interval.total_seconds()) + + finally: + connection.close() diff --git a/examples/developer_guide/2_2_rabbitmq/read_simple.py b/examples/developer_guide/2_2_rabbitmq/read_simple.py index 969124c209..14705dbe1e 100755 --- a/examples/developer_guide/2_2_rabbitmq/read_simple.py +++ b/examples/developer_guide/2_2_rabbitmq/read_simple.py @@ -16,7 +16,9 @@ import logging import os +import click from rabbitmq_source_stage import RabbitMQSourceStage +from rabbitmq_source_stage_deco import rabbitmq_source from morpheus.common import FileTypes from morpheus.config import Config @@ -26,7 +28,12 @@ from morpheus.utils.logger import configure_logging -def run_pipeline(): +@click.command() +@click.option("--use_source_function", + is_flag=True, + default=False, + help="Use the function based version of the RabbitMQ source stage instead of the class") +def run_pipeline(use_source_function: bool): # Enable the Morpheus logger configure_logging(log_level=logging.DEBUG) @@ -37,7 +44,10 @@ def run_pipeline(): pipeline = LinearPipeline(config) # Set source stage - pipeline.set_source(RabbitMQSourceStage(config, host='localhost', exchange='logs')) + if use_source_function: + pipeline.set_source(rabbitmq_source(config, host='localhost', exchange='logs')) + else: + pipeline.set_source(RabbitMQSourceStage(config, host='localhost', exchange='logs')) # Add monitor to record the performance of our new stages pipeline.add_stage(MonitorStage(config)) diff --git a/morpheus/pipeline/stage_base.py b/morpheus/pipeline/stage_base.py index 1db05fd1db..b0bbfe7daa 100644 --- a/morpheus/pipeline/stage_base.py +++ b/morpheus/pipeline/stage_base.py @@ -480,10 +480,9 @@ def compute_schema(self, schema: _pipeline.StageSchema): Derived classes need to override this method, can set the output type(s) on `schema` by calling `set_type` for all output ports. For example a simple pass-thru stage might perform the following: - ``` >>> for (port_idx, port_schema) in enumerate(schema.input_schemas): - >>> schema.output_schemas[port_idx].set_type(port_schema.get_type()) - ``` + ... schema.output_schemas[port_idx].set_type(port_schema.get_type()) + >>> If the port types in `upstream_schema` are incompatible the stage should raise a `RuntimeError`. 
""" diff --git a/morpheus/pipeline/stage_decorator.py b/morpheus/pipeline/stage_decorator.py new file mode 100644 index 0000000000..76481fa867 --- /dev/null +++ b/morpheus/pipeline/stage_decorator.py @@ -0,0 +1,367 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import functools +import inspect +import logging +import typing + +import mrc +import pandas as pd +from mrc.core import operators as ops + +import cudf + +from morpheus.common import TypeId +from morpheus.config import Config +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage +from morpheus.pipeline.preallocator_mixin import PreallocatorMixin +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.pipeline.stage_schema import StageSchema + +logger = logging.getLogger(__name__) +GeneratorType = typing.Callable[..., collections.abc.Iterator[typing.Any]] +ComputeSchemaType = typing.Callable[[StageSchema], None] + + +def _get_name_from_fn(fn: typing.Callable) -> str: + try: + return fn.__name__ + except AttributeError: + # If the function is a partial, it won't have a name + if isinstance(fn, functools.partial): + return _get_name_from_fn(fn.func) + + return str(fn) + + +def _validate_keyword_arguments(fn_name: str, + signature: inspect.Signature, + kwargs: dict[str, typing.Any], + param_iter: typing.Iterator): + # If we have any keyword arguments with a default value that we did not receive an explicit value for, we need + # to bind it, otherwise it will trigger an error when MRC. + for param in param_iter: + if param.default is not signature.empty and param.name not in kwargs: + kwargs[param.name] = param.default + + # if a parameter is keyword only, containing neither a a default value or an entry in on_data_kwargs, we + # need to raise an error + if param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY) and param.name not in kwargs: + raise ValueError(f"Wrapped function {fn_name} has keyword only parameter '{param.name}' that was not " + "provided a value") + + if param.kind is param.POSITIONAL_ONLY: + + raise ValueError("Positional arguments are not supported for wrapped functions. " + f"{fn_name} contains '{param.name}' that was not provided with a value") + + +class WrappedFunctionSourceStage(SingleOutputSource): + """ + Source stage that wraps a generator function as the method for generating messages. + + The wrapped function must be a generator function. If the output type of the generator is an instance of + `pandas.DataFrame`, `cudf.DataFrame`, `MessageMeta`, or `MultiMessage` the `PreAllocatedWrappedFunctionStage` class + should be used instead, as this will also perform any DataFrame column allocations needed by other stages in the + pipeline. + + Parameters + ---------- + config : `morpheus.config.Config` + Pipeline configuration instance. + name: `str` + Name of the stage. 
+    gen_fn : `GeneratorType`
+        Generator function to use as the source of messages.
+    compute_schema_fn : `ComputeSchemaType`
+        Function to use for computing the schema of the stage.
+    """
+
+    def __init__(self, config: Config, *, name: str, gen_fn: GeneratorType, compute_schema_fn: ComputeSchemaType):
+        super().__init__(config)
+        # collections.abc.Generator is a subclass of collections.abc.Iterator
+        if not inspect.isgeneratorfunction(gen_fn):
+            raise ValueError("Wrapped source functions must be generator functions")
+
+        self._name = name
+        self._gen_fn = gen_fn
+        self._compute_schema_fn = compute_schema_fn
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    def supports_cpp_node(self) -> bool:
+        return False
+
+    def compute_schema(self, schema: StageSchema):
+        self._compute_schema_fn(schema)
+
+    def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
+        return builder.make_source(self.unique_name, self._gen_fn)
+
+
+class PreAllocatedWrappedFunctionStage(PreallocatorMixin, WrappedFunctionSourceStage):
+    """
+    Source stage that wraps a generator function as the method for generating messages.
+
+    This stage provides the same functionality as `WrappedFunctionSourceStage`, but also performs any DataFrame column
+    allocations needed by other stages in the pipeline. As such the return type for `gen_fn` must be one of:
+    `pandas.DataFrame`, `cudf.DataFrame`, `MessageMeta`, or `MultiMessage`.
+
+    Parameters
+    ----------
+    config : `morpheus.config.Config`
+        Pipeline configuration instance.
+    name: `str`
+        Name of the stage.
+    gen_fn : `GeneratorType`
+        Generator function to use as the source of messages.
+    compute_schema_fn : `ComputeSchemaType`
+        Function to use for computing the schema of the stage.
+    """
+
+
+def source(gen_fn: GeneratorType = None, *, name: str = None, compute_schema_fn: ComputeSchemaType = None):
+    """
+    Decorator for wrapping a function as a source stage. The function must be a generator function, and must provide
+    a return type annotation.
+
+    When `compute_schema_fn` is `None`, the return type annotation will be used by the stage as the output type.
+
+    When invoked, the wrapped function will return a source stage; any additional keyword arguments passed in aside
+    from the config will be bound to the wrapped function via `functools.partial`.
+
+    Examples
+    --------
+
+    >>> @source
+    ... def source_gen(*, dataframes: list[cudf.DataFrame]) -> collections.abc.Iterator[MessageMeta]:
+    ...     for df in dataframes:
+    ...         yield MessageMeta(df)
+    ...
+    >>>
+
+    >>> pipe.set_source(source_gen(config, dataframes=[df]))
+    """
+    if gen_fn is None:
+        return functools.partial(source, name=name, compute_schema_fn=compute_schema_fn)
+
+    # Use wraps to ensure users don't lose their function name and docstrings; however, we do want to override the
+    # annotations to reflect that the returned function requires a config and returns a stage
+    @functools.wraps(gen_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
+    def wrapper(config: Config, **kwargs) -> WrappedFunctionSourceStage:
+        nonlocal name
+        nonlocal compute_schema_fn
+
+        if name is None:
+            name = _get_name_from_fn(gen_fn)
+
+        signature = inspect.signature(gen_fn)
+        return_type = signature.return_annotation
+        if return_type is signature.empty:
+            raise ValueError("Source functions must specify a return type annotation")
+
+        # We need to unpack generator and iterator return types to get the actual type of the yielded type.
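+        # For example, a return annotation of collections.abc.Iterator[MessageMeta] unpacks to MessageMeta.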
+        if isinstance(return_type, (typing.GenericAlias, typing._GenericAlias)):
+            return_type = return_type.__args__[0]
+
+        if compute_schema_fn is None:  # pylint: disable=used-before-assignment
+
+            def compute_schema_fn(schema: StageSchema):
+                schema.output_schema.set_type(return_type)
+
+        _validate_keyword_arguments(name, signature, kwargs, param_iter=iter(signature.parameters.values()))
+
+        bound_gen_fn = functools.partial(gen_fn, **kwargs)
+
+        # If the return type supports pre-allocation we use the pre-allocating source
+        if return_type in (pd.DataFrame, cudf.DataFrame, MessageMeta, MultiMessage):
+            return PreAllocatedWrappedFunctionStage(config=config,
+                                                    name=name,
+                                                    gen_fn=bound_gen_fn,
+                                                    compute_schema_fn=compute_schema_fn)
+
+        return WrappedFunctionSourceStage(config=config,
+                                          name=name,
+                                          gen_fn=bound_gen_fn,
+                                          compute_schema_fn=compute_schema_fn)
+
+    return wrapper
+
+
+class WrappedFunctionStage(SinglePortStage):
+    """
+    Stage that wraps a function to be used for processing messages.
+
+    The function must receive at least one argument, the first of which will be the incoming message, and it must
+    return a value.
+
+    Parameters
+    ----------
+    config : `morpheus.config.Config`
+        Pipeline configuration instance.
+    name: `str`
+        Name of the stage.
+    on_data_fn : `typing.Callable`
+        Function to be used for processing messages.
+    accept_type: type
+        Type of the input message.
+    compute_schema_fn : `ComputeSchemaType`
+        Function to use for computing the schema of the stage.
+    needed_columns : `dict[str, TypeId]`, optional
+        Dictionary of column names and types that the function requires to be present in the DataFrame. This is used
+        by the `PreAllocatedWrappedFunctionStage` to ensure the DataFrame has the needed columns allocated.
+    """
+
+    def __init__(
+        self,
+        config: Config,
+        *,
+        name: str = None,
+        on_data_fn: typing.Callable,
+        accept_type: type,
+        compute_schema_fn: ComputeSchemaType,
+        needed_columns: dict[str, TypeId] = None,
+    ):
+        super().__init__(config)
+        self._name = name
+        self._on_data_fn = on_data_fn
+        self._accept_type = accept_type
+        self._compute_schema_fn = compute_schema_fn
+
+        if needed_columns is not None:
+            self._needed_columns.update(needed_columns)
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    def accepted_types(self) -> typing.Tuple:
+        return (self._accept_type, )
+
+    def supports_cpp_node(self) -> bool:
+        return False
+
+    def compute_schema(self, schema: StageSchema):
+        self._compute_schema_fn(schema)
+
+    def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
+        node = builder.make_node(self.unique_name, ops.map(self._on_data_fn))
+        builder.make_edge(input_node, node)
+
+        return node
+
+
+def stage(on_data_fn: typing.Callable = None,
+          *,
+          name: str = None,
+          accept_type: type = None,
+          compute_schema_fn: ComputeSchemaType = None,
+          needed_columns: dict[str, TypeId] = None):
+    """
+    Decorator for wrapping a function as a stage. The function must receive at least one argument, the first of
+    which will be the incoming message, and it must return a value.
+
+    It is required to use type annotations for the function parameters and return type, as these will be used by
+    the stage as the accept and output types. If the incoming message parameter has no type annotation, the stage
+    will use `typing.Any` as the input type. If the return type has no type annotation, the stage will be set to
+    return the same type as the input type.
+
+    When invoked, the wrapped function will return a stage; any additional arguments passed in, aside from the
+    config, will be bound to the wrapped function via `functools.partial`.
+
+    Examples
+    --------
+
+    >>> @stage
+    ... def multiplier(message: MessageMeta, *, column: str, value: int | float) -> MessageMeta:
+    ...     with message.mutable_dataframe() as df:
+    ...         df[column] = df[column] * value
+    ...
+    ...     return message
+    ...
+    >>>
+
+    >>> pipe.add_stage(multiplier(config, column='v2', value=5))
+
+    >>> # This will fail since `column` is required but no default value is provided:
+    >>> pipe.add_stage(multiplier(config, value=5))
+    """
+
+    if on_data_fn is None:
+        return functools.partial(stage,
+                                 name=name,
+                                 accept_type=accept_type,
+                                 compute_schema_fn=compute_schema_fn,
+                                 needed_columns=needed_columns)
+
+    # Use wraps to ensure users don't lose their function name and docstrings; however, we do want to override the
+    # annotations to reflect that the returned function requires a config and returns a stage
+    @functools.wraps(on_data_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
+    def wrapper(config: Config, **kwargs) -> WrappedFunctionStage:
+        nonlocal name
+        nonlocal accept_type
+        nonlocal compute_schema_fn
+
+        if name is None:
+            name = _get_name_from_fn(on_data_fn)
+
+        # Even if both accept_type and compute_schema_fn are provided, we still need to inspect the function
+        # signature to verify it is callable with at least one argument
+        signature = inspect.signature(on_data_fn)
+        param_iter = iter(signature.parameters.values())
+
+        try:
+            first_param = next(param_iter)
+            accept_type = accept_type or first_param.annotation
+            if accept_type is signature.empty:
+                raise ValueError(f"{first_param.name} argument of {name} has no type annotation")
+        except StopIteration as e:
+            raise ValueError(f"Stage function {name} must have at least one parameter") from e
+
+        if compute_schema_fn is None:  # pylint: disable=used-before-assignment
+            return_type = signature.return_annotation
+            if return_type is signature.empty:
+                raise ValueError(
+                    "Stage functions must have either a return type annotation or specify a compute_schema_fn")
+
+            def compute_schema_fn(schema: StageSchema):
+                if return_type is typing.Any:
+                    out_type = schema.input_schema.get_type()
+                else:
+                    out_type = return_type
+
+                schema.output_schema.set_type(out_type)
+
+        _validate_keyword_arguments(name, signature, kwargs, param_iter=param_iter)
+
+        bound_on_data_fn = functools.partial(on_data_fn, **kwargs)
+
+        return WrappedFunctionStage(config=config,
+                                    name=name,
+                                    on_data_fn=bound_on_data_fn,
+                                    accept_type=accept_type,
+                                    compute_schema_fn=compute_schema_fn,
+                                    needed_columns=needed_columns)
+
+    return wrapper
diff --git a/tests/examples/developer_guide/test_pass_thru.py b/tests/examples/developer_guide/test_pass_thru.py
index 876a54066e..f3c1f7c411 100644
--- a/tests/examples/developer_guide/test_pass_thru.py
+++ b/tests/examples/developer_guide/test_pass_thru.py
@@ -15,42 +15,44 @@
 import os
 import types
-import typing
 
-import pandas as pd
 import pytest
 
-import cudf
-
 from _utils import TEST_DIRS
 from morpheus.config import Config
 from morpheus.messages import MessageMeta
 from morpheus.messages import MultiMessage
 from morpheus.pipeline.single_port_stage import SinglePortStage
+from morpheus.utils.type_aliases import DataFrameType
 
 
 def _check_pass_thru(config: Config,
-                     filter_probs_df: typing.Union[pd.DataFrame, cudf.DataFrame],
-                     pass_thru_stage_cls: SinglePortStage):
+                     filter_probs_df: DataFrameType,
+                     pass_thru_stage_cls: SinglePortStage,
+                     on_data_fn_name: str = 'on_data'):
     stage = pass_thru_stage_cls(config)
+    assert isinstance(stage, SinglePortStage)
 
     meta = MessageMeta(filter_probs_df)
     multi = MultiMessage(meta=meta)
-    assert stage.on_data(multi) is multi
+    on_data_fn = getattr(stage, on_data_fn_name)
+    assert on_data_fn(multi) is multi
 
 
 @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'developer_guide/1_simple_python_stage/pass_thru.py'))
-def test_pass_thru_ex1(config: Config,
-                       filter_probs_df: typing.Union[pd.DataFrame, cudf.DataFrame],
-                       import_mod: types.ModuleType):
+def test_pass_thru_ex1(config: Config, filter_probs_df: DataFrameType, import_mod: types.ModuleType):
     pass_thru = import_mod
     _check_pass_thru(config, filter_probs_df, pass_thru.PassThruStage)
 
 
+@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'developer_guide/1_simple_python_stage/pass_thru_deco.py'))
+def test_pass_thru_ex1_deco(config: Config, filter_probs_df: DataFrameType, import_mod: types.ModuleType):
+    pass_thru = import_mod
+    _check_pass_thru(config, filter_probs_df, pass_thru.pass_thru_stage, on_data_fn_name='_on_data_fn')
+
+
 @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'developer_guide/3_simple_cpp_stage/pass_thru.py'))
-def test_pass_thru_ex3(config: Config,
-                       filter_probs_df: typing.Union[pd.DataFrame, cudf.DataFrame],
-                       import_mod: types.ModuleType):
+def test_pass_thru_ex3(config: Config, filter_probs_df: DataFrameType, import_mod: types.ModuleType):
     pass_thru = import_mod
     _check_pass_thru(config, filter_probs_df, pass_thru.PassThruStage)
diff --git a/tests/examples/developer_guide/test_recipient_features_stage.py b/tests/examples/developer_guide/test_recipient_features_stage.py
index 2835c90fae..58fa06cc58 100644
--- a/tests/examples/developer_guide/test_recipient_features_stage.py
+++ b/tests/examples/developer_guide/test_recipient_features_stage.py
@@ -24,22 +24,20 @@
 from morpheus.config import Config
 from morpheus.config import PipelineModes
 from morpheus.messages import MessageMeta
+from morpheus.pipeline.single_port_stage import SinglePortStage
 
 EXPECTED_NEW_COLS = ['to_count', 'bcc_count', 'cc_count', 'total_recipients', 'data']
 
 
-@pytest.mark.import_mod(
-    [os.path.join(TEST_DIRS.examples_dir, 'developer_guide/2_1_real_world_phishing/recipient_features_stage.py')])
-def test_recipient_features_stage_on_data(config: Config,
-                                          dataset: DatasetManager,
-                                          import_mod: typing.List[types.ModuleType]):
-    recipient_features_stage = import_mod[0]
-
+def _test_on_data(config: Config,
+                  dataset: DatasetManager,
+                  stage_cls: SinglePortStage,
+                  on_data_fn_name: str = 'on_data'):
     input_df = dataset[os.path.join(TEST_DIRS.tests_data_dir,
                                     'examples/developer_guide/email_with_addresses_first_10.jsonlines')]
 
     config.mode = PipelineModes.NLP
-    stage = recipient_features_stage.RecipientFeaturesStage(config)
+    stage = stage_cls(config)
 
     meta = MessageMeta(input_df)
 
@@ -48,7 +46,8 @@ def test_recipient_features_stage_on_data(config: Config,
     for col in EXPECTED_NEW_COLS:
         assert col not in input_df
 
-    results = stage.on_data(meta)
+    on_data_fn = getattr(stage, on_data_fn_name)
+    results = on_data_fn(meta)
     assert results is meta
 
     for col in EXPECTED_NEW_COLS:
@@ -57,11 +56,39 @@
 
 @pytest.mark.import_mod(
     [os.path.join(TEST_DIRS.examples_dir, 'developer_guide/2_1_real_world_phishing/recipient_features_stage.py')])
-def test_recipient_features_stage_needed_columns(config: Config, import_mod: typing.List[types.ModuleType]):
+def test_stage_class_on_data(config: Config, dataset: DatasetManager, import_mod: typing.List[types.ModuleType]):
     recipient_features_stage = import_mod[0]
+    _test_on_data(config, dataset, recipient_features_stage.RecipientFeaturesStage)
+
+
+@pytest.mark.import_mod(
+    [os.path.join(TEST_DIRS.examples_dir, 'developer_guide/2_1_real_world_phishing/recipient_features_stage_deco.py')])
+def test_stage_decorator_on_data(config: Config, dataset: DatasetManager, import_mod: typing.List[types.ModuleType]):
+    recipient_features_stage_deco = import_mod[0]
+    _test_on_data(config,
+                  dataset,
+                  recipient_features_stage_deco.recipient_features_stage,
+                  on_data_fn_name='_on_data_fn')
+
+
+def _test_needed_columns(config: Config, stage_cls: SinglePortStage):
     config.mode = PipelineModes.NLP
-    stage = recipient_features_stage.RecipientFeaturesStage(config)
+    stage = stage_cls(config)
 
     needed_cols = stage.get_needed_columns()
     assert sorted(EXPECTED_NEW_COLS) == sorted(needed_cols.keys())
+
+
+@pytest.mark.import_mod(
+    [os.path.join(TEST_DIRS.examples_dir, 'developer_guide/2_1_real_world_phishing/recipient_features_stage.py')])
+def test_stage_class_needed_columns(config: Config, import_mod: typing.List[types.ModuleType]):
+    recipient_features_stage = import_mod[0]
+    _test_needed_columns(config, recipient_features_stage.RecipientFeaturesStage)
+
+
+@pytest.mark.import_mod(
+    [os.path.join(TEST_DIRS.examples_dir, 'developer_guide/2_1_real_world_phishing/recipient_features_stage_deco.py')])
+def test_stage_decorator_needed_columns(config: Config, import_mod: typing.List[types.ModuleType]):
+    recipient_features_stage_deco = import_mod[0]
+    _test_needed_columns(config, recipient_features_stage_deco.recipient_features_stage)
diff --git a/tests/pipeline/test_stage_decorator.py b/tests/pipeline/test_stage_decorator.py
new file mode 100644
index 0000000000..8f8b819d03
--- /dev/null
+++ b/tests/pipeline/test_stage_decorator.py
@@ -0,0 +1,444 @@
+#!/usr/bin/env python
+# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import functools
+import typing
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+import cudf
+
+from _utils import assert_results
+from morpheus.common import TypeId
+from morpheus.config import Config
+from morpheus.messages import MessageMeta
+from morpheus.messages import MultiMessage
+from morpheus.pipeline import LinearPipeline
+from morpheus.pipeline.stage_decorator import ComputeSchemaType
+from morpheus.pipeline.stage_decorator import PreAllocatedWrappedFunctionStage
+from morpheus.pipeline.stage_decorator import WrappedFunctionSourceStage
+from morpheus.pipeline.stage_decorator import WrappedFunctionStage
+from morpheus.pipeline.stage_decorator import source
+from morpheus.pipeline.stage_decorator import stage
+from morpheus.pipeline.stage_schema import StageSchema
+from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
+
+
+def _get_annotation(type_: type, generator_type: type) -> type:
+    if generator_type is not None:
+        if generator_type in (typing.Generator, collections.abc.Generator):
+            annotation = generator_type[type_, None, None]
+        else:
+            annotation = generator_type[type_]
+    else:
+        annotation = type_
+
+    return annotation
+
+
+def _mk_compute_schema_fn(return_type: type) -> ComputeSchemaType:
+    return lambda schema: schema.output_schema.set_type(return_type)
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("generator_type",
+                         [None, typing.Iterator, typing.Generator, collections.abc.Iterator, collections.abc.Generator])
+@pytest.mark.parametrize("return_type, is_prealloc",
+                         [(pd.DataFrame, True), (cudf.DataFrame, True), (MessageMeta, True), (MultiMessage, True),
+                          (float, False)])
+def test_wrapped_function_source_stage_constructor(config: Config,
+                                                   generator_type: type,
+                                                   return_type: type,
+                                                   is_prealloc: bool):
+    return_annotation = _get_annotation(return_type, generator_type)
+
+    def test_source_gen() -> return_annotation:
+        yield None
+
+    if is_prealloc:
+        source_cls = PreAllocatedWrappedFunctionStage
+    else:
+        source_cls = WrappedFunctionSourceStage
+
+    mock_compute_schema_fn = mock.MagicMock()
+    mock_compute_schema_fn.side_effect = _mk_compute_schema_fn(return_type)
+
+    source_stage = source_cls(config,
+                              name="unittest-source",
+                              gen_fn=test_source_gen,
+                              compute_schema_fn=mock_compute_schema_fn)
+
+    assert isinstance(source_stage, WrappedFunctionSourceStage)
+    assert is_prealloc == isinstance(source_stage, PreAllocatedWrappedFunctionStage)
+
+    # check the output type
+    schema = StageSchema(source_stage)
+    source_stage.compute_schema(schema)
+    assert schema.output_schema.get_type() is return_type
+    mock_compute_schema_fn.assert_called_once_with(schema)
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("src_cls", [WrappedFunctionSourceStage, PreAllocatedWrappedFunctionStage])
+def test_wrapped_function_source_stage_not_generator_error(config: Config, src_cls: type):
+
+    def test_source_gen() -> MessageMeta:
+        return MessageMeta(cudf.DataFrame())
+
+    with pytest.raises(ValueError):
+        src_cls(config,
+                name="unittest-source",
+                gen_fn=test_source_gen,
+                compute_schema_fn=_mk_compute_schema_fn(MessageMeta))
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("generator_type",
+                         [None, typing.Iterator, typing.Generator, collections.abc.Iterator, collections.abc.Generator])
+@pytest.mark.parametrize("return_type, is_prealloc",
+                         [(pd.DataFrame, True), (cudf.DataFrame, True), (MessageMeta, True), (MultiMessage, True),
+                          (float, False)])
+def test_source_decorator(config: Config,
+                          generator_type: type,
+                          return_type: type,
+                          is_prealloc: bool):
+    return_annotation = _get_annotation(return_type, generator_type)
+
+    @source
+    def test_source_gen() -> return_annotation:
+        yield None
+
+    source_stage = test_source_gen(config)  # pylint: disable=too-many-function-args
+
+    assert isinstance(source_stage, WrappedFunctionSourceStage)
+    assert is_prealloc == isinstance(source_stage, PreAllocatedWrappedFunctionStage)
+
+    # check the output type
+    schema = StageSchema(source_stage)
+    source_stage.compute_schema(schema)  # pylint: disable=no-member
+    assert schema.output_schema.get_type() is return_type
+
+
+@pytest.mark.use_python
+def test_source_decorator_name(config: Config):
+
+    @source
+    def test_source_gen(value: int) -> int:
+        yield value
+
+    source_stage = test_source_gen(config, value=5)  # pylint: disable=redundant-keyword-arg
+    assert source_stage.name == 'test_source_gen'  # pylint: disable=no-member
+
+
+@pytest.mark.use_python
+def test_source_decorator_explicit_name(config: Config):
+
+    @source(name="source_gen")
+    def test_source_gen(value: int) -> int:
+        yield value
+
+    source_stage = test_source_gen(config, value=5)  # pylint: disable=redundant-keyword-arg
+    assert source_stage.name == 'source_gen'  # pylint: disable=no-member
+
+
+@pytest.mark.use_python
+def test_source_decorator_explicit_compute_schema(config: Config):
+    mock_compute_schema_fn = mock.MagicMock()
+    mock_compute_schema_fn.side_effect = _mk_compute_schema_fn(int)
+
+    @source(compute_schema_fn=mock_compute_schema_fn)
+    def test_source_gen(value: int) -> int:
+        yield value
+
+    source_stage = test_source_gen(config, value=5)  # pylint: disable=redundant-keyword-arg
+    schema = StageSchema(source_stage)
+    source_stage.compute_schema(schema)  # pylint: disable=no-member
+    assert schema.output_schema.get_type() is int
+    mock_compute_schema_fn.assert_called_once_with(schema)
+
+
+@pytest.mark.use_python
+def test_source_decorator_no_annotation_error(config: Config):
+
+    @source
+    def test_source_gen():
+        yield None
+
+    with pytest.raises(ValueError):
+        test_source_gen(config)  # pylint: disable=too-many-function-args
+
+
+@pytest.mark.use_python
+def test_not_generator_error(config: Config):
+
+    @source
+    def test_fn() -> int:
+        return 5
+
+    with pytest.raises(ValueError):
+        test_fn(config)  # pylint: disable=too-many-function-args
+
+
+@pytest.mark.use_python
+def test_source_stage_arg_no_value_error(config: Config):
+
+    @source
+    def test_source_gen(value: int) -> int:
+        yield value
+
+    with pytest.raises(ValueError):
+        test_source_gen(config)
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("accept_type, return_type",
+                         [(pd.DataFrame, MessageMeta), (int, int), (MessageMeta, MessageMeta), (typing.Any, bool),
+                          (typing.Union[float, int], float), (float, typing.Any), (typing.Any, float),
+                          (typing.Any, typing.Any)])
+def test_wrapped_function_stage_constructor(config: Config, accept_type: type, return_type: type):
+    wrapped_stage = WrappedFunctionStage(config,
+                                         name="unittest-stage",
+                                         on_data_fn=lambda x: x,
+                                         accept_type=accept_type,
+                                         compute_schema_fn=_mk_compute_schema_fn(return_type))
+
+    assert isinstance(wrapped_stage, WrappedFunctionStage)
+    assert wrapped_stage.accepted_types() == (accept_type, )
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("accept_type, return_type",
+                         [(pd.DataFrame, MessageMeta), (int, int), (MessageMeta, MessageMeta), (typing.Any, bool),
+                          (typing.Union[float, int], float), (float, float), (typing.Any, float),
+                          (typing.Any, typing.Any)])
+def test_wrapped_function_stage_output_types(config: Config, accept_type: type, return_type: type):
+    # For non-source types we need an upstream before we can check the compute_schema method outside of a pipeline
+
+    mock_compute_schema_fn = mock.MagicMock()
+    mock_compute_schema_fn.side_effect = _mk_compute_schema_fn(return_type)
+
+    wrapped_stage = WrappedFunctionStage(config,
+                                         name="unittest-stage",
+                                         on_data_fn=lambda x: x,
+                                         accept_type=accept_type,
+                                         compute_schema_fn=mock_compute_schema_fn)
+
+    def source_fn():
+        yield None
+
+    upstream = WrappedFunctionSourceStage(config,
+                                          name="source_fn",
+                                          gen_fn=source_fn,
+                                          compute_schema_fn=_mk_compute_schema_fn(accept_type))
+
+    pipe = LinearPipeline(config)
+    pipe.set_source(upstream)
+    pipe.add_stage(wrapped_stage)
+    pipe.build()
+    mock_compute_schema_fn.assert_called_once()  # pipe.build() will call wrapped_stage.compute_schema()
+
+    schema = StageSchema(wrapped_stage)
+    wrapped_stage.compute_schema(schema)
+    assert schema.output_schema.get_type() is return_type
+
+
+@pytest.mark.use_python
+def test_wrapped_function_stage_name(config: Config):
+
+    def multiplier(message: MessageMeta, column: str, value: int | float) -> MessageMeta:
+        with message.mutable_dataframe() as df:
+            df[column] = df[column] * value
+
+        return message
+
+    wrapped_stage = WrappedFunctionStage(config,
+                                         name="multiplier",
+                                         on_data_fn=functools.partial(multiplier, column='v2', value=5),
+                                         accept_type=MessageMeta,
+                                         compute_schema_fn=_mk_compute_schema_fn(MessageMeta))
+    assert wrapped_stage.name == 'multiplier'
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("needed_columns",
+                         [None, {
+                             'result': TypeId.INT64
+                         }, {
+                             'a': TypeId.INT64, 'b': TypeId.FLOAT32, 'c': TypeId.STRING
+                         }])
+def test_wrapped_function_stage_needed_columns(config: Config, needed_columns: dict[str, TypeId]):
+
+    def test_fn(message: MessageMeta) -> MessageMeta:
+        return message
+
+    wrapped_stage = WrappedFunctionStage(config,
+                                         name="unittest-stage",
+                                         on_data_fn=test_fn,
+                                         accept_type=MessageMeta,
+                                         compute_schema_fn=_mk_compute_schema_fn(MessageMeta),
+                                         needed_columns=needed_columns)
+    expected_needed_columns = needed_columns or collections.OrderedDict()
+    assert wrapped_stage._needed_columns == expected_needed_columns
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("use_accept_type_annotation", [True, False])
+@pytest.mark.parametrize("accept_type, return_type",
+                         [(pd.DataFrame, MessageMeta), (int, int), (MessageMeta, MessageMeta), (typing.Any, bool),
+                          (typing.Union[float, int], float), (float, typing.Any), (typing.Any, float),
+                          (typing.Any, typing.Any)])
+def test_stage_decorator(config: Config, accept_type: type, return_type: type, use_accept_type_annotation: bool):
+
+    if use_accept_type_annotation:
+
+        @stage
+        def test_fn(message: accept_type) -> return_type:
+            return message
+    else:
+
+        @stage(accept_type=accept_type)
+        def test_fn(message) -> return_type:
+            return message
+
+    wrapped_stage = test_fn(config)
+
+    assert isinstance(wrapped_stage, WrappedFunctionStage)
+    assert wrapped_stage.accepted_types() == (accept_type, )
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("name", [None, "unittest-stage"])
+def test_stage_decorator_name(config: Config, name: str):
+    if name is None:
+        expected_name = 'test_fn'
+    else:
+        expected_name = name
+
+    @stage(name=name)
+    def test_fn(message: float, value: float) -> float:
+        return message * value
+
+    wrapped_stage = test_fn(config, value=2.2)
+    assert wrapped_stage.name == expected_name
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("explicit_compute_schema_fn", [True, False]) +@pytest.mark.parametrize("accept_type, return_type", + [(pd.DataFrame, MessageMeta), (int, int), (MessageMeta, MessageMeta), (typing.Any, bool), + (typing.Union[float, int], float), (float, float), (typing.Any, float), + (typing.Any, typing.Any)]) +def test_stage_decorator_output_types(config: Config, + accept_type: type, + return_type: type, + explicit_compute_schema_fn: bool): + # For non-source types we need an upstream before we can check the compute_schema method outside of a pipeline + @source + def source_fn() -> accept_type: + yield None + + if explicit_compute_schema_fn: + mock_compute_schema_fn = mock.MagicMock() + mock_compute_schema_fn.side_effect = _mk_compute_schema_fn(return_type) + + @stage(compute_schema_fn=mock_compute_schema_fn) + def test_stage(message: accept_type) -> return_type: + return message + else: + + @stage + def test_stage(message: accept_type) -> return_type: + return message + + pipe = LinearPipeline(config) + pipe.set_source(source_fn(config)) # pylint: disable=too-many-function-args + wrapped_stage = pipe.add_stage(test_stage(config)) + pipe.build() + + if explicit_compute_schema_fn: + mock_compute_schema_fn.assert_called_once() # pipe.build() will call wrapped_stage.compute_schema() + + schema = StageSchema(wrapped_stage) + wrapped_stage.compute_schema(schema) + assert schema.output_schema.get_type() is return_type + + +@pytest.mark.use_python +def test_stage_decorator_no_annotation_error(config: Config): + + @stage + def test_fn(message): + return message + + with pytest.raises(ValueError): + test_fn(config) + + +@pytest.mark.use_python +def test_stage_arg_no_value_error(config: Config): + + @stage + def test_fn(message: float, value: float) -> float: + return message * value + + with pytest.raises(ValueError): + test_fn(config) # pylint: disable=no-value-for-parameter + + +@pytest.mark.use_python +@pytest.mark.parametrize("needed_columns", + [None, { + 'result': TypeId.INT64 + }, { + 'a': TypeId.INT64, 'b': TypeId.FLOAT32, 'c': TypeId.STRING + }]) +def test_stage_decorator_needed_columns(config: Config, needed_columns: dict[str, TypeId]): + + @stage(needed_columns=needed_columns) + def test_fn(message: MessageMeta) -> MessageMeta: + return message + + wrapped_stage = test_fn(config) + expected_needed_columns = needed_columns or collections.OrderedDict() + assert wrapped_stage._needed_columns == expected_needed_columns + + +def test_end_to_end_pipe(config: Config, filter_probs_df: cudf.DataFrame): + + @source + def source_gen(dataframes: list[cudf.DataFrame]) -> collections.abc.Iterator[MessageMeta]: + for df in dataframes: + yield MessageMeta(df) + + @stage + def multiplier(message: MessageMeta, column: str, value: int | float = 2.0) -> MessageMeta: + with message.mutable_dataframe() as df: + df[column] = df[column] * value + + return message + + multipy_by = 5 + expected_df = filter_probs_df.copy(deep=True) + expected_df['v2'] = expected_df['v2'] * multipy_by * 2.0 + + pipe = LinearPipeline(config) + pipe.set_source(source_gen(config, dataframes=[filter_probs_df])) # pylint: disable=redundant-keyword-arg + pipe.add_stage(multiplier(config, column='v2', value=multipy_by)) + pipe.add_stage(multiplier(config, column='v2')) + sink = pipe.add_stage(CompareDataFrameStage(config, expected_df)) + pipe.run() + + assert_results(sink.get_results())