Fix logging of wrong feature_columns in CLI and exception in auto-complete (#983)

The CLI command for the FIL pipeline set a default value for the `--columns_file` flag, but some FIL pipelines, such as ransomware, don't use this config property, causing the default columns file to be loaded and its values to be logged to the terminal (see the sketch below).

* Removed default values for the `--columns_file` and `--labels_file` flags
* Removed redundant null checks for values already enforced by `required=True`
* Fixed an exception raised when using click auto-complete
* Fixed misc pylint/flake8 errors

fixes #984
fixes #819
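
To illustrate the first two bullets, here is a minimal sketch of the click pattern involved (hypothetical command names, not the actual Morpheus stage code): an option with a `default` is always populated, so the dependent file loading and logging runs even for pipelines that ignore the property, whereas dropping the default and relying on `required=True` lets click itself enforce the flag and makes the manual null check redundant.

```python
import click


# Before: the default means columns_file is always populated, so the file
# is loaded (and its contents logged) even by pipelines that never use
# this config property.
@click.command()
@click.option('--columns_file', default="data/columns_fil.txt")
def pipeline_before(columns_file):
    if columns_file is not None:  # always true, because of the default
        click.echo(f"Loading columns from {columns_file}")


# After: no default; click rejects a missing --columns_file on its own,
# so the explicit None check and hand-rolled "missing file" error go away.
@click.command()
@click.option('--columns_file', required=True)
def pipeline_after(columns_file):
    click.echo(f"Loading columns from {columns_file}")


if __name__ == "__main__":
    pipeline_after()
</antml```

Running `pipeline_before` with no arguments silently loads the default file; `pipeline_after` instead fails fast with click's own "Missing option" error.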

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - Pete MacKinnon (https://github.com/pdmack)

URL: #983
dagardner-nv authored Jun 23, 2023
1 parent 8472c85 commit 5990f3a
Showing 5 changed files with 55 additions and 59 deletions.
4 changes: 2 additions & 2 deletions docs/source/cloud_deployment_guide.md
@@ -636,7 +636,7 @@ helm install --set ngc.apiKey="$API_KEY" \
--pipeline_batch_size=1024 \
--model_max_batch_size=64 \
--use_cpp=True \
-pipeline-fil \
+pipeline-fil --columns_file=data/columns_fil.txt \
from-file --filename=./examples/data/nvsmi.jsonlines \
monitor --description 'FromFile Rate' --smoothing=0.001 \
deserialize \
@@ -661,7 +661,7 @@ helm install --set ngc.apiKey="$API_KEY" \
--pipeline_batch_size=1024 \
--model_max_batch_size=64 \
--use_cpp=True \
-pipeline-fil \
+pipeline-fil --columns_file=data/columns_fil.txt \
from-kafka --input_topic <YOUR_INPUT_KAFKA_TOPIC> --bootstrap_servers broker:9092 \
monitor --description 'FromKafka Rate' --smoothing=0.001 \
deserialize \
2 changes: 1 addition & 1 deletion examples/abp_nvsmi_detection/README.md
@@ -101,7 +101,7 @@ morpheus --log_level=DEBUG \
`# Run a pipeline with 8 threads and a model batch size of 32 (Must be equal or less than Triton config)` \
run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=1024 \
`# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \
-pipeline-fil \
+pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \
`# 1st Stage: Read from file` \
from-file --filename=examples/data/nvsmi.jsonlines \
`# 2nd Stage: Deserialize from JSON strings to objects` \
66 changes: 30 additions & 36 deletions morpheus/cli/commands.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Pipeline and tool subcommands for the Morpheus CLI."""

import functools
import logging
@@ -64,14 +65,17 @@
# `modes` is a tuple so it can be cached for LRU to work
@functools.lru_cache(maxsize=None)
def add_command(name: str, stage_module: str, modes: typing.Tuple[PipelineModes, ...] = None):
+"""Add a stage to the global registry."""

GlobalStageRegistry.get().add_stage_info(
LazyStageInfo(name=name, stage_qualified_name=stage_module, modes=list(modes)))


class AliasedGroup(click.Group):
+"""A click group that supports aliases."""

def get_command(self, ctx, cmd_name):
+"""Resolve aliases before calling the base method."""
try:
cmd_name = ALIASES[cmd_name]
except KeyError:
@@ -80,6 +84,7 @@ def get_command(self, ctx, cmd_name):


class PluginGroup(AliasedGroup):
+"""Click group that loads subcommands from plugins."""

def __init__(
self,
@@ -96,28 +101,31 @@ def __init__(
self._plugin_manager = PluginManager.get()

def list_commands(self, ctx: click.Context) -> typing.List[str]:
+"""Get the list of commands."""

# Get the list of commands from the base
-command_list = super().list_commands(ctx)
+command_list = set(super().list_commands(ctx))

# Extend it with any plugins
registered_stages = self._plugin_manager.get_registered_stages()

plugin_command_list = registered_stages.get_registered_names(self._pipeline_mode)

-duplicate_commands = [x for x in plugin_command_list if x in command_list]
+duplicate_commands = command_list.intersection(plugin_command_list)

-if (len(duplicate_commands) > 0):
-raise RuntimeError("Plugins registered the following duplicate commands: {}".format(
-", ".join(duplicate_commands)))
+# The COMP_WORDS environment variable is set by click when using the auto-complete feature, which may
+# cause this method to be called multiple times.
+if (len(duplicate_commands) > 0 and 'COMP_WORDS' not in os.environ):
+raise RuntimeError(f"Plugins registered the following duplicate commands: {', '.join(duplicate_commands)}")

-command_list.extend(plugin_command_list)
+command_list.update(plugin_command_list)

command_list = sorted(command_list)

return command_list

def get_command(self, ctx, cmd_name):
+"""Get the command."""

# Check if the command is already loaded
if (cmd_name not in self.commands):
@@ -162,6 +170,7 @@ def cli(ctx: click.Context,
log_config_file: str = DEFAULT_CONFIG.log_config_file,
plugins: typing.List[str] = None,
**kwargs):
+"""Main entry point function for the CLI."""

# ensure that ctx.obj exists and is a dict (in case `cli()` is called
# by means other than the `if` block below
@@ -185,7 +194,7 @@
@cli.group(short_help="Run a utility tool", no_args_is_help=True)
@prepare_command()
def tools(ctx: click.Context, **kwargs):
-
+"""Tools subcommand"""
pass


@@ -197,17 +206,17 @@ def tools(ctx: click.Context, **kwargs):
@click.option('--max_workspace_size', type=int, default=16000)
@prepare_command()
def onnx_to_trt(ctx: click.Context, **kwargs):
-
+"""Converts an ONNX model to a TRT engine"""
logger.info("Generating onnx file")

# Convert batches to a list
kwargs["batches"] = list(kwargs["batches"])

c = ConfigOnnxToTRT()

-for param in kwargs:
+for (param, val) in kwargs.items():
if hasattr(c, param):
-setattr(c, param, kwargs[param])
+setattr(c, param, val)

from morpheus.utils.onnx_to_trt import gen_engine

@@ -216,6 +225,7 @@ def onnx_to_trt(ctx: click.Context, **kwargs):

@tools.group(short_help="Utility for installing/updating/removing shell completion for Morpheus", no_args_is_help=True)
def autocomplete(**kwargs):
+"""Utility for installing/updating/removing shell completion for Morpheus"""
pass


@@ -229,7 +239,7 @@ def show(shell):
from morpheus.cli import click_completion_tools
shell, path, code = click_completion_tools.get_code(shell=shell)

-click.secho("To add %s completion, write the following code to '%s':\n" % (shell, path), fg="blue")
+click.secho(f"To add {shell} completion, write the following code to '{path}':\n", fg="blue")
click.echo(code)


@@ -247,7 +257,7 @@ def install(**kwargs):
from morpheus.cli import click_completion_tools
shell, path = click_completion_tools.install_code(**kwargs)

-click.echo('%s completion installed in %s' % (shell, path))
+click.echo(f'{shell} completion installed in {path}')


@cli.group(short_help="Run one of the available pipelines", no_args_is_help=True, cls=AliasedGroup)
@@ -277,7 +287,7 @@
"Only use as a last resort if bugs are encountered"))
@prepare_command(parse_config=True)
def run(ctx: click.Context, **kwargs):
-
+"""Run subcommand, used for running a pipeline"""
# Since the option isn't the same name as `should_use_cpp` anymore, manually set the value here.
CppConfig.set_should_use_cpp(kwargs.pop("use_cpp", CppConfig.get_should_use_cpp()))

@@ -314,11 +324,9 @@ def pipeline_nlp(ctx: click.Context, **kwargs):
output of each stage will become the input for the next stage. For example, to read, classify and write to a file,
the following stages could be used
\b
pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model
--server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json
\b
Pipelines must follow a few rules:
1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
@@ -368,7 +376,6 @@ def pipeline_nlp(ctx: click.Context, **kwargs):
"A label file is a simple text file where each line corresponds to a label. "
"If unspecified the value specified by the --label flag will be used."))
@click.option('--columns_file',
-default="data/columns_fil.txt",
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read column features."))
@click.option('--viz_file',
@@ -382,11 +389,9 @@ def pipeline_fil(ctx: click.Context, **kwargs):
output of each stage will become the input for the next stage. For example, to read, classify and write to a file,
the following stages could be used
\b
pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model
--server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json
\b
Pipelines must follow a few rules:
1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
@@ -413,8 +418,6 @@
if ("columns_file" in kwargs and kwargs["columns_file"] is not None):
config.fil.feature_columns = load_labels_file(kwargs["columns_file"])
logger.debug("Loaded columns. Current columns: [%s]", str(config.fil.feature_columns))
-else:
-raise ValueError('Unable to find columns file')

from morpheus.pipeline import LinearPipeline

@@ -430,15 +433,14 @@
pipeline_mode=PipelineModes.AE)
@click.option('--columns_file',
required=True,
-default="data/columns_ae.txt",
+default=None,
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read column features."))
@click.option('--labels_file',
default=None,
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read labels from in order to convert class IDs into labels. "
-"A label file is a simple text file where each line corresponds to a label. "
-"If unspecified, only a single output label is created for FIL"))
+"A label file is a simple text file where each line corresponds to a label. "))
@click.option('--userid_column_name',
type=str,
default="userIdentityaccountId",
@@ -469,11 +471,9 @@ def pipeline_ae(ctx: click.Context, **kwargs):
output of each stage will become the input for the next stage. For example, to read, classify and write to a file,
the following stages could be used
\b
pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model
--server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json
\b
Pipelines must follow a few rules:
1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
@@ -495,19 +495,14 @@
config.ae.userid_column_name = kwargs["userid_column_name"]
config.ae.feature_scaler = kwargs["feature_scaler"]
config.ae.use_generic_model = kwargs["use_generic_model"]
-
-if ("columns_file" in kwargs and kwargs["columns_file"] is not None):
-config.ae.feature_columns = load_labels_file(kwargs["columns_file"])
-logger.debug("Loaded columns. Current columns: [%s]", str(config.ae.feature_columns))
-else:
-# Use a default single label
-config.class_labels = ["reconstruct_loss", "zscore"]
+config.ae.feature_columns = load_labels_file(kwargs["columns_file"])
+logger.debug("Loaded columns. Current columns: [%s]", str(config.ae.feature_columns))

if ("labels_file" in kwargs and kwargs["labels_file"] is not None):
config.class_labels = load_labels_file(kwargs["labels_file"])
logger.debug("Loaded labels file. Current labels: [%s]", str(config.class_labels))
else:
-# Use a default single label
+# Use default labels
config.class_labels = ["reconstruct_loss", "zscore"]

if ("userid_filter" in kwargs):
@@ -552,11 +547,9 @@ def pipeline_other(ctx: click.Context, **kwargs):
output of each stage will become the input for the next stage. For example, to read, classify and write to a file,
the following stages could be used
\b
pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model
--server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json
\b
Pipelines must follow a few rules:
1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
@@ -595,6 +588,7 @@ def pipeline_other(ctx: click.Context, **kwargs):
@pipeline_other.result_callback()
@click.pass_context
def post_pipeline(ctx: click.Context, *args, **kwargs):
+"""Executes the pipeline"""

config = get_config_from_ctx(ctx)

@@ -611,7 +605,7 @@
# TODO(MDD): Move visualization before `pipeline.run()` once Issue #230 is fixed.
if ("viz_file" in kwargs and kwargs["viz_file"] is not None):
pipeline.visualize(kwargs["viz_file"], rankdir="LR")
-click.secho("Pipeline visualization saved to {}".format(kwargs["viz_file"]), fg="yellow")
+click.secho(f"Pipeline visualization saved to {kwargs['viz_file']}", fg="yellow")


# Manually create the subcommands for each command (necessary since commands can be used on multiple groups)
2 changes: 1 addition & 1 deletion scripts/validation/val-run-pipeline.sh
@@ -80,7 +80,7 @@ function run_pipeline_abp_nvsmi(){
VAL_OUTPUT=$5

morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \
-pipeline-fil \
+pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \
from-file --filename=${INPUT_FILE} \
deserialize \
preprocess \
40 changes: 21 additions & 19 deletions tests/test_cli.py
@@ -349,8 +349,9 @@ def test_pipeline_fil(self, config, callback_values):
"""
Creates a pipeline roughly matching that of the abp validation test
"""
-args = (GENERAL_ARGS + ['pipeline-fil'] + FILE_SRC_ARGS + ['deserialize', 'preprocess'] + INF_TRITON_ARGS +
-MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS)
+args = (GENERAL_ARGS + ['pipeline-fil', '--columns_file=data/columns_fil.txt'] + FILE_SRC_ARGS +
+['deserialize', 'preprocess'] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS +
+['serialize'] + TO_FILE_ARGS)

obj = {}
runner = CliRunner()
@@ -419,23 +420,24 @@ def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri):
with open(labels_file, 'w', encoding='UTF-8') as fh:
fh.writelines(['frogs\n', 'lizards\n', 'toads'])

-args = (GENERAL_ARGS + ['pipeline-fil', '--labels_file', labels_file] + FILE_SRC_ARGS + FROM_KAFKA_ARGS + [
-'deserialize',
-'filter',
-'dropna',
-'--column',
-'xyz',
-'preprocess',
-'add-scores',
-'unittest-conv-msg',
-'inf-identity',
-'inf-pytorch',
-'--model_filename',
-tmp_model,
-'mlflow-drift',
-'--tracking_uri',
-mlflow_uri
-] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS +
+args = (GENERAL_ARGS + ['pipeline-fil', '--labels_file', labels_file, '--columns_file=data/columns_fil.txt'] +
+FILE_SRC_ARGS + FROM_KAFKA_ARGS + [
+'deserialize',
+'filter',
+'dropna',
+'--column',
+'xyz',
+'preprocess',
+'add-scores',
+'unittest-conv-msg',
+'inf-identity',
+'inf-pytorch',
+'--model_filename',
+tmp_model,
+'mlflow-drift',
+'--tracking_uri',
+mlflow_uri
+] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS +
TO_KAFKA_ARGS)

obj = {}
