diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py index e584a27179..ea61341b3a 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py @@ -204,7 +204,7 @@ def _get_or_create_dataframe_from_s3_batch( output_df: pd.DataFrame = pd.concat(dfs) # Finally sort by timestamp and then reset the index - output_df.sort_values(by=["timestamp"], inplace=True) + output_df.sort_values(by=[self._config.ae.timestamp_column_name], inplace=True) output_df.reset_index(drop=True, inplace=True) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py index 367f32d33e..a947a75623 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py @@ -164,8 +164,8 @@ def on_data(self, message: MultiAEMessage): "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"), "Learning rate": model.lr, "Batch size": model.batch_size, - "Start Epoch": message.get_meta("timestamp").min(), - "End Epoch": message.get_meta("timestamp").max(), + "Start Epoch": message.get_meta(self._config.ae.timestamp_column_name).min(), + "End Epoch": message.get_meta(self._config.ae.timestamp_column_name).max(), "Log Count": message.mess_count, }) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_split_users_stage.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_split_users_stage.py index ab9eee3f4e..3732d53ddd 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_split_users_stage.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_split_users_stage.py @@ -84,9 +84,11 @@ def extract_users(self, message: cudf.DataFrame): if (self._include_individual): - split_dataframes.update( - {username: user_df - for username, user_df in message.groupby("username", sort=False)}) + split_dataframes.update({ + username: user_df + for username, + user_df in message.groupby(self._config.ae.userid_column_name, sort=False) + }) output_messages: typing.List[DFPMessageMeta] = [] diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/cached_user_window.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/cached_user_window.py index 8629c8bd05..7e27b1bc6c 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/cached_user_window.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/cached_user_window.py @@ -44,11 +44,11 @@ class CachedUserWindow: def append_dataframe(self, incoming_df: pd.DataFrame) -> bool: # Filter the incoming df by epochs later than the current max_epoch - filtered_df = incoming_df[incoming_df["timestamp"] > self.max_epoch] + filtered_df = incoming_df[incoming_df[self.timestamp_column] > self.max_epoch] if (len(filtered_df) == 0): # We have nothing new to add. Double check that we fit within the window - before_history = incoming_df[incoming_df["timestamp"] < self.min_epoch] + before_history = incoming_df[incoming_df[self.timestamp_column] < self.min_epoch] return len(before_history) == 0 @@ -59,7 +59,7 @@ def append_dataframe(self, incoming_df: pd.DataFrame) -> bool: # Set the filtered index filtered_df.index = range(self.total_count, self.total_count + len(filtered_df)) - # Save the row hash to make it easier to find later. Do this before the batch so it doesnt participate + # Save the row hash to make it easier to find later. Do this before the batch so it doesn't participate filtered_df["_row_hash"] = pd.util.hash_pandas_object(filtered_df, index=False) # Use batch id to distinguish groups in the same dataframe diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py index ab725348ad..52effdf20e 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py @@ -231,10 +231,16 @@ def run_pipeline(train_users, groupby_column=config.ae.userid_column_name), CustomColumn(name="locincrement", dtype=int, - process_column_fn=partial(create_increment_col, column_name="location")), + process_column_fn=partial(create_increment_col, + column_name="location", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name)), CustomColumn(name="appincrement", dtype=int, - process_column_fn=partial(create_increment_col, column_name="appDisplayName")), + process_column_fn=partial(create_increment_col, + column_name="appDisplayName", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name)) ] preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py index 40631a4468..4ff81f9d53 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py @@ -232,7 +232,10 @@ def run_pipeline(train_users, groupby_column=config.ae.userid_column_name), CustomColumn(name="locincrement", dtype=int, - process_column_fn=partial(create_increment_col, column_name="location")), + process_column_fn=partial(create_increment_col, + column_name="location", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name)) ] preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"])