diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py new file mode 100644 index 0000000000..7a341bc852 --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py @@ -0,0 +1,113 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +import mrc +from dfp.utils.model_cache import ModelCache +from dfp.utils.model_cache import ModelManager +from mlflow.tracking.client import MlflowClient +from mrc.core import operators as ops + +from morpheus.messages.multi_ae_message import MultiAEMessage +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_utils import get_module_config +from morpheus.utils.module_utils import register_module + +from ..messages.multi_dfp_message import MultiDFPMessage +from ..utils.module_ids import DFP_INFERENCE + +logger = logging.getLogger(__name__) + + +@register_module(DFP_INFERENCE, MODULE_NAMESPACE) +def dfp_inference(builder: mrc.Builder): + """ + Inference module function. + + Parameters + ---------- + builder : mrc.Builder + Pipeline builder instance. + """ + + config = get_module_config(DFP_INFERENCE, builder) + + fallback_user = config.get("fallback_username", None) + model_name_formatter = config.get("model_name_formatter", None) + timestamp_column_name = config.get("timestamp_column_name", None) + + client = MlflowClient() + model_manager = ModelManager(model_name_formatter=model_name_formatter) + + def get_model(user: str) -> ModelCache: + + return model_manager.load_user_model(client, user_id=user, fallback_user_ids=[fallback_user]) + + def on_data(message: MultiDFPMessage): + if (not message or message.mess_count == 0): + return None + + start_time = time.time() + + df_user = message.get_meta() + user_id = message.user_id + + try: + model_cache: ModelCache = get_model(user_id) + + if (model_cache is None): + raise RuntimeError("Could not find model for user {}".format(user_id)) + + loaded_model = model_cache.load_model(client) + + except Exception: # Model lookup/deserialization can fail in several ways; log it and drop the message + logger.exception("Error trying to get model") + return None + + post_model_time = time.time() + + results_df = loaded_model.get_results(df_user, return_abs=True) + + # Create an output message to allow setting meta + output_message = MultiAEMessage(message.meta, + mess_offset=message.mess_offset, + mess_count=message.mess_count, + model=loaded_model) + + output_message.set_meta(list(results_df.columns), results_df) + + output_message.set_meta('model_version', f"{model_cache.reg_model_name}:{model_cache.reg_model_version}") + + if logger.isEnabledFor(logging.DEBUG): + load_model_duration = (post_model_time - start_time) * 1000.0 + get_anomaly_duration = (time.time() - post_model_time) * 1000.0 + + logger.debug("Completed inference for user %s. Model load: %s ms, Model infer: %s ms. Start: %s, End: %s", + user_id, + load_model_duration, + get_anomaly_duration, + df_user[timestamp_column_name].min(), + df_user[timestamp_column_name].max()) + + return output_message + + def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): + obs.pipe(ops.map(on_data)).subscribe(sub) + + node = builder.make_node_full(DFP_INFERENCE, node_fn) + + builder.register_module_input("input", node) + builder.register_module_output("output", node)
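Reviewer note: `dfp_inference` resolves a per-user model from MLflow (falling back to the configured `fallback_username` when no per-user model exists) and emits a `MultiAEMessage` carrying the anomaly scores. For reference, a minimal sketch of the config this module reads via `get_module_config`; the formatter and fallback values here are illustrative examples, not defaults:

```python
from morpheus.utils.module_ids import MODULE_NAMESPACE

# Keys mirror the config.get(...) calls in dfp_inference above.
dfp_inference_conf = {
    "module_id": "DFPInference",
    "module_name": "dfp_inference",
    "namespace": MODULE_NAMESPACE,
    "fallback_username": "generic_user",            # user whose model is used when a per-user model is missing
    "model_name_formatter": "DFP-azure-{user_id}",  # template used to look up registered models in MLflow
    "timestamp_column_name": "timestamp",
}
```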
Start: %s, End: %s", + user_id, + load_model_duration, + get_anomaly_duration, + df_user[timestamp_column_name].min(), + df_user[timestamp_column_name].max()) + + return output_message + + def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): + obs.pipe(ops.map(on_data)).subscribe(sub) + + node = builder.make_node_full(DFP_INFERENCE, node_fn) + + builder.register_module_input("input", node) + builder.register_module_output("output", node) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipeline.py new file mode 100644 index 0000000000..2a8eaa62d7 --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipeline.py @@ -0,0 +1,99 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import dfp.modules.dfp_data_prep # noqa: F401 +import dfp.modules.dfp_inference # noqa: F401 +import dfp.modules.dfp_postprocessing # noqa: F401 +import dfp.modules.dfp_rolling_window # noqa: F401 +import dfp.modules.dfp_split_users # noqa: F401 +import mrc + +import morpheus.modules.file_batcher # noqa: F401 +import morpheus.modules.file_to_df # noqa: F401 +import morpheus.modules.filter_detections # noqa: F401 +import morpheus.modules.serialize # noqa: F401 +import morpheus.modules.write_to_file # noqa: F401 +from morpheus.utils.module_ids import FILE_BATCHER +from morpheus.utils.module_ids import FILE_TO_DF +from morpheus.utils.module_ids import FILTER_DETECTIONS +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_ids import SERIALIZE +from morpheus.utils.module_ids import WRITE_TO_FILE +from morpheus.utils.module_utils import get_module_config +from morpheus.utils.module_utils import load_module +from morpheus.utils.module_utils import register_module + +from ..utils.module_ids import DFP_DATA_PREP +from ..utils.module_ids import DFP_INFERENCE +from ..utils.module_ids import DFP_INFERENCE_PIPELINE +from ..utils.module_ids import DFP_POST_PROCESSING +from ..utils.module_ids import DFP_ROLLING_WINDOW +from ..utils.module_ids import DFP_SPLIT_USERS + +logger = logging.getLogger(__name__) + + +@register_module(DFP_INFERENCE_PIPELINE, MODULE_NAMESPACE) +def dfp_inference_pipeline(builder: mrc.Builder): + """ + This module function allows for the consolidation of multiple dfp pipeline modules relevent to inference + process into a single module. + + Parameters + ---------- + builder : mrc.Builder + Pipeline budler instance. 
+ """ + + config = get_module_config(DFP_INFERENCE_PIPELINE, builder) + + file_batcher_conf = config.get(FILE_BATCHER, None) + file_to_df_conf = config.get(FILE_TO_DF, None) + dfp_split_users_conf = config.get(DFP_SPLIT_USERS, None) + dfp_rolling_window_conf = config.get(DFP_ROLLING_WINDOW, None) + dfp_data_prep_conf = config.get(DFP_DATA_PREP, None) + dfp_inference_conf = config.get(DFP_INFERENCE, None) + filter_detections_conf = config.get(FILTER_DETECTIONS, None) + dfp_post_proc_conf = config.get(DFP_POST_PROCESSING, None) + serialize_conf = config.get(SERIALIZE, None) + write_to_file_conf = config.get(WRITE_TO_FILE, None) + + # Load modules + file_batcher_module = load_module(file_batcher_conf, builder=builder) + file_to_dataframe_module = load_module(file_to_df_conf, builder=builder) + dfp_split_users_modules = load_module(dfp_split_users_conf, builder=builder) + dfp_rolling_window_module = load_module(dfp_rolling_window_conf, builder=builder) + dfp_data_prep_module = load_module(dfp_data_prep_conf, builder=builder) + dfp_inference_module = load_module(dfp_inference_conf, builder=builder) + filter_detections_module = load_module(filter_detections_conf, builder=builder) + dfp_post_proc_module = load_module(dfp_post_proc_conf, builder=builder) + serialize_module = load_module(serialize_conf, builder=builder) + write_to_file_module = load_module(write_to_file_conf, builder=builder) + + # Make an edge between the modules. + builder.make_edge(file_batcher_module.output_port("output"), file_to_dataframe_module.input_port("input")) + builder.make_edge(file_to_dataframe_module.output_port("output"), dfp_split_users_modules.input_port("input")) + builder.make_edge(dfp_split_users_modules.output_port("output"), dfp_rolling_window_module.input_port("input")) + builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) + builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_inference_module.input_port("input")) + builder.make_edge(dfp_inference_module.output_port("output"), filter_detections_module.input_port("input")) + builder.make_edge(filter_detections_module.output_port("output"), dfp_post_proc_module.input_port("input")) + builder.make_edge(dfp_post_proc_module.output_port("output"), serialize_module.input_port("input")) + builder.make_edge(serialize_module.output_port("output"), write_to_file_module.input_port("input")) + + # Register input and output port for a module. + builder.register_module_input("input", file_batcher_module.input_port("input")) + builder.register_module_output("output", write_to_file_module.output_port("output")) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py new file mode 100644 index 0000000000..0793d984ad --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py @@ -0,0 +1,81 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py new file mode 100644 index 0000000000..0793d984ad --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py @@ -0,0 +1,81 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from datetime import datetime + +import mrc +import numpy as np +from mrc.core import operators as ops + +from morpheus.messages.multi_ae_message import MultiAEMessage +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_utils import get_module_config +from morpheus.utils.module_utils import register_module + +from ..utils.module_ids import DFP_POST_PROCESSING + +logger = logging.getLogger(__name__) + + +@register_module(DFP_POST_PROCESSING, MODULE_NAMESPACE) +def dfp_postprocessing(builder: mrc.Builder): + """ + Postprocessing module function. + + Parameters + ---------- + builder : mrc.Builder + Pipeline builder instance. + """ + + config = get_module_config(DFP_POST_PROCESSING, builder) + + timestamp_column_name = config.get("timestamp_column_name", None) + + def process_events(message: MultiAEMessage): + # Assume that a filter stage precedes this stage + df = message.get_meta() + df['event_time'] = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ') + df.replace(np.nan, 'NaN', regex=True, inplace=True) + message.set_meta(None, df) + + def on_data(message: MultiAEMessage): + if (not message or message.mess_count == 0): + return None + + start_time = time.time() + + process_events(message) + + duration = (time.time() - start_time) * 1000.0 + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Completed postprocessing for user %s in %s ms. Event count: %s. Start: %s, End: %s", + message.meta.user_id, + duration, + message.mess_count, + message.get_meta(timestamp_column_name).min(), + message.get_meta(timestamp_column_name).max()) + + return message + + def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): + obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub) + + node = builder.make_node_full(DFP_POST_PROCESSING, node_fn) + + builder.register_module_input("input", node) + builder.register_module_output("output", node)
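Reviewer note: `process_events` stamps every row with a wall-clock `event_time` and stringifies NaNs, presumably so the CSV written downstream shows a literal `NaN` rather than an empty field. A standalone sketch of the same transform on a plain DataFrame (sample column names are made up):

```python
from datetime import datetime

import numpy as np
import pandas as pd

df = pd.DataFrame({"user": ["alice", "bob"], "mean_abs_z": [2.7, np.nan]})
df["event_time"] = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")  # one timestamp for the whole batch
df.replace(np.nan, "NaN", regex=True, inplace=True)               # keep missing values visible after serialization
```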
diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipeline.py new file mode 100644 index 0000000000..949860738a --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipeline.py @@ -0,0 +1,84 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import dfp.modules.dfp_data_prep # noqa: F401 +import dfp.modules.dfp_rolling_window # noqa: F401 +import dfp.modules.dfp_split_users # noqa: F401 +import dfp.modules.dfp_training # noqa: F401 +import mrc + +import morpheus.modules.file_batcher # noqa: F401 +import morpheus.modules.file_to_df # noqa: F401 +import morpheus.modules.mlflow_model_writer # noqa: F401 +from morpheus.utils.module_ids import FILE_BATCHER +from morpheus.utils.module_ids import FILE_TO_DF +from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_utils import get_module_config +from morpheus.utils.module_utils import load_module +from morpheus.utils.module_utils import register_module + +from ..utils.module_ids import DFP_DATA_PREP +from ..utils.module_ids import DFP_ROLLING_WINDOW +from ..utils.module_ids import DFP_SPLIT_USERS +from ..utils.module_ids import DFP_TRAINING +from ..utils.module_ids import DFP_TRAINING_PIPELINE +
+logger = logging.getLogger(__name__) + + +@register_module(DFP_TRAINING_PIPELINE, MODULE_NAMESPACE) +def dfp_training_pipeline(builder: mrc.Builder): + """ + This module function consolidates the DFP pipeline modules relevant to the training process into a single module. + + Parameters + ---------- + builder : mrc.Builder + Pipeline builder instance. + """ + + config = get_module_config(DFP_TRAINING_PIPELINE, builder) + + file_batcher_conf = config.get(FILE_BATCHER, None) + file_to_df_conf = config.get(FILE_TO_DF, None) + dfp_split_users_conf = config.get(DFP_SPLIT_USERS, None) + dfp_rolling_window_conf = config.get(DFP_ROLLING_WINDOW, None) + dfp_data_prep_conf = config.get(DFP_DATA_PREP, None) + dfp_training_conf = config.get(DFP_TRAINING, None) + mlflow_model_writer_conf = config.get(MLFLOW_MODEL_WRITER, None) + + # Load modules + file_batcher_module = load_module(file_batcher_conf, builder=builder) + file_to_dataframe_module = load_module(file_to_df_conf, builder=builder) + dfp_split_users_module = load_module(dfp_split_users_conf, builder=builder) + dfp_rolling_window_module = load_module(dfp_rolling_window_conf, builder=builder) + dfp_data_prep_module = load_module(dfp_data_prep_conf, builder=builder) + dfp_training_module = load_module(dfp_training_conf, builder=builder) + mlflow_model_writer_module = load_module(mlflow_model_writer_conf, builder=builder) + + # Connect the modules into a linear chain. + builder.make_edge(file_batcher_module.output_port("output"), file_to_dataframe_module.input_port("input")) + builder.make_edge(file_to_dataframe_module.output_port("output"), dfp_split_users_module.input_port("input")) + builder.make_edge(dfp_split_users_module.output_port("output"), dfp_rolling_window_module.input_port("input")) + builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) + builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_training_module.input_port("input")) + builder.make_edge(dfp_training_module.output_port("output"), mlflow_model_writer_module.input_port("input")) + + # Register the input and output ports of the consolidated module. + builder.register_module_input("input", file_batcher_module.input_port("input")) + builder.register_module_output("output", mlflow_model_writer_module.output_port("output"))
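Reviewer note: structurally the two consolidated pipelines differ only in their tails (inference/filter/serialize/write vs. train/model-write) and in how much history the rolling window demands. The values below are taken from `ConfigGenerator` further down in this diff:

```python
# Training needs substantial per-user history before fitting an autoencoder;
# inference scores whatever new rows arrive.
rolling_window_overrides = {
    "training": {"min_history": 300, "min_increment": 300},
    "inference": {"min_history": 1, "min_increment": 0},
}
```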
+ builder.register_module_input("input", file_batcher_module.input_port("input")) + builder.register_module_output("output", mlflow_model_writer_module.output_port("output")) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py new file mode 100644 index 0000000000..42365cabdc --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py @@ -0,0 +1,282 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from dfp.utils.derive_args import DeriveArgs +from dfp.utils.derive_args import pyobj2str +from dfp.utils.module_ids import DFP_DATA_PREP +from dfp.utils.module_ids import DFP_INFERENCE +from dfp.utils.module_ids import DFP_INFERENCE_PIPELINE +from dfp.utils.module_ids import DFP_POST_PROCESSING +from dfp.utils.module_ids import DFP_ROLLING_WINDOW +from dfp.utils.module_ids import DFP_SPLIT_USERS +from dfp.utils.module_ids import DFP_TRAINING +from dfp.utils.module_ids import DFP_TRAINING_PIPELINE +from dfp.utils.regex_utils import iso_date_regex_pattern +from dfp.utils.schema_utils import Schema + +from morpheus.cli.utils import get_package_relative_file +from morpheus.cli.utils import load_labels_file +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.config import CppConfig +from morpheus.messages.multi_message import MultiMessage +from morpheus.utils.module_ids import FILE_BATCHER +from morpheus.utils.module_ids import FILE_TO_DF +from morpheus.utils.module_ids import FILTER_DETECTIONS +from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_ids import SERIALIZE +from morpheus.utils.module_ids import WRITE_TO_FILE + + +class ConfigGenerator: + + def __init__(self, config: Config, derive_args: DeriveArgs, schema: Schema, encoding: str = "latin1"): + + self._config = config + self._derive_args = derive_args + self._encoding = encoding + self._source_schema_str = pyobj2str(schema.source, encoding=encoding) + self._preprocess_schema_str = pyobj2str(schema.preprocess, encoding=encoding) + self._input_message_type = pyobj2str(MultiMessage, encoding) + + def inf_pipe_module_conf(self): + + module_conf = { + "module_id": DFP_INFERENCE_PIPELINE, + "module_name": "dfp_inference_pipeline", + "namespace": MODULE_NAMESPACE, + FILE_BATCHER: { + "module_id": FILE_BATCHER, + "module_name": "file_batcher", + "namespace": MODULE_NAMESPACE, + "period": "D", + "sampling_rate_s": self._derive_args.sample_rate_s, + "start_time": self._derive_args.start_time, + "end_time": self._derive_args.end_time, + "iso_date_regex_pattern": iso_date_regex_pattern + }, + FILE_TO_DF: { + "module_id": FILE_TO_DF, + "module_name": "FILE_TO_DF", + "namespace": MODULE_NAMESPACE, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "parser_kwargs": { + "lines": False, 
"orient": "records" + }, + "cache_dir": self._derive_args.cache_dir, + "filter_null": True, + "file_type": "JSON", + "schema": { + "schema_str": self._source_schema_str, "encoding": self._encoding + } + }, + DFP_SPLIT_USERS: { + "module_id": DFP_SPLIT_USERS, + "module_name": "dfp_split_users", + "namespace": MODULE_NAMESPACE, + "include_generic": self._derive_args.include_generic, + "include_individual": self._derive_args.include_individual, + "skip_users": self._derive_args.skip_users, + "only_users": self._derive_args.only_users, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "userid_column_name": self._config.ae.userid_column_name, + "fallback_username": self._config.ae.fallback_username + }, + DFP_ROLLING_WINDOW: { + "module_id": DFP_ROLLING_WINDOW, + "module_name": "dfp_rolling_window", + "namespace": MODULE_NAMESPACE, + "min_history": 1, + "min_increment": 0, + "max_history": self._derive_args.duration, + "cache_dir": self._derive_args.cache_dir, + "timestamp_column_name": self._config.ae.timestamp_column_name + }, + DFP_DATA_PREP: { + "module_id": DFP_DATA_PREP, + "module_name": "dfp_data_prep", + "namespace": MODULE_NAMESPACE, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "schema": { + "schema_str": self._preprocess_schema_str, "encoding": self._encoding + } + }, + DFP_INFERENCE: { + "module_id": DFP_INFERENCE, + "module_name": "dfp_inference", + "namespace": MODULE_NAMESPACE, + "model_name_formatter": self._derive_args.model_name_formatter, + "fallback_username": self._config.ae.fallback_username, + "timestamp_column_name": self._config.ae.timestamp_column_name + }, + FILTER_DETECTIONS: { + "module_id": FILTER_DETECTIONS, + "module_name": "filter_detections", + "namespace": MODULE_NAMESPACE, + "field_name": "mean_abs_z", + "threshold": 2.0, + "filter_source": "DATAFRAME", + "schema": { + "input_message_type": self._input_message_type, "encoding": self._encoding + } + }, + DFP_POST_PROCESSING: { + "module_id": DFP_POST_PROCESSING, + "module_name": "dfp_post_processing", + "namespace": MODULE_NAMESPACE, + "timestamp_column_name": self._config.ae.timestamp_column_name + }, + SERIALIZE: { + "module_id": SERIALIZE, + "module_name": "serialize", + "namespace": MODULE_NAMESPACE, + "exclude": ['batch_count', 'origin_hash', '_row_hash', '_batch_id'] + }, + WRITE_TO_FILE: { + "module_id": WRITE_TO_FILE, + "module_name": "write_to_file", + "namespace": MODULE_NAMESPACE, + "filename": "dfp_detections_{}.csv".format(self._derive_args.source), + "overwrite": True + } + } + + return module_conf + + def tra_pipe_module_conf(self): + module_conf = { + "module_id": DFP_TRAINING_PIPELINE, + "module_name": "dfp_training_pipeline", + "namespace": MODULE_NAMESPACE, + FILE_BATCHER: { + "module_id": FILE_BATCHER, + "module_name": "file_batcher", + "namespace": MODULE_NAMESPACE, + "period": "D", + "sampling_rate_s": self._derive_args.sample_rate_s, + "start_time": self._derive_args.start_time, + "end_time": self._derive_args.end_time, + "iso_date_regex_pattern": iso_date_regex_pattern + }, + FILE_TO_DF: { + "module_id": FILE_TO_DF, + "module_name": "FILE_TO_DF", + "namespace": MODULE_NAMESPACE, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "parser_kwargs": { + "lines": False, "orient": "records" + }, + "cache_dir": self._derive_args.cache_dir, + "filter_null": True, + "file_type": "JSON", + "schema": { + "schema_str": self._source_schema_str, "encoding": self._encoding + } + }, + DFP_SPLIT_USERS: { + "module_id": DFP_SPLIT_USERS, + 
"module_name": "dfp_split_users", + "namespace": MODULE_NAMESPACE, + "include_generic": self._derive_args.include_generic, + "include_individual": self._derive_args.include_individual, + "skip_users": self._derive_args.skip_users, + "only_users": self._derive_args.only_users, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "userid_column_name": self._config.ae.userid_column_name, + "fallback_username": self._config.ae.fallback_username + }, + DFP_ROLLING_WINDOW: { + "module_id": DFP_ROLLING_WINDOW, + "module_name": "dfp_rolling_window", + "namespace": MODULE_NAMESPACE, + "min_history": 300, + "min_increment": 300, + "max_history": self._derive_args.duration, + "cache_dir": self._derive_args.cache_dir, + "timestamp_column_name": self._config.ae.timestamp_column_name + }, + DFP_DATA_PREP: { + "module_id": DFP_DATA_PREP, + "module_name": "dfp_data_prep", + "namespace": MODULE_NAMESPACE, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "schema": { + "schema_str": self._preprocess_schema_str, "encoding": self._encoding + } + }, + DFP_TRAINING: { + "module_id": DFP_TRAINING, + "module_name": "dfp_training", + "namespace": MODULE_NAMESPACE, + "model_kwargs": { + "encoder_layers": [512, 500], # layers of the encoding part + "decoder_layers": [512], # layers of the decoding part + "activation": 'relu', # activation function + "swap_p": 0.2, # noise parameter + "lr": 0.001, # learning rate + "lr_decay": 0.99, # learning decay + "batch_size": 512, + "verbose": False, + "optimizer": 'sgd', # SGD optimizer is selected(Stochastic gradient descent) + "scaler": 'standard', # feature scaling method + "min_cats": 1, # cut off for minority categories + "progress_bar": False, + "device": "cuda" + }, + "feature_columns": self._config.ae.feature_columns, + "epochs": 30, + "validation_size": 0.10 + }, + MLFLOW_MODEL_WRITER: { + "module_id": MLFLOW_MODEL_WRITER, + "module_name": "mlflow_model_writer", + "namespace": MODULE_NAMESPACE, + "model_name_formatter": self._derive_args.model_name_formatter, + "experiment_name_formatter": self._derive_args.experiment_name_formatter, + "timestamp_column_name": self._config.ae.timestamp_column_name, + "conda_env": { + 'channels': ['defaults', 'conda-forge'], + 'dependencies': ['python={}'.format('3.8'), 'pip'], + 'pip': ['mlflow', 'dfencoder'], + 'name': 'mlflow-env' + }, + "databricks_permissions": None + } + } + + return module_conf + + +def generate_ae_config(labels_file: str, + userid_column_name: str, + timestamp_column_name: str, + use_cpp: bool = False, + num_threads: int = os.cpu_count()): + + config = Config() + + CppConfig.set_should_use_cpp(use_cpp) + + config.num_threads = num_threads + + config.ae = ConfigAutoEncoder() + + config.ae.feature_columns = load_labels_file(get_package_relative_file(labels_file)) + config.ae.userid_column_name = userid_column_name + config.ae.timestamp_column_name = timestamp_column_name + + return config diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py new file mode 100644 index 0000000000..0adc36fc65 --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py @@ -0,0 +1,175 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py new file mode 100644 index 0000000000..0adc36fc65 --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/derive_args.py @@ -0,0 +1,175 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import pickle +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +import mlflow +import pandas as pd + +from morpheus.utils.logger import configure_logging + +logger = logging.getLogger(__name__) + + +class DeriveArgs: + + def __init__(self, + skip_user: str, + only_user: str, + start_time: str, + duration: str, + log_level: str, + cache_dir: str, + sample_rate_s: str, + source: str, + tracking_uri: str, + train_users: str = None): + + self._skip_users = list(skip_user) + self._only_users = list(only_user) + self._start_time = start_time + self._duration = duration + self._log_level = log_level + self._train_users = train_users + self._cache_dir = cache_dir + self._include_generic = None + self._include_individual = None + self._initialized = False + self._tracking_uri = tracking_uri + self._sample_rate_s = sample_rate_s + self._source = source + self._model_name_formatter = "DFP-%s-{user_id}" % (source) + self._experiment_name_formatter = "dfp/%s/training/{reg_model_name}" % (source) + self._is_training = (train_users is not None and train_users != "none") + + def verify_init(func): + + def wrapper(self, *args, **kwargs): + if not self._initialized: + raise RuntimeError('Instance not initialized') + return func(self, *args, **kwargs) + + return wrapper + + def _configure_logging(self): + + configure_logging(log_level=self._log_level) + logging.getLogger("mlflow").setLevel(self._log_level) + + if (len(self._skip_users) > 0 and len(self._only_users) > 0): + logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Exiting") + + logger.info("Running training pipeline with the following options: ") + logger.info("Train generic_user: %s", self._include_generic) + logger.info("Skipping users: %s", self._skip_users) + logger.info("Start Time: %s", self._start_time) + logger.info("Duration: %s", self._duration) + logger.info("Cache Dir: %s", self._cache_dir) + + @property + @verify_init + def start_time(self): + return self._start_time + + @property + @verify_init + def end_time(self): + return self._end_time + + @property + @verify_init + def include_generic(self): + return self._include_generic + + @property + @verify_init + def include_individual(self): + return self._include_individual + + @property + def duration(self): + return self._duration + + @property + def sample_rate_s(self): + return self._sample_rate_s + + @property + def skip_users(self): + return self._skip_users + + @property + def only_users(self): + return self._only_users + + @property + def cache_dir(self): + return self._cache_dir + + @property + def source(self): + return self._source + + @property + def model_name_formatter(self): + return self._model_name_formatter + + @property + def is_training(self): + return self._is_training + + @property + def experiment_name_formatter(self): + return self._experiment_name_formatter + + def _set_include_generic(self): + self._include_generic = self._train_users == "all" or self._train_users == "generic" + + def _set_include_individual(self): + self._include_individual = self._train_users != "generic" + + def _update_start_stop_time(self): + duration = timedelta(seconds=pd.Timedelta(self._duration).total_seconds()) + if self._start_time is None: + self._end_time = datetime.now(tz=timezone.utc) + self._start_time = self._end_time - duration + else: + if self._start_time.tzinfo is None: + self._start_time = self._start_time.replace(tzinfo=timezone.utc) + + self._end_time = self._start_time + duration + + def _set_mlflow_tracking_uri(self): + if self._tracking_uri is None: + raise ValueError("tracking uri should not be None type.") + # Initialize ML Flow + mlflow.set_tracking_uri(self._tracking_uri) + logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) + + def init(self): + self._update_start_stop_time() + self._set_include_generic() + self._set_include_individual() + self._configure_logging() + self._set_mlflow_tracking_uri() + self._initialized = True + + +def pyobj2str(pyobj, encoding): + str_val = str(pickle.dumps(pyobj), encoding=encoding) + return str_val diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/module_ids.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/module_ids.py index 23ed4268fb..6c7b233f3c 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/module_ids.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/module_ids.py @@ -18,3 +18,7 @@ DFP_SPLIT_USERS = "DFPSplitUsers" DFP_MODEL_TRAIN_DEPLOY = "DFPModelTrainDeploy" DFP_TRAINING = "DFPTraining" +DFP_TRAINING_PIPELINE = "DFPTrainingPipeline" +DFP_INFERENCE_PIPELINE = "DFPInferencePipeline" +DFP_INFERENCE = "DFPInference" +DFP_POST_PROCESSING = "DFPPostProcessing" diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/schema_utils.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/schema_utils.py new file mode 100644 index 0000000000..545c7a5861 --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/schema_utils.py @@ -0,0 +1,147 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +from datetime import datetime +from functools import partial + +from morpheus.config import Config +from morpheus.utils.column_info import BoolColumn +from morpheus.utils.column_info import ColumnInfo +from morpheus.utils.column_info import CustomColumn +from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import DateTimeColumn +from morpheus.utils.column_info import IncrementColumn +from morpheus.utils.column_info import RenameColumn +from morpheus.utils.column_info import StringCatColumn +from morpheus.utils.column_info import create_increment_col + + +@dataclasses.dataclass +class Schema: + source: DataFrameInputSchema + preprocess: DataFrameInputSchema + + +class SchemaBuilder: + + def __init__(self, config: Config): + self._config = config + + def build_azure_schema(self) -> Schema: + # Specify the column names to ensure all data is uniform + source_column_info = [ + DateTimeColumn(name=self._config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + RenameColumn(name=self._config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"), + RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"), + ColumnInfo(name="category", dtype=str), + RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"), + RenameColumn(name="deviceDetailoperatingSystem", + dtype=str, + input_name="properties.deviceDetail.operatingSystem"), + StringCatColumn(name="location", + dtype=str, + input_columns=[ + "properties.location.city", + "properties.location.countryOrRegion", + ], + sep=", "), + RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"), + ] + + source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info) + + preprocess_column_info = [ + ColumnInfo(name=self._config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=self._config.ae.userid_column_name, dtype=str), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + ColumnInfo(name="statusfailureReason", dtype=str), + + # Derived columns + IncrementColumn(name="logcount", + dtype=int, + input_name=self._config.ae.timestamp_column_name, + groupby_column=self._config.ae.userid_column_name), + CustomColumn(name="locincrement", + dtype=int, + process_column_fn=partial(create_increment_col, column_name="location")), + CustomColumn(name="appincrement", + dtype=int, + process_column_fn=partial(create_increment_col, column_name="appDisplayName")), + ] + + 
preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + schema = Schema(source=source_schema, preprocess=preprocess_schema) + + return schema + + def build_duo_schema(self) -> Schema: + + # Specify the column names to ensure all data is uniform + source_column_info = [ + DateTimeColumn(name=self._config.ae.timestamp_column_name, dtype=datetime, input_name="timestamp"), + RenameColumn(name=self._config.ae.userid_column_name, dtype=str, input_name="user.name"), + RenameColumn(name="accessdevicebrowser", dtype=str, input_name="access_device.browser"), + RenameColumn(name="accessdeviceos", dtype=str, input_name="access_device.os"), + StringCatColumn(name="location", + dtype=str, + input_columns=[ + "access_device.location.city", + "access_device.location.state", + "access_device.location.country" + ], + sep=", "), + RenameColumn(name="authdevicename", dtype=str, input_name="auth_device.name"), + BoolColumn(name="result", + dtype=bool, + input_name="result", + true_values=["success", "SUCCESS"], + false_values=["denied", "DENIED", "FRAUD"]), + ColumnInfo(name="reason", dtype=str), + ] + + source_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], + column_info=source_column_info) + + # Preprocessing schema + preprocess_column_info = [ + ColumnInfo(name=self._config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=self._config.ae.userid_column_name, dtype=str), + ColumnInfo(name="accessdevicebrowser", dtype=str), + ColumnInfo(name="accessdeviceos", dtype=str), + ColumnInfo(name="authdevicename", dtype=str), + ColumnInfo(name="result", dtype=bool), + ColumnInfo(name="reason", dtype=str), + # Derived columns + IncrementColumn(name="logcount", + dtype=int, + input_name=self._config.ae.timestamp_column_name, + groupby_column=self._config.ae.userid_column_name), + CustomColumn(name="locincrement", + dtype=int, + process_column_fn=partial(create_increment_col, column_name="location")), + ] + + preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + schema = Schema(source=source_schema, preprocess=preprocess_schema) + + return schema diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_inference.py b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_inference.py new file mode 100644 index 0000000000..90c5d09b4b --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_inference.py @@ -0,0 +1,140 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import typing +from datetime import datetime + +import click +import dfp.modules.dfp_inference_pipeline # noqa: F401 +from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.config_generator import ConfigGenerator +from dfp.utils.config_generator import generate_ae_config +from dfp.utils.derive_args import DeriveArgs +from dfp.utils.schema_utils import Schema +from dfp.utils.schema_utils import SchemaBuilder + +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import parse_log_level +from morpheus.config import Config +from morpheus.pipeline import LinearPipeline +from morpheus.stages.general.linear_modules_stage import LinearModulesStage +from morpheus.stages.general.monitor_stage import MonitorStage + + +@click.command() +@click.option( + "--skip_user", + multiple=True, + type=str, + help="User IDs to skip. Mutually exclusive with only_user", +) +@click.option( + "--only_user", + multiple=True, + type=str, + help="Only users specified by this option will be included. Mutually exclusive with skip_user", +) +@click.option( + "--start_time", + type=click.DateTime( + formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']), + default=None, + help="The start of the time window; if undefined, start_time will be `now()-duration`", +) +@click.option( + "--duration", + type=str, + default="1d", + help="The duration to run starting from start_time", +) +@click.option( + "--cache_dir", + type=str, + default="./.cache/dfp", + show_envvar=True, + help="The location to cache data such as S3 downloads and pre-processed data", +) +@click.option("--log_level", + default=logging.getLevelName(Config().log_level), + type=click.Choice(get_log_levels(), case_sensitive=False), + callback=parse_log_level, + help="Specify the logging level to use.") +@click.option("--sample_rate_s", + type=int, + default=0, + show_envvar=True, + help="Minimum time step, in milliseconds, between object logs.") +@click.option( + "--input_file", + "-f", + type=str, + multiple=True, + help=("List of files to process. Can specify multiple arguments for multiple files. " + "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. " + "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`.
" + "See fsspec documentation for list of possible options."), +) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +def run_pipeline(skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + **kwargs): + + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="azure") + + derive_args.init() + + config: Config = generate_ae_config(labels_file="data/columns_ae_azure.txt", + userid_column_name="username", + timestamp_column_name="timestamp") + + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_azure_schema() + + config_generator = ConfigGenerator(config, derive_args, schema) + + module_conf = config_generator.inf_pipe_module_conf() + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"]))) + + # Here we add a wrapped module that implements the DFP Inference pipeline + pipeline.add_stage(LinearModulesStage(config, module_conf, input_port_name="input", output_port_name="output")) + + pipeline.add_stage(MonitorStage(config, description="Inference Pipeline rate", smoothing=0.001)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_pipeline.py old mode 100755 new mode 100644 index fa04a151a2..7855df7eee --- a/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_pipeline.py @@ -13,23 +13,19 @@ # limitations under the License. 
import logging -import os -import pickle import typing from datetime import datetime -from datetime import timedelta -from datetime import timezone -from functools import partial import click import dfp.modules.dfp_model_train_deploy # noqa: F401 import dfp.modules.dfp_preprocessing # noqa: F401 -import mlflow -import pandas as pd from dfp.messages.multi_dfp_message import MultiDFPMessage from dfp.stages.dfp_inference_stage import DFPInferenceStage from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.config_generator import generate_ae_config +from dfp.utils.derive_args import DeriveArgs +from dfp.utils.derive_args import pyobj2str from dfp.utils.module_ids import DFP_DATA_PREP from dfp.utils.module_ids import DFP_MODEL_TRAIN_DEPLOY from dfp.utils.module_ids import DFP_PREPROCESSING @@ -37,30 +33,19 @@ from dfp.utils.module_ids import DFP_SPLIT_USERS from dfp.utils.module_ids import DFP_TRAINING from dfp.utils.regex_utils import iso_date_regex_pattern +from dfp.utils.schema_utils import Schema +from dfp.utils.schema_utils import SchemaBuilder from morpheus._lib.common import FilterSource from morpheus.cli.utils import get_log_levels -from morpheus.cli.utils import get_package_relative_file -from morpheus.cli.utils import load_labels_file from morpheus.cli.utils import parse_log_level from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder -from morpheus.config import CppConfig from morpheus.pipeline import LinearPipeline from morpheus.stages.general.linear_modules_stage import LinearModulesStage from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage from morpheus.stages.postprocess.serialize_stage import SerializeStage -from morpheus.utils.column_info import ColumnInfo -from morpheus.utils.column_info import CustomColumn -from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import DateTimeColumn -from morpheus.utils.column_info import IncrementColumn -from morpheus.utils.column_info import RenameColumn -from morpheus.utils.column_info import StringCatColumn -from morpheus.utils.column_info import create_increment_col -from morpheus.utils.logger import configure_logging from morpheus.utils.module_ids import FILE_BATCHER from morpheus.utils.module_ids import FILE_TO_DF from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER @@ -139,116 +124,32 @@ def run_pipeline(train_users, log_level, sample_rate_s, **kwargs): - # To include the generic, we must be training all or generic - include_generic = train_users == "all" or train_users == "generic" - # To include individual, we must be either training or inferring - include_individual = train_users != "generic" + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="azure", + train_users=train_users) - # None indicates we arent training anything - is_training = train_users != "none" + derive_args.init() - skip_users = list(skip_user) - only_users = list(only_user) + config: Config = generate_ae_config(labels_file="data/columns_ae_azure.txt", + userid_column_name="username", + timestamp_column_name="timestamp") - duration = timedelta(seconds=pd.Timedelta(duration).total_seconds()) - if start_time is None: - end_time = 
datetime.now(tz=timezone.utc) - start_time = end_time - duration - else: - if start_time.tzinfo is None: - start_time = start_time.replace(tzinfo=timezone.utc) - - end_time = start_time + duration - - # Enable the Morpheus logger - configure_logging(log_level=log_level) - logging.getLogger("mlflow").setLevel(log_level) - - if (len(skip_users) > 0 and len(only_users) > 0): - logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting") - - logger = logging.getLogger("morpheus.{}".format(__name__)) - - logger.info("Running training pipeline with the following options: ") - logger.info("Train generic_user: %s", include_generic) - logger.info("Skipping users: %s", skip_users) - logger.info("Start Time: %s", start_time) - logger.info("Duration: %s", duration) - logger.info("Cache Dir: %s", cache_dir) - - if ("tracking_uri" in kwargs): - # Initialize ML Flow - mlflow.set_tracking_uri(kwargs["tracking_uri"]) - logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) - - config = Config() - - CppConfig.set_should_use_cpp(False) - - config.num_threads = os.cpu_count() - - config.ae = ConfigAutoEncoder() - - config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt")) - config.ae.userid_column_name = "username" - config.ae.timestamp_column_name = "timestamp" - - # Specify the column names to ensure all data is uniform - source_column_info = [ - DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), - RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"), - RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"), - ColumnInfo(name="category", dtype=str), - RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"), - RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"), - RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"), - RenameColumn(name="deviceDetailoperatingSystem", - dtype=str, - input_name="properties.deviceDetail.operatingSystem"), - StringCatColumn(name="location", - dtype=str, - input_columns=[ - "properties.location.city", - "properties.location.countryOrRegion", - ], - sep=", "), - RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"), - ] - - source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info) - - # Preprocessing schema - preprocess_column_info = [ - ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), - ColumnInfo(name=config.ae.userid_column_name, dtype=str), - ColumnInfo(name="appDisplayName", dtype=str), - ColumnInfo(name="clientAppUsed", dtype=str), - ColumnInfo(name="deviceDetailbrowser", dtype=str), - ColumnInfo(name="deviceDetaildisplayName", dtype=str), - ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), - ColumnInfo(name="statusfailureReason", dtype=str), - - # Derived columns - IncrementColumn(name="logcount", - dtype=int, - input_name=config.ae.timestamp_column_name, - groupby_column=config.ae.userid_column_name), - CustomColumn(name="locincrement", - dtype=int, - process_column_fn=partial(create_increment_col, column_name="location")), - CustomColumn(name="appincrement", - dtype=int, - process_column_fn=partial(create_increment_col, column_name="appDisplayName")), - ] - - preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, 
preserve_columns=["_batch_id"]) + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_azure_schema() encoding = "latin1" # Convert schema as a string - source_schema_str = str(pickle.dumps(source_schema), encoding=encoding) - preprocess_schema_str = str(pickle.dumps(preprocess_schema), encoding=encoding) + source_schema_str = pyobj2str(schema.source, encoding=encoding) + preprocess_schema_str = pyobj2str(schema.preprocess, encoding=encoding) preprocessing_module_config = { "module_id": DFP_PREPROCESSING, @@ -260,8 +161,8 @@ def run_pipeline(train_users, "namespace": MODULE_NAMESPACE, "period": "D", "sampling_rate_s": sample_rate_s, - "start_time": start_time, - "end_time": end_time, + "start_time": derive_args.start_time, + "end_time": derive_args.end_time, "iso_date_regex_pattern": iso_date_regex_pattern }, FILE_TO_DF: { @@ -269,7 +170,6 @@ def run_pipeline(train_users, "module_name": "FILE_TO_DF", "namespace": MODULE_NAMESPACE, "timestamp_column_name": config.ae.timestamp_column_name, - "userid_column_name": config.ae.userid_column_name, "parser_kwargs": { "lines": False, "orient": "records" }, @@ -284,10 +184,10 @@ def run_pipeline(train_users, "module_id": DFP_SPLIT_USERS, "module_name": "dfp_split_users", "namespace": MODULE_NAMESPACE, - "include_generic": include_generic, - "include_individual": include_individual, - "skip_users": skip_users, - "only_users": only_users, + "include_generic": derive_args.include_generic, + "include_individual": derive_args.include_individual, + "skip_users": derive_args.skip_users, + "only_users": derive_args.only_users, "timestamp_column_name": config.ae.timestamp_column_name, "userid_column_name": config.ae.userid_column_name, "fallback_username": config.ae.fallback_username @@ -296,9 +196,9 @@ def run_pipeline(train_users, "module_id": DFP_ROLLING_WINDOW, "module_name": "dfp_rolling_window", "namespace": MODULE_NAMESPACE, - "min_history": 300 if is_training else 1, - "min_increment": 300 if is_training else 0, - "max_history": "60d" if is_training else "1d", + "min_history": 300 if derive_args.is_training else 1, + "min_increment": 300 if derive_args.is_training else 0, + "max_history": "60d" if derive_args.is_training else "1d", "cache_dir": cache_dir, "timestamp_column_name": config.ae.timestamp_column_name }, @@ -307,7 +207,6 @@ def run_pipeline(train_users, "module_name": "dfp_data_prep", "namespace": MODULE_NAMESPACE, "timestamp_column_name": config.ae.timestamp_column_name, - "userid_column_name": config.ae.userid_column_name, "schema": { "schema_str": preprocess_schema_str, "encoding": encoding } @@ -329,10 +228,7 @@ def run_pipeline(train_users, pipeline.add_stage(MonitorStage(config, description="Preprocessing Module rate", smoothing=0.001)) - model_name_formatter = "DFP-azure-{user_id}" - experiment_name_formatter = "dfp/azure/training/{reg_model_name}" - - if (is_training): + if (derive_args.is_training): # Module configuration training_module_config = { @@ -366,8 +262,8 @@ def run_pipeline(train_users, "module_id": MLFLOW_MODEL_WRITER, "module_name": "mlflow_model_writer", "namespace": MODULE_NAMESPACE, - "model_name_formatter": model_name_formatter, - "experiment_name_formatter": experiment_name_formatter, + "model_name_formatter": derive_args.model_name_formatter, + "experiment_name_formatter": derive_args.experiment_name_formatter, "timestamp_column_name": config.ae.timestamp_column_name, "conda_env": { 'channels': ['defaults', 'conda-forge'], @@ -391,7 +287,7 @@ def run_pipeline(train_users, else: # Perform 
inference on the preprocessed data - pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter)) + pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=derive_args.model_name_formatter)) pipeline.add_stage(MonitorStage(config, description="Inference rate", smoothing=0.001)) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_training.py b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_training.py new file mode 100644 index 0000000000..a9189895df --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp_azure_modules_training.py @@ -0,0 +1,150 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing +from datetime import datetime + +import click +import dfp.modules.dfp_training_pipeline # noqa: F401 +from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.config_generator import ConfigGenerator +from dfp.utils.config_generator import generate_ae_config +from dfp.utils.derive_args import DeriveArgs +from dfp.utils.schema_utils import Schema +from dfp.utils.schema_utils import SchemaBuilder + +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import parse_log_level +from morpheus.config import Config +from morpheus.pipeline import LinearPipeline +from morpheus.stages.general.linear_modules_stage import LinearModulesStage +from morpheus.stages.general.monitor_stage import MonitorStage + + +@click.command() +@click.option( + "--train_users", + type=click.Choice(["all", "generic", "individual"], case_sensitive=False), + default="generic", + show_default=True, + help=("Indicates whether to train models for individual users, a generic model for all users, or both."), +) +@click.option( + "--skip_user", + multiple=True, + type=str, + help="User IDs to skip. Mutually exclusive with only_user", +) +@click.option( + "--only_user", + multiple=True, + type=str, + help="Only users specified by this option will be included.
Mutually exclusive with skip_user", +) +@click.option( + "--start_time", + type=click.DateTime( + formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']), + default=None, + help="The start of the time window; if undefined, start_time will be `now()-duration`", +) +@click.option( + "--duration", + type=str, + default="60d", + help="The duration to run starting from start_time", +) +@click.option( + "--cache_dir", + type=str, + default="./.cache/dfp", + show_envvar=True, + help="The location to cache data such as S3 downloads and pre-processed data", +) +@click.option("--log_level", + default=logging.getLevelName(Config().log_level), + type=click.Choice(get_log_levels(), case_sensitive=False), + callback=parse_log_level, + help="Specify the logging level to use.") +@click.option("--sample_rate_s", + type=int, + default=0, + show_envvar=True, + help="Minimum time step, in milliseconds, between object logs.") +@click.option( + "--input_file", + "-f", + type=str, + multiple=True, + help=("List of files to process. Can specify multiple arguments for multiple files. " + "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. " + "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. " + "See fsspec documentation for list of possible options."), +) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +def run_pipeline(train_users, + skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + **kwargs): + + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="azure", + train_users=train_users) + + derive_args.init() + + config: Config = generate_ae_config(labels_file="data/columns_ae_azure.txt", + userid_column_name="username", + timestamp_column_name="timestamp") + + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_azure_schema() + + config_generator = ConfigGenerator(config, derive_args, schema) + + module_conf = config_generator.tra_pipe_module_conf() + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"]))) + + # Here we add a wrapped module that implements the full DFP Training pipeline + pipeline.add_stage(LinearModulesStage(config, module_conf, input_port_name="input", output_port_name="output")) + + pipeline.add_stage(MonitorStage(config, description="Training Pipeline rate", smoothing=0.001)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_inference.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_inference.py new file mode 100644 index 0000000000..7c44fd7afd --- /dev/null +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_inference.py @@ -0,0 +1,141 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+from datetime import datetime
+
+import click
+import dfp.modules.dfp_inference_pipeline  # noqa: F401
+import dfp.modules.dfp_postprocessing  # noqa: F401
+from dfp.stages.multi_file_source import MultiFileSource
+from dfp.utils.config_generator import ConfigGenerator
+from dfp.utils.config_generator import generate_ae_config
+from dfp.utils.derive_args import DeriveArgs
+from dfp.utils.schema_utils import Schema
+from dfp.utils.schema_utils import SchemaBuilder
+
+from morpheus.cli.utils import get_log_levels
+from morpheus.cli.utils import parse_log_level
+from morpheus.config import Config
+from morpheus.pipeline import LinearPipeline
+from morpheus.stages.general.linear_modules_stage import LinearModulesStage
+from morpheus.stages.general.monitor_stage import MonitorStage
+
+
+@click.command()
+@click.option(
+    "--skip_user",
+    multiple=True,
+    type=str,
+    help="User IDs to skip. Mutually exclusive with only_user",
+)
+@click.option(
+    "--only_user",
+    multiple=True,
+    type=str,
+    help="Only users specified by this option will be included. Mutually exclusive with skip_user",
+)
+@click.option(
+    "--start_time",
+    type=click.DateTime(
+        formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']),
+    default=None,
+    help="The start of the time window. If undefined, start_time will be `now() - duration`",
+)
+@click.option(
+    "--duration",
+    type=str,
+    default="1d",
+    help="The duration to run starting from start_time",
+)
+@click.option(
+    "--cache_dir",
+    type=str,
+    default="./.cache/dfp",
+    show_envvar=True,
+    help="The location to cache data such as S3 downloads and pre-processed data",
+)
+@click.option("--log_level",
+              default=logging.getLevelName(Config().log_level),
+              type=click.Choice(get_log_levels(), case_sensitive=False),
+              callback=parse_log_level,
+              help="Specify the logging level to use.")
+@click.option("--sample_rate_s",
+              type=int,
+              default=0,
+              show_envvar=True,
+              help="Minimum time step, in seconds, between object logs.")
+@click.option(
+    "--input_file",
+    "-f",
+    type=str,
+    multiple=True,
+    help=("List of files to process. Can specify multiple arguments for multiple files. "
+          "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. "
+          "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. 
" + "See fsspec documentation for list of possible options."), +) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +def run_pipeline(skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + **kwargs): + + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="duo") + + derive_args.init() + + config: Config = generate_ae_config(labels_file="data/columns_ae_duo.txt", + userid_column_name="username", + timestamp_column_name="timestamp") + + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_duo_schema() + + config_generator = ConfigGenerator(config, derive_args, schema) + + module_conf = config_generator.inf_pipe_module_conf() + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"]))) + + # Here we add a wrapped module that implements the DFP Inference pipeline + pipeline.add_stage(LinearModulesStage(config, module_conf, input_port_name="input", output_port_name="output")) + + pipeline.add_stage(MonitorStage(config, description="Inference Pipeline rate", smoothing=0.001)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_pipeline.py index 0ed7d0e0bd..3ef9489907 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_pipeline.py @@ -13,23 +13,19 @@ # limitations under the License. 
import logging -import os -import pickle import typing from datetime import datetime -from datetime import timedelta -from datetime import timezone -from functools import partial import click import dfp.modules.dfp_model_train_deploy # noqa: F401 import dfp.modules.dfp_preprocessing # noqa: F401 -import mlflow -import pandas as pd from dfp.messages.multi_dfp_message import MultiDFPMessage from dfp.stages.dfp_inference_stage import DFPInferenceStage from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.config_generator import generate_ae_config +from dfp.utils.derive_args import DeriveArgs +from dfp.utils.derive_args import pyobj2str from dfp.utils.module_ids import DFP_DATA_PREP from dfp.utils.module_ids import DFP_MODEL_TRAIN_DEPLOY from dfp.utils.module_ids import DFP_PREPROCESSING @@ -37,31 +33,19 @@ from dfp.utils.module_ids import DFP_SPLIT_USERS from dfp.utils.module_ids import DFP_TRAINING from dfp.utils.regex_utils import iso_date_regex_pattern +from dfp.utils.schema_utils import Schema +from dfp.utils.schema_utils import SchemaBuilder from morpheus._lib.common import FilterSource from morpheus.cli.utils import get_log_levels -from morpheus.cli.utils import get_package_relative_file -from morpheus.cli.utils import load_labels_file from morpheus.cli.utils import parse_log_level from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder -from morpheus.config import CppConfig from morpheus.pipeline import LinearPipeline from morpheus.stages.general.linear_modules_stage import LinearModulesStage from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage from morpheus.stages.postprocess.serialize_stage import SerializeStage -from morpheus.utils.column_info import BoolColumn -from morpheus.utils.column_info import ColumnInfo -from morpheus.utils.column_info import CustomColumn -from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import DateTimeColumn -from morpheus.utils.column_info import IncrementColumn -from morpheus.utils.column_info import RenameColumn -from morpheus.utils.column_info import StringCatColumn -from morpheus.utils.column_info import create_increment_col -from morpheus.utils.logger import configure_logging from morpheus.utils.module_ids import FILE_BATCHER from morpheus.utils.module_ids import FILE_TO_DF from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER @@ -140,113 +124,32 @@ def run_pipeline(train_users, log_level, sample_rate_s, **kwargs): - # To include the generic, we must be training all or generic - include_generic = train_users == "all" or train_users == "generic" - # To include individual, we must be either training or inferring - include_individual = train_users != "generic" + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="duo", + train_users=train_users) - # None indicates we arent training anything - is_training = train_users != "none" + derive_args.init() - skip_users = list(skip_user) - only_users = list(only_user) + config: Config = generate_ae_config(labels_file="data/columns_ae_duo.txt", + userid_column_name="username", + timestamp_column_name="timestamp") - duration = 
timedelta(seconds=pd.Timedelta(duration).total_seconds()) - if start_time is None: - end_time = datetime.now(tz=timezone.utc) - start_time = end_time - duration - else: - if start_time.tzinfo is None: - start_time = start_time.replace(tzinfo=timezone.utc) - - end_time = start_time + duration - - # Enable the Morpheus logger - configure_logging(log_level=log_level) - logging.getLogger("mlflow").setLevel(log_level) - - if (len(skip_users) > 0 and len(only_users) > 0): - logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting") - - logger = logging.getLogger("morpheus.{}".format(__name__)) - - logger.info("Running training pipeline with the following options: ") - logger.info("Train generic_user: %s", include_generic) - logger.info("Skipping users: %s", skip_users) - logger.info("Start Time: %s", start_time) - logger.info("Duration: %s", duration) - logger.info("Cache Dir: %s", cache_dir) - - if ("tracking_uri" in kwargs): - # Initialize ML Flow - mlflow.set_tracking_uri(kwargs["tracking_uri"]) - logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) - - config = Config() - - CppConfig.set_should_use_cpp(False) - - config.num_threads = os.cpu_count() - - config.ae = ConfigAutoEncoder() - - config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_duo.txt")) - - config.ae.userid_column_name = "username" - config.ae.timestamp_column_name = "timestamp" - - source_column_info = [ - DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="timestamp"), - RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="user.name"), - RenameColumn(name="accessdevicebrowser", dtype=str, input_name="access_device.browser"), - RenameColumn(name="accessdeviceos", dtype=str, input_name="access_device.os"), - StringCatColumn(name="location", - dtype=str, - input_columns=[ - "access_device.location.city", - "access_device.location.state", - "access_device.location.country" - ], - sep=", "), - RenameColumn(name="authdevicename", dtype=str, input_name="auth_device.name"), - BoolColumn(name="result", - dtype=bool, - input_name="result", - true_values=["success", "SUCCESS"], - false_values=["denied", "DENIED", "FRAUD"]), - ColumnInfo(name="reason", dtype=str), - ] - - source_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info) - - # Preprocessing schema - preprocess_column_info = [ - ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), - ColumnInfo(name=config.ae.userid_column_name, dtype=str), - ColumnInfo(name="accessdevicebrowser", dtype=str), - ColumnInfo(name="accessdeviceos", dtype=str), - ColumnInfo(name="authdevicename", dtype=str), - ColumnInfo(name="result", dtype=bool), - ColumnInfo(name="reason", dtype=str), - # Derived columns - IncrementColumn(name="logcount", - dtype=int, - input_name=config.ae.timestamp_column_name, - groupby_column=config.ae.userid_column_name), - CustomColumn(name="locincrement", - dtype=int, - process_column_fn=partial(create_increment_col, column_name="location")), - ] - - preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_duo_schema() encoding = "latin1" # Convert schema as a string - source_schema_str = str(pickle.dumps(source_schema), encoding=encoding) - preprocess_schema_str = str(pickle.dumps(preprocess_schema), encoding=encoding) + source_schema_str = 
pyobj2str(schema.source, encoding=encoding) + preprocess_schema_str = pyobj2str(schema.preprocess, encoding=encoding) preprocessing_module_config = { "module_id": DFP_PREPROCESSING, @@ -258,8 +161,8 @@ def run_pipeline(train_users, "namespace": MODULE_NAMESPACE, "period": "D", "sampling_rate_s": sample_rate_s, - "start_time": start_time, - "end_time": end_time, + "start_time": derive_args.start_time, + "end_time": derive_args.end_time, "iso_date_regex_pattern": iso_date_regex_pattern }, FILE_TO_DF: { @@ -267,7 +170,6 @@ def run_pipeline(train_users, "module_name": "FILE_TO_DF", "namespace": MODULE_NAMESPACE, "timestamp_column_name": config.ae.timestamp_column_name, - "userid_column_name": config.ae.userid_column_name, "parser_kwargs": { "lines": False, "orient": "records" }, @@ -282,10 +184,10 @@ def run_pipeline(train_users, "module_id": DFP_SPLIT_USERS, "module_name": "dfp_fsplit_users", "namespace": MODULE_NAMESPACE, - "include_generic": include_generic, - "include_individual": include_individual, - "skip_users": skip_users, - "only_users": only_users, + "include_generic": derive_args.include_generic, + "include_individual": derive_args.include_individual, + "skip_users": derive_args.skip_users, + "only_users": derive_args.only_users, "timestamp_column_name": config.ae.timestamp_column_name, "userid_column_name": config.ae.userid_column_name, "fallback_username": config.ae.fallback_username @@ -294,9 +196,9 @@ def run_pipeline(train_users, "module_id": DFP_ROLLING_WINDOW, "module_name": "dfp_rolling_window", "namespace": MODULE_NAMESPACE, - "min_history": 300 if is_training else 1, - "min_increment": 300 if is_training else 0, - "max_history": "60d" if is_training else "1d", + "min_history": 300 if derive_args.is_training else 1, + "min_increment": 300 if derive_args.is_training else 0, + "max_history": "60d" if derive_args.is_training else "1d", "cache_dir": cache_dir, "timestamp_column_name": config.ae.timestamp_column_name }, @@ -305,7 +207,6 @@ def run_pipeline(train_users, "module_name": "dfp_data_prep", "namespace": MODULE_NAMESPACE, "timestamp_column_name": config.ae.timestamp_column_name, - "userid_column_name": config.ae.userid_column_name, "schema": { "schema_str": preprocess_schema_str, "encoding": encoding } @@ -327,10 +228,7 @@ def run_pipeline(train_users, pipeline.add_stage(MonitorStage(config, description="Preprocessing Module rate", smoothing=0.001)) - model_name_formatter = "DFP-duo-{user_id}" - experiment_name_formatter = "dfp/duo/training/{reg_model_name}" - - if (is_training): + if (derive_args.is_training): # Module configuration training_module_config = { @@ -364,8 +262,8 @@ def run_pipeline(train_users, "module_id": MLFLOW_MODEL_WRITER, "module_name": "mlflow_model_writer", "namespace": MODULE_NAMESPACE, - "model_name_formatter": model_name_formatter, - "experiment_name_formatter": experiment_name_formatter, + "model_name_formatter": derive_args.model_name_formatter, + "experiment_name_formatter": derive_args.experiment_name_formatter, "timestamp_column_name": config.ae.timestamp_column_name, "conda_env": { 'channels': ['defaults', 'conda-forge'], @@ -388,7 +286,7 @@ def run_pipeline(train_users, pipeline.add_stage(MonitorStage(config, description="Training Module rate", smoothing=0.001)) else: - pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter)) + pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=derive_args.model_name_formatter)) pipeline.add_stage(MonitorStage(config, description="Inference rate", 
smoothing=0.001))
diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_training.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_training.py
new file mode 100644
index 0000000000..d515b6353a
--- /dev/null
+++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_modules_training.py
@@ -0,0 +1,150 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+from datetime import datetime
+
+import click
+import dfp.modules.dfp_training_pipeline  # noqa: F401
+from dfp.stages.multi_file_source import MultiFileSource
+from dfp.utils.config_generator import ConfigGenerator
+from dfp.utils.config_generator import generate_ae_config
+from dfp.utils.derive_args import DeriveArgs
+from dfp.utils.schema_utils import Schema
+from dfp.utils.schema_utils import SchemaBuilder
+
+from morpheus.cli.utils import get_log_levels
+from morpheus.cli.utils import parse_log_level
+from morpheus.config import Config
+from morpheus.pipeline import LinearPipeline
+from morpheus.stages.general.linear_modules_stage import LinearModulesStage
+from morpheus.stages.general.monitor_stage import MonitorStage
+
+
+@click.command()
+@click.option(
+    "--train_users",
+    type=click.Choice(["all", "generic", "individual"], case_sensitive=False),
+    default="generic",
+    show_default=True,
+    help=("Indicates whether to train per-user models ('individual'), a generic model for all users ('generic'), "
+          "or both ('all')."),
+)
+@click.option(
+    "--skip_user",
+    multiple=True,
+    type=str,
+    help="User IDs to skip. Mutually exclusive with only_user",
+)
+@click.option(
+    "--only_user",
+    multiple=True,
+    type=str,
+    help="Only users specified by this option will be included. Mutually exclusive with skip_user",
+)
+@click.option(
+    "--start_time",
+    type=click.DateTime(
+        formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']),
+    default=None,
+    help="The start of the time window. If undefined, start_time will be `now() - duration`",
+)
+@click.option(
+    "--duration",
+    type=str,
+    default="60d",
+    help="The duration to run starting from start_time",
+)
+@click.option(
+    "--cache_dir",
+    type=str,
+    default="./.cache/dfp",
+    show_envvar=True,
+    help="The location to cache data such as S3 downloads and pre-processed data",
+)
+@click.option("--log_level",
+              default=logging.getLevelName(Config().log_level),
+              type=click.Choice(get_log_levels(), case_sensitive=False),
+              callback=parse_log_level,
+              help="Specify the logging level to use.")
+@click.option("--sample_rate_s",
+              type=int,
+              default=0,
+              show_envvar=True,
+              help="Minimum time step, in seconds, between object logs.")
+@click.option(
+    "--input_file",
+    "-f",
+    type=str,
+    multiple=True,
+    help=("List of files to process. Can specify multiple arguments for multiple files. "
+          "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. 
" + "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. " + "See fsspec documentation for list of possible options."), +) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +def run_pipeline(train_users, + skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + **kwargs): + + derive_args = DeriveArgs(skip_user, + only_user, + start_time, + duration, + log_level, + cache_dir, + sample_rate_s, + tracking_uri=kwargs["tracking_uri"], + source="duo", + train_users=train_users) + + derive_args.init() + + config: Config = generate_ae_config(labels_file="data/columns_ae_duo.txt", + userid_column_name="username", + timestamp_column_name="timestamp") + + schema_builder = SchemaBuilder(config) + schema: Schema = schema_builder.build_duo_schema() + + config_generator = ConfigGenerator(config, derive_args, schema) + + module_conf = config_generator.tra_pipe_module_conf() + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"]))) + + # Here we add a wrapped module that implements the full DFP Training pipeline + pipeline.add_stage(LinearModulesStage(config, module_conf, input_port_name="input", output_port_name="output")) + + pipeline.add_stage(MonitorStage(config, description="Training Pipeline rate", smoothing=0.001)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/morpheus/modules/file_to_df.py b/morpheus/modules/file_to_df.py index a714c452a0..056e72d409 100644 --- a/morpheus/modules/file_to_df.py +++ b/morpheus/modules/file_to_df.py @@ -56,6 +56,7 @@ def file_to_df(builder: mrc.Builder): config = get_module_config(FILE_TO_DF, builder) + timestamp_column_name = config.get("timestamp_column_name", None) schema_config = config.get("schema", None) schema_str = schema_config.get("schema_str", None) encoding = schema_config.get("encoding", None) @@ -211,7 +212,7 @@ def get_or_create_dataframe_from_s3_batch( output_df: pd.DataFrame = pd.concat(dfs) # Finally sort by timestamp and then reset the index - output_df.sort_values(by=["timestamp"], inplace=True) + output_df.sort_values(by=[timestamp_column_name], inplace=True) output_df.reset_index(drop=True, inplace=True) diff --git a/morpheus/modules/filter_detections.py b/morpheus/modules/filter_detections.py new file mode 100644 index 0000000000..a58cd442c8 --- /dev/null +++ b/morpheus/modules/filter_detections.py @@ -0,0 +1,179 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import logging
+import pickle
+import typing
+
+import cupy as cp
+import mrc
+import numpy as np
+import typing_utils
+from mrc.core import operators as ops
+
+from morpheus._lib.common import FilterSource
+from morpheus.messages import MultiMessage
+from morpheus.messages.multi_response_message import MultiResponseMessage
+from morpheus.utils.module_ids import FILTER_DETECTIONS
+from morpheus.utils.module_ids import MODULE_NAMESPACE
+from morpheus.utils.module_utils import get_module_config
+from morpheus.utils.module_utils import register_module
+
+logger = logging.getLogger(__name__)
+
+
+@register_module(FILTER_DETECTIONS, MODULE_NAMESPACE)
+def filter_detections(builder: mrc.Builder):
+    """
+    Filter messages by a classification threshold.
+
+    The FilterDetections module is used to filter rows from a dataframe based on values in a tensor using a specified
+    criterion. Rows in the `meta` dataframe are excluded if their associated value in the `probs` array is less than
+    or equal to `threshold`.
+
+    This module can operate in two different modes set by the `copy` argument.
+    When the `copy` argument is `True` (default), rows that meet the filter criteria are copied into a new dataframe.
+    When `False`, sliced views are used instead.
+
+    Setting `copy=True` should be used when the number of matching records is expected to be both high and in
+    non-adjacent rows. In this mode, the stage will generate only one output message for each incoming message,
+    regardless of the size of the input and the number of matching records. However, this comes at the cost of
+    needing to allocate additional memory and perform the copy.
+    Note: In most other stages, messages emitted contain a reference to the original `MessageMeta` emitted into the
+    pipeline by the source stage. When using copy mode this won't be the case and could cause the original
+    `MessageMeta` to be deallocated after this stage.
+
+    Setting `copy=False` should be used when the number of matching records is expected to be either very low or
+    likely to be contained in adjacent rows. In this mode, slices of contiguous blocks of rows are emitted in
+    multiple output messages. Performing a slice is relatively low-cost; however, for each incoming message the
+    number of emitted messages could be high (in the worst case as high as half the number of records in the
+    incoming message). Depending on the downstream stages, this can cause performance issues, especially if those
+    stages need to acquire the Python GIL.
+
+    Parameters
+    ----------
+    builder : mrc.Builder
+        mrc Builder object.
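+
+    Notes
+    -----
+    Configuration keys read by this module (see the body below): ``field_name`` (default ``"probs"``),
+    ``threshold`` (default ``0.5``), ``filter_source`` (``"AUTO"``, ``"DATAFRAME"`` or ``"TENSOR"``, default
+    ``"AUTO"``), ``copy`` (default ``True``) and a ``schema`` mapping providing ``input_message_type`` (a pickled
+    type encoded as a string) and ``encoding``.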
+ """ + + config = get_module_config(FILTER_DETECTIONS, builder) + + field_name = config.get("field_name", "probs") + threshold = config.get("threshold", 0.5) + filter_source = config.get("filter_source", "AUTO") + copy = config.get("copy", True) + + schema_config = config.get("schema", None) + input_message_type = schema_config.get("input_message_type", None) + encoding = schema_config.get("encoding", None) + + message_type = pickle.loads(bytes(input_message_type, encoding)) + + def find_detections(x: MultiMessage, filter_source) -> typing.Union[cp.ndarray, np.ndarray]: + + # Determind the filter source + if filter_source == FilterSource.TENSOR: + filter_source = x.get_output(field_name) + else: + filter_source = x.get_meta(field_name).values + + if (isinstance(filter_source, np.ndarray)): + array_mod = np + else: + array_mod = cp + + # Get per row detections + detections = (filter_source > threshold) + + if (len(detections.shape) > 1): + detections = detections.any(axis=1) + + # Surround in False to ensure we get an even number of pairs + detections = array_mod.concatenate([array_mod.array([False]), detections, array_mod.array([False])]) + + return array_mod.where(detections[1:] != detections[:-1])[0].reshape((-1, 2)) + + def filter_copy(x: MultiMessage) -> MultiMessage: + """ + This function uses a threshold value to filter the messages. + + Parameters + ---------- + x : `morpheus.pipeline.messages.MultiMessage` + Response message with probabilities calculated from inference results. + + Returns + ------- + `morpheus.pipeline.messages.MultiMessage` + A new message containing a copy of the rows above the threshold. + + """ + if x is None: + return None + + true_pairs = find_detections(x, filter_source) + + return x.copy_ranges(true_pairs) + + def filter_slice(x: MultiMessage) -> typing.List[MultiMessage]: + """ + This function uses a threshold value to filter the messages. + + Parameters + ---------- + x : `morpheus.pipeline.messages.MultiMessage` + Response message with probabilities calculated from inference results. + + Returns + ------- + typing.List[`morpheus.pipeline.messages.MultiMessage`] + List of filtered messages. + + """ + # Unfortunately we have to convert this to a list in case there are non-contiguous groups + output_list = [] + if x is not None: + true_pairs = find_detections(x, filter_source) + for pair in true_pairs: + pair = tuple(pair.tolist()) + if ((pair[1] - pair[0]) > 0): + output_list.append(x.get_slice(*pair)) + + return output_list + + if filter_source == "AUTO": + if (typing_utils.issubtype(message_type, MultiResponseMessage)): + filter_source = FilterSource.TENSOR + else: + filter_source = FilterSource.DATAFRAME + + logger.debug(f"filter_source was set to Auto, infering a filter source of {filter_source} based on an input " + "message type of {message_type}") + elif filter_source == "DATAFRAME": + filter_source = FilterSource.DATAFRAME + else: + raise Exception("Unknown filter source: {}".format(filter_source)) + + if copy: + node = builder.make_node(FILTER_DETECTIONS, filter_copy) + else: + # Convert list back to individual messages + def flatten_fn(obs: mrc.Observable, sub: mrc.Subscriber): + obs.pipe(ops.map(filter_slice), ops.flatten()).subscribe(sub) + + node = builder.make_node_full(FILTER_DETECTIONS, flatten_fn) + + # Register input and output port for a module. 
+    builder.register_module_input("input", node)
+    builder.register_module_output("output", node)
diff --git a/morpheus/modules/serialize.py b/morpheus/modules/serialize.py
new file mode 100644
index 0000000000..a212842954
--- /dev/null
+++ b/morpheus/modules/serialize.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+import typing
+from functools import partial
+
+import mrc
+
+from morpheus.messages import MultiMessage
+from morpheus.messages.message_meta import MessageMeta
+from morpheus.utils.module_ids import MODULE_NAMESPACE
+from morpheus.utils.module_ids import SERIALIZE
+from morpheus.utils.module_utils import get_module_config
+from morpheus.utils.module_utils import register_module
+
+logger = logging.getLogger(__name__)
+
+
+@register_module(SERIALIZE, MODULE_NAMESPACE)
+def serialize(builder: mrc.Builder):
+    """
+    Includes & excludes columns from messages.
+
+    This module filters columns from a `MultiMessage` object, emitting a `MessageMeta`.
+
+    Parameters
+    ----------
+    builder : mrc.Builder
+        mrc Builder object.
+    """
+
+    config = get_module_config(SERIALIZE, builder)
+    include_columns = config.get("include", None)
+    exclude_columns = config.get("exclude", [r'^ID$', r'^_ts_'])
+    fixed_columns = config.get("fixed_columns", True)
+    columns = config.get("columns", None)
+
+    def convert_to_df(x: MultiMessage,
+                      include_columns: typing.Pattern,
+                      exclude_columns: typing.List[typing.Pattern],
+                      columns: typing.List[str]):
+        """
+        Filters columns from a `MultiMessage` and wraps the result in a `MessageMeta`.
+
+        Parameters
+        ----------
+        x : `morpheus.pipeline.messages.MultiMessage`
+            MultiMessage instance that contains data.
+        include_columns : typing.Pattern
+            Pattern matching the columns to send to the downstream stage; if `None`, all columns are included.
+        exclude_columns : typing.List[typing.Pattern]
+            Patterns matching the columns to withhold from the downstream stage.
+        columns : typing.List[str]
+            Fixed list of columns to use; takes precedence when `fixed_columns` is set.
+
+        """
+
+        # Only recompute the column list when a fixed list was not supplied
+        if not (fixed_columns and columns is not None):
+            # Minimize access to x.meta.df
+            df_columns = list(x.meta.df.columns)
+
+            # First build up list of included. If no include regex is specified, select all
+            if (include_columns is None):
+                columns = df_columns
+            else:
+                columns = [y for y in df_columns if include_columns.match(y)]
+
+            # Now remove by the ignore
+            for test in exclude_columns:
+                columns = [y for y in columns if not test.match(y)]
+
+        # Get metadata from columns
+        df = x.get_meta(columns)
+
+        return MessageMeta(df=df)
+
+    if (include_columns is not None and len(include_columns) > 0):
+        include_columns = re.compile("({})".format("|".join(include_columns)))
+
+    exclude_columns = [re.compile(x) for x in exclude_columns]
+
+    node = builder.make_node(
+        SERIALIZE,
+        partial(convert_to_df, include_columns=include_columns, exclude_columns=exclude_columns, columns=columns))
+
+    # Register input and output port for a module.
+ builder.register_module_input("input", node) + builder.register_module_output("output", node) diff --git a/morpheus/modules/write_to_file.py b/morpheus/modules/write_to_file.py new file mode 100644 index 0000000000..5f3fb77ab1 --- /dev/null +++ b/morpheus/modules/write_to_file.py @@ -0,0 +1,118 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import typing + +import mrc +import pandas as pd +from mrc.core import operators as ops + +import cudf + +from morpheus._lib.common import FileTypes +from morpheus._lib.common import determine_file_type +from morpheus.io import serializers +from morpheus.messages.message_meta import MessageMeta +from morpheus.utils.module_ids import MODULE_NAMESPACE +from morpheus.utils.module_ids import WRITE_TO_FILE +from morpheus.utils.module_utils import get_module_config +from morpheus.utils.module_utils import register_module + +logger = logging.getLogger(__name__) + +is_first = True + + +@register_module(WRITE_TO_FILE, MODULE_NAMESPACE) +def write_to_file(builder: mrc.Builder): + """ + Write all messages to a file. + + This module writes messages to a file. + + Parameters + ---------- + builder : mrc.Builder + mrc Builder object. + """ + + config = get_module_config(WRITE_TO_FILE, builder) + + output_file = config.get("filename", None) + overwrite = config.get("overwrite", False) + flush = config.get("flush", False) + file_type = config.get("file_type", FileTypes.Auto) + include_index_col = config.get("include_index_col", True) + + if (os.path.exists(output_file)): + if (overwrite): + os.remove(output_file) + else: + raise FileExistsError( + "Cannot output classifications to '{}'. 
File exists and overwrite = False".format(output_file)) + + if (file_type == FileTypes.Auto): + file_type = determine_file_type(output_file) + + def convert_to_strings(df: typing.Union[pd.DataFrame, cudf.DataFrame]): + + global is_first + + if (file_type == FileTypes.JSON): + output_strs = serializers.df_to_json(df, include_index_col=include_index_col) + elif (file_type == FileTypes.CSV): + output_strs = serializers.df_to_csv(df, include_header=is_first, include_index_col=include_index_col) + else: + raise NotImplementedError("Unknown file type: {}".format(file_type)) + + is_first = False + + # Remove any trailing whitespace + if (len(output_strs[-1].strip()) == 0): + output_strs = output_strs[:-1] + + return output_strs + + # Sink to file + + def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): + + # Ensure our directory exists + os.makedirs(os.path.realpath(os.path.dirname(output_file)), exist_ok=True) + + # Open up the file handle + with open(output_file, "a") as out_file: + + def write_to_file(x: MessageMeta): + + lines = convert_to_strings(x.df) + + out_file.writelines(lines) + + if flush: + out_file.flush() + + return x + + obs.pipe(ops.map(write_to_file)).subscribe(sub) + + # File should be closed by here + + node = builder.make_node_full(WRITE_TO_FILE, node_fn) + + # Register input and output port for a module. + builder.register_module_input("input", node) + builder.register_module_output("output", node) diff --git a/morpheus/utils/module_ids.py b/morpheus/utils/module_ids.py index c174ceff9c..99bd17f89a 100644 --- a/morpheus/utils/module_ids.py +++ b/morpheus/utils/module_ids.py @@ -17,3 +17,6 @@ FILE_BATCHER = "FileBatcher" FILE_TO_DF = "FileToDF" MLFLOW_MODEL_WRITER = "MLFlowModelWriter" +SERIALIZE = "Serialize" +WRITE_TO_FILE = "WriteToFile" +FILTER_DETECTIONS = "filter_detections"
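For reference, the [start, stop) pairing trick used by the new filter_detections module can be exercised on its own. A minimal sketch, assuming NumPy only and made-up detection values (not part of the change set):

import numpy as np

# Per-row detections, e.g. the result of (probs > threshold).any(axis=1)
detections = np.array([False, True, True, False, True])

# Surround in False so every run of True values has both a rising and a falling edge
padded = np.concatenate([np.array([False]), detections, np.array([False])])

# Adjacent inequality marks the run boundaries; pairing the indices yields
# [start, stop) row ranges of contiguous detections
pairs = np.where(padded[1:] != padded[:-1])[0].reshape((-1, 2))

print(pairs)  # [[1 3]
              #  [4 5]] -> rows 1-2 and row 4 passed the filter

Each pair feeds `copy_ranges` in copy mode or `get_slice` in slice mode, which is why slice mode can emit several output messages per incoming message.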
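The three module ids added above are registered the same way as the modules in this change: decorate a builder function with `register_module`, look up its config with `get_module_config`, and expose the node through `register_module_input`/`register_module_output`. A minimal sketch of that pattern for a custom module; the `Passthrough` id, the `tag` config key, and the module body are hypothetical and not part of this change:

import logging

import mrc

from morpheus.utils.module_ids import MODULE_NAMESPACE
from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import register_module

logger = logging.getLogger(__name__)

PASSTHROUGH = "Passthrough"  # hypothetical module id


@register_module(PASSTHROUGH, MODULE_NAMESPACE)
def passthrough(builder: mrc.Builder):
    # Configuration is looked up by module id, as in the modules above
    config = get_module_config(PASSTHROUGH, builder)
    tag = config.get("tag", "passthrough")

    def on_data(message):
        logger.debug("%s forwarded a message", tag)
        return message

    node = builder.make_node(PASSTHROUGH, on_data)

    # Register input and output port for a module.
    builder.register_module_input("input", node)
    builder.register_module_output("output", node)

A module registered this way can then be wrapped into a pipeline with `LinearModulesStage(config, module_conf, input_port_name="input", output_port_name="output")`, exactly as the training and inference scripts above do.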