Skip to content

Commit

Permalink
Add source & stage decorators (#1364)
Browse files Browse the repository at this point in the history
* Adds two new decorators `source` and `stage` allowing stages to be created from python functions.
* Accept & return types are inferred by inspecting the type annotations of the decorated function.
* This removes the need for several lines of boilerplate code needed for simple Python-only stages.
* Includes unrelated docstring fix to `morpheus/pipeline/stage_base.py`

example:
```py
from morpheus.pipeline.stage_decorator import source
from morpheus.pipeline.stage_decorator import stage

@source
def source_gen(dataframes: list[cudf.DataFrame]) -> collections.abc.Iterator[MessageMeta]:
    for df in dataframes:
        yield MessageMeta(df)


@stage
def on_data(message: MessageMeta) -> MessageMeta:
    with message.mutable_dataframe() as df:
        print(df)
    return message
```
 
Closes #1253

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1364
  • Loading branch information
dagardner-nv authored Nov 29, 2023
1 parent ea188a1 commit ba28cff
Show file tree
Hide file tree
Showing 17 changed files with 1,441 additions and 147 deletions.
125 changes: 103 additions & 22 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,62 @@ Morpheus makes use of the MRC graph-execution framework. Morpheus pipelines are

## The Pass Through Stage

To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.
To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed. Morpheus provides two ways of defining a stage, as a stand-alone function or as a class.

Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.
### Stand-alone Function

The stand-alone function approach is the simplest way to define a stage. The function should accept a single argument, which will be the input message, and return a single value, which will be the output message. The function should be decorated with the `morpheus.pipeline.stage_decorator.stage` decorator.

```python
import typing

from morpheus.pipeline.stage_decorator import stage


@stage
def pass_thru_stage(message: typing.Any) -> typing.Any:
# Return the message for the next stage
return message
```

When using the `stage` decorator it is required to use type annotations for the function parameters and return type, as this will be used by the stage as the accept and output types. In the above example the stage decorator will use the name of the function as the name of the stage. If we wanted to specify a different name for the stage we could do so by passing the name to the decorator as follows:

```python
@stage(name="pass-thru")
def pass_thru_stage(message: typing.Any) -> typing.Any:
# Return the message for the next stage
return message
```

We can then add our stage to a pipeline as follows:
```python
config = Config()
pipeline = LinearPipeline(config)
# ... rest of pipeline setup
pipeline.add_stage(pass_thru_stage(config))
```

It is possible to provide additional keyword arguments to the function. Consider the following example:
```python
@stage
def multiplier(message: MessageMeta, *, column: str, value: int | float = 2.0) -> MessageMeta:
with message.mutable_dataframe() as df:
df[column] = df[column] * value

return message

# ...

pipe.add_stage(multiplier(config, column='probs', value=5))
```

### Stage Class

The class based aproach to defining a stage offers a bit more flexibility, specifically the ability to validate constructor arguments, and perform any needed setup prior to being invoked in a pipeline. Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.

Stages in Morpheus define what types of data they accept, and the type of data that they emit. In this example we are emitting messages of the same type that is received, this is actually quite common and Morpheus provides a mixin class, `PassThruTypeMixin`, to simplify this.

Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line.
Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line.

We start our class definition with a few basic imports:

Expand Down Expand Up @@ -155,17 +204,19 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage):
```

## Testing our new Stage
To start testing our new pass through stage, we are going to construct a simple pipeline and add our new stage to it. This pipeline will do the minimum work necessary to verify our pass through stage. Data will flow through our simple pipeline as follows:
To start testing both our new function-based and class-based stages, we are going to construct a simple pipeline and add both stages to it. This pipeline will do the minimum work necessary to verify our pass through stages. Data will flow through our simple pipeline as follows:
1. A source stage will produce data and emit it into the pipeline.
1. This data will be read and processed by our pass through stage, in this case simply forwarding on the data.
1. A monitoring stage will record the messages from our pass through stage and terminate the pipeline.

First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the `PassThruStage`` in a file named "pass_thru.py" in the same directory.
First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the class based `PassThruStage` in a file named "pass_thru.py" and the function based `pass_thru_stage` named "pass_thru_deco.py" in the same directory.

```python
import logging
import os

from pass_thru import PassThruStage
from pass_thru_deco import pass_thru_stage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
Expand All @@ -174,6 +225,7 @@ from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging
```


Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:
```python
configure_logging(log_level=logging.DEBUG)
Expand All @@ -189,10 +241,19 @@ In this example, we will use the `FileSourceStage` class to read a large file in
pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
```

Next, we will add our new stage to the pipeline as well as a `MonitorStage` which will measure the throughput of our pass through stage:
Next, we will add both versions of our new stage to the pipeline as well as a `MonitorStage` instance for each to measure the throughput of our stages:

```python
# Add the decorated function stage
pipeline.add_stage(pass_thru_stage(config))

# Add monitor to record the performance of the function based stage
pipeline.add_stage(MonitorStage(config))

# Add the class based stage
pipeline.add_stage(PassThruStage(config))

# Add monitor to record the performance of the class based stage
pipeline.add_stage(MonitorStage(config))
```

Expand All @@ -203,20 +264,30 @@ pipeline.run()

The output should display:
```
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Pipeline====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True)>
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, parser_kwargs=None)>
└─> morpheus.MessageMeta
Added stage: <pass-thru-1; PassThruStage(args=(), kwargs={})>
Added stage: <pass_thru_stage-1; WrappedFunctionStage(on_data_fn=<function pass_thru_stage at 0x7f001f72bd00>, on_data_args=(), accept_type=None, return_type=None, needed_columns=None, on_data_kwargs={})>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
Added stage: <pass-thru-3; PassThruStage()>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1648834587.3092508
====Pipeline Started====
Progress[Complete]: 25229messages [00:00, 57695.02messages/s]
Added stage: <monitor-4; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Progress[Complete]: 100 messages [00:01, 69.97 messages/s]
Progress[Complete]: 100 messages [00:01, 69.76 messages/s]
====Pipeline Complete====
```

Expand All @@ -226,14 +297,15 @@ Note that this code assumes the `MORPHEUS_ROOT` environment variable is set to t
import logging
import os

from pass_thru import PassThruStage
from pass_thru_deco import pass_thru_stage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
# Enable the Morpheus logger
Expand All @@ -250,22 +322,31 @@ def run_pipeline():
# Set source stage
pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

# Add our own stage
# Add the decorated function stage
pipeline.add_stage(pass_thru_stage(config))

# Add monitor to record the performance of the function based stage
pipeline.add_stage(MonitorStage(config))

# Add the class based stage
pipeline.add_stage(PassThruStage(config))

# Add monitor to record the performance of our new stage
# Add monitor to record the performance of the class based stage
pipeline.add_stage(MonitorStage(config))

# Run the pipeline
pipeline.run()


if __name__ == "__main__":
run_pipeline()
```


### Alternate Morpheus CLI example
The above example makes use of the Morpheus Python API. Alternately, we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.
The above example makes use of the Morpheus Python API. Alternately, we could test the class-based stage in a pipeline constructed using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.

> **Note**: For now, registering a stage with the CLI tool is currently only available to class based stages.
From the root of the Morpheus repo, run:
```bash
Expand Down
Loading

0 comments on commit ba28cff

Please sign in to comment.