From 5990f3ae2546696807e2a9e8adc6dac8dfce7109 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Fri, 23 Jun 2023 08:08:09 -0700 Subject: [PATCH] Fix logging of wrong feature_columns in CLI and exception in auto-complete (#983) The CLI command for the FIL pipeline set a default value for the `--columns_file` flag, but some FIL pipelines, such as ransomware, didn't use this config property, causing the default columns file to be loaded and its values to be logged to the terminal. * Removed default values for the `--columns_file` and `--labels_file` flags * Removed redundant checks for null values that were already being enforced by `required=True` * Fixed an exception raised when using click auto-complete * Misc pylint/flake8 errors fixes #984 fixes #819 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) - Pete MacKinnon (https://github.com/pdmack) URL: https://github.com/nv-morpheus/Morpheus/pull/983 --- docs/source/cloud_deployment_guide.md | 4 +- examples/abp_nvsmi_detection/README.md | 2 +- morpheus/cli/commands.py | 66 ++++++++++++-------- scripts/validation/val-run-pipeline.sh | 2 +- tests/test_cli.py | 40 ++++++++-------- 5 files changed, 55 insertions(+), 59 deletions(-) diff --git a/docs/source/cloud_deployment_guide.md b/docs/source/cloud_deployment_guide.md index cbb4bfa86d..d9a1f70caa 100644 --- a/docs/source/cloud_deployment_guide.md +++ b/docs/source/cloud_deployment_guide.md @@ -636,7 +636,7 @@ helm install --set ngc.apiKey="$API_KEY" \ --pipeline_batch_size=1024 \ --model_max_batch_size=64 \ --use_cpp=True \ - pipeline-fil \ + pipeline-fil --columns_file=data/columns_fil.txt \ from-file --filename=./examples/data/nvsmi.jsonlines \ monitor --description 'FromFile Rate' --smoothing=0.001 \ deserialize \ @@ -661,7 +661,7 @@ helm install --set ngc.apiKey="$API_KEY" \ --pipeline_batch_size=1024 \ --model_max_batch_size=64 \ --use_cpp=True \ - pipeline-fil \ + pipeline-fil --columns_file=data/columns_fil.txt \ from-kafka --input_topic --bootstrap_servers broker:9092 \ monitor --description 'FromKafka Rate' --smoothing=0.001 \ deserialize \ diff --git a/examples/abp_nvsmi_detection/README.md b/examples/abp_nvsmi_detection/README.md index 1141bf4ffd..693a12ed11 100644 --- a/examples/abp_nvsmi_detection/README.md +++ b/examples/abp_nvsmi_detection/README.md @@ -101,7 +101,7 @@ morpheus --log_level=DEBUG \ `# Run a pipeline with 8 threads and a model batch size of 32 (Must be equal or less than Triton config)` \ run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ `# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \ - pipeline-fil \ + pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \ `# 1st Stage: Read from file` \ from-file --filename=examples/data/nvsmi.jsonlines \ `# 2nd Stage: Deserialize from JSON strings to objects` \ diff --git a/morpheus/cli/commands.py b/morpheus/cli/commands.py index 3ab3557653..4fc580eff5 100644 --- a/morpheus/cli/commands.py +++ b/morpheus/cli/commands.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
+"""Pipeline and tool subcommands for the Morpheus CLI.""" import functools import logging @@ -64,14 +65,17 @@ # `modes` is a tuple so it can be cached for LRU to work @functools.lru_cache(maxsize=None) def add_command(name: str, stage_module: str, modes: typing.Tuple[PipelineModes, ...] = None): + """Add a stage to the global registry.""" GlobalStageRegistry.get().add_stage_info( LazyStageInfo(name=name, stage_qualified_name=stage_module, modes=list(modes))) class AliasedGroup(click.Group): + """A click group that supports aliases.""" def get_command(self, ctx, cmd_name): + """Resolve aliases before calling the base method.""" try: cmd_name = ALIASES[cmd_name] except KeyError: @@ -80,6 +84,7 @@ def get_command(self, ctx, cmd_name): class PluginGroup(AliasedGroup): + """Click group that loads subcommands from plugins.""" def __init__( self, @@ -96,28 +101,31 @@ def __init__( self._plugin_manager = PluginManager.get() def list_commands(self, ctx: click.Context) -> typing.List[str]: + """Get the list of commands.""" # Get the list of commands from the base - command_list = super().list_commands(ctx) + command_list = set(super().list_commands(ctx)) # Extend it with any plugins registered_stages = self._plugin_manager.get_registered_stages() plugin_command_list = registered_stages.get_registered_names(self._pipeline_mode) - duplicate_commands = [x for x in plugin_command_list if x in command_list] + duplicate_commands = command_list.intersection(plugin_command_list) - if (len(duplicate_commands) > 0): - raise RuntimeError("Plugins registered the following duplicate commands: {}".format( - ", ".join(duplicate_commands))) + # The COMP_WORDS environment variable is set by click using the auto-complete feature, which may cause this + # metho to be called multiple times. 
+ if (len(duplicate_commands) > 0 and 'COMP_WORDS' not in os.environ): + raise RuntimeError(f"Plugins registered the following duplicate commands: {', '.join(duplicate_commands)}") - command_list.extend(plugin_command_list) + command_list.update(plugin_command_list) command_list = sorted(command_list) return command_list def get_command(self, ctx, cmd_name): + """Get the command.""" # Check if the command is already loaded if (cmd_name not in self.commands): @@ -162,6 +170,7 @@ def cli(ctx: click.Context, log_config_file: str = DEFAULT_CONFIG.log_config_file, plugins: typing.List[str] = None, **kwargs): + """Main entry point function for the CLI.""" # ensure that ctx.obj exists and is a dict (in case `cli()` is called # by means other than the `if` block below @@ -185,7 +194,7 @@ def cli(ctx: click.Context, @cli.group(short_help="Run a utility tool", no_args_is_help=True) @prepare_command() def tools(ctx: click.Context, **kwargs): - + """Tools subcommand""" pass @@ -197,7 +206,7 @@ def tools(ctx: click.Context, **kwargs): @click.option('--max_workspace_size', type=int, default=16000) @prepare_command() def onnx_to_trt(ctx: click.Context, **kwargs): - + """Converts an ONNX model to a TRT engine""" logger.info("Generating onnx file") # Convert batches to a list @@ -205,9 +214,9 @@ def onnx_to_trt(ctx: click.Context, **kwargs): c = ConfigOnnxToTRT() - for param in kwargs: + for (param, val) in kwargs.items(): if hasattr(c, param): - setattr(c, param, kwargs[param]) + setattr(c, param, val) from morpheus.utils.onnx_to_trt import gen_engine @@ -216,6 +225,7 @@ def onnx_to_trt(ctx: click.Context, **kwargs): @tools.group(short_help="Utility for installing/updating/removing shell completion for Morpheus", no_args_is_help=True) def autocomplete(**kwargs): + """Utility for installing/updating/removing shell completion for Morpheus""" pass @@ -229,7 +239,7 @@ def show(shell): from morpheus.cli import click_completion_tools shell, path, code = click_completion_tools.get_code(shell=shell) - click.secho("To add %s completion, write the following code to '%s':\n" % (shell, path), fg="blue") + click.secho(f"To add {shell} completion, write the following code to '{path}':\n", fg="blue") click.echo(code) @@ -247,7 +257,7 @@ def install(**kwargs): from morpheus.cli import click_completion_tools shell, path = click_completion_tools.install_code(**kwargs) - click.echo('%s completion installed in %s' % (shell, path)) + click.echo(f'{shell} completion installed in {path}') @cli.group(short_help="Run one of the available pipelines", no_args_is_help=True, cls=AliasedGroup) @@ -277,7 +287,7 @@ def install(**kwargs): "Only use as a last resort if bugs are encountered")) @prepare_command(parse_config=True) def run(ctx: click.Context, **kwargs): - + """Run subcommand, used for running a pipeline""" # Since the option isnt the same name as `should_use_cpp` anymore, manually set the value here. CppConfig.set_should_use_cpp(kwargs.pop("use_cpp", CppConfig.get_should_use_cpp())) @@ -314,11 +324,9 @@ def pipeline_nlp(ctx: click.Context, **kwargs): output of each stage will become the input for the next stage. For example, to read, classify and write to a file, the following stages could be used - \b pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json - \b Pipelines must follow a few rules: 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` 2. 
A `deserialize` stage must be placed between the source stages and the rest of the pipeline @@ -368,7 +376,6 @@ def pipeline_nlp(ctx: click.Context, **kwargs): "A label file is a simple text file where each line corresponds to a label. " "If unspecified the value specified by the --label flag will be used.")) @click.option('--columns_file', - default="data/columns_fil.txt", type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True), help=("Specifies a file to read column features.")) @click.option('--viz_file', @@ -382,11 +389,9 @@ def pipeline_fil(ctx: click.Context, **kwargs): output of each stage will become the input for the next stage. For example, to read, classify and write to a file, the following stages could be used - \b pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json - \b Pipelines must follow a few rules: 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` 2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline @@ -413,8 +418,6 @@ def pipeline_fil(ctx: click.Context, **kwargs): if ("columns_file" in kwargs and kwargs["columns_file"] is not None): config.fil.feature_columns = load_labels_file(kwargs["columns_file"]) logger.debug("Loaded columns. Current columns: [%s]", str(config.fil.feature_columns)) - else: - raise ValueError('Unable to find columns file') from morpheus.pipeline import LinearPipeline @@ -430,15 +433,14 @@ def pipeline_fil(ctx: click.Context, **kwargs): pipeline_mode=PipelineModes.AE) @click.option('--columns_file', required=True, - default="data/columns_ae.txt", + default=None, type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True), help=("Specifies a file to read column features.")) @click.option('--labels_file', default=None, type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True), help=("Specifies a file to read labels from in order to convert class IDs into labels. " - "A label file is a simple text file where each line corresponds to a label. " - "If unspecified, only a single output label is created for FIL")) + "A label file is a simple text file where each line corresponds to a label. ")) @click.option('--userid_column_name', type=str, default="userIdentityaccountId", @@ -469,11 +471,9 @@ def pipeline_ae(ctx: click.Context, **kwargs): output of each stage will become the input for the next stage. For example, to read, classify and write to a file, the following stages could be used - \b pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json - \b Pipelines must follow a few rules: 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` 2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline @@ -495,19 +495,14 @@ def pipeline_ae(ctx: click.Context, **kwargs): config.ae.userid_column_name = kwargs["userid_column_name"] config.ae.feature_scaler = kwargs["feature_scaler"] config.ae.use_generic_model = kwargs["use_generic_model"] - - if ("columns_file" in kwargs and kwargs["columns_file"] is not None): - config.ae.feature_columns = load_labels_file(kwargs["columns_file"]) - logger.debug("Loaded columns. 
Current columns: [%s]", str(config.ae.feature_columns)) - else: - # Use a default single label - config.class_labels = ["reconstruct_loss", "zscore"] + config.ae.feature_columns = load_labels_file(kwargs["columns_file"]) + logger.debug("Loaded columns. Current columns: [%s]", str(config.ae.feature_columns)) if ("labels_file" in kwargs and kwargs["labels_file"] is not None): config.class_labels = load_labels_file(kwargs["labels_file"]) logger.debug("Loaded labels file. Current labels: [%s]", str(config.class_labels)) else: - # Use a default single label + # Use default labels config.class_labels = ["reconstruct_loss", "zscore"] if ("userid_filter" in kwargs): @@ -552,11 +547,9 @@ def pipeline_other(ctx: click.Context, **kwargs): output of each stage will become the input for the next stage. For example, to read, classify and write to a file, the following stages could be used - \b pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json - \b Pipelines must follow a few rules: 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` 2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline @@ -595,6 +588,7 @@ def pipeline_other(ctx: click.Context, **kwargs): @pipeline_other.result_callback() @click.pass_context def post_pipeline(ctx: click.Context, *args, **kwargs): + """Executes the pipeline""" config = get_config_from_ctx(ctx) @@ -611,7 +605,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): # TODO(MDD): Move visualization before `pipeline.run()` once Issue #230 is fixed. if ("viz_file" in kwargs and kwargs["viz_file"] is not None): pipeline.visualize(kwargs["viz_file"], rankdir="LR") - click.secho("Pipeline visualization saved to {}".format(kwargs["viz_file"]), fg="yellow") + click.secho(f"Pipeline visualization saved to {kwargs['viz_file']}", fg="yellow") # Manually create the subcommands for each command (necessary since commands can be used on multiple groups) diff --git a/scripts/validation/val-run-pipeline.sh b/scripts/validation/val-run-pipeline.sh index dc52cbada6..3789a7fc87 100755 --- a/scripts/validation/val-run-pipeline.sh +++ b/scripts/validation/val-run-pipeline.sh @@ -80,7 +80,7 @@ function run_pipeline_abp_nvsmi(){ VAL_OUTPUT=$5 morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ - pipeline-fil \ + pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \ from-file --filename=${INPUT_FILE} \ deserialize \ preprocess \ diff --git a/tests/test_cli.py b/tests/test_cli.py index fa294be082..8e220541aa 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -349,8 +349,9 @@ def test_pipeline_fil(self, config, callback_values): """ Creates a pipeline roughly matching that of the abp validation test """ - args = (GENERAL_ARGS + ['pipeline-fil'] + FILE_SRC_ARGS + ['deserialize', 'preprocess'] + INF_TRITON_ARGS + - MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS) + args = (GENERAL_ARGS + ['pipeline-fil', '--columns_file=data/columns_fil.txt'] + FILE_SRC_ARGS + + ['deserialize', 'preprocess'] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + + ['serialize'] + TO_FILE_ARGS) obj = {} runner = CliRunner() @@ -419,23 +420,24 @@ def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri): with open(labels_file, 'w', 
encoding='UTF-8') as fh: fh.writelines(['frogs\n', 'lizards\n', 'toads']) - args = (GENERAL_ARGS + ['pipeline-fil', '--labels_file', labels_file] + FILE_SRC_ARGS + FROM_KAFKA_ARGS + [ - 'deserialize', - 'filter', - 'dropna', - '--column', - 'xyz', - 'preprocess', - 'add-scores', - 'unittest-conv-msg', - 'inf-identity', - 'inf-pytorch', - '--model_filename', - tmp_model, - 'mlflow-drift', - '--tracking_uri', - mlflow_uri - ] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS + + args = (GENERAL_ARGS + ['pipeline-fil', '--labels_file', labels_file, '--columns_file=data/columns_fil.txt'] + + FILE_SRC_ARGS + FROM_KAFKA_ARGS + [ + 'deserialize', + 'filter', + 'dropna', + '--column', + 'xyz', + 'preprocess', + 'add-scores', + 'unittest-conv-msg', + 'inf-identity', + 'inf-pytorch', + '--model_filename', + tmp_model, + 'mlflow-drift', + '--tracking_uri', + mlflow_uri + ] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS + TO_KAFKA_ARGS) obj = {}