Eliminate redundant code blocks in modules and stages #1123

Merged
Commits (26)
4a21359
Setting the tag back to v22.11.00
mdemoret-nv Jan 24, 2023
e959048
removed duplicate code in modules and stages
bsuryadevara Aug 8, 2023
c1beefe
pylint correction
bsuryadevara Aug 9, 2023
5610fc4
updated serilalizer module
bsuryadevara Aug 9, 2023
b6fb2a0
yapf format correction
bsuryadevara Aug 9, 2023
2d62ab7
yapf format correction
bsuryadevara Aug 9, 2023
5765460
fix to preserve_columns property
bsuryadevara Aug 10, 2023
2cccb91
added additional check to schema_transforms
bsuryadevara Aug 16, 2023
cacfed1
added checks to handle str type filter_source
bsuryadevara Aug 17, 2023
7c5e173
updated tests
bsuryadevara Aug 17, 2023
8edd637
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 17, 2023
e39bf53
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 23, 2023
a802734
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 23, 2023
89045cb
updated tests
bsuryadevara Aug 23, 2023
af50204
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 24, 2023
8c67be8
fixed pylint warnings
bsuryadevara Aug 25, 2023
623f4b9
updated to align with latest changes
bsuryadevara Aug 25, 2023
ea3ecee
Merge remote-tracking branch 'upstream/branch-22.11' into remove-dupl…
bsuryadevara Aug 31, 2023
d8deaae
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 31, 2023
a7499d6
moved monitor controller to controllers module
bsuryadevara Aug 31, 2023
4b43315
removed unused pylint disable comment
bsuryadevara Sep 5, 2023
0d29557
Merge branch 'branch-23.11' into remove-duplicate-code
bsuryadevara Sep 5, 2023
7546da8
Update examples/ransomware_detection/common/feature_extractor.py
bsuryadevara Sep 5, 2023
2402127
minor fixes and updates to model cache
bsuryadevara Sep 6, 2023
f201aca
Merge branch 'remove-duplicate-code' of github.com:bsuryadevara/Morph…
bsuryadevara Sep 6, 2023
aaf5fc1
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Sep 6, 2023
Diff view
@@ -17,17 +17,17 @@

# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
from dfp.modules import dfp_monitor
from dfp.modules import dfp_split_users
from dfp.modules import dfp_data_prep
from dfp.modules import dfp_deployment
from dfp.modules import dfp_inference
from dfp.modules import dfp_inference_pipe
from dfp.modules import dfp_monitor
from dfp.modules import dfp_postprocessing
from dfp.modules import dfp_preproc
from dfp.modules import dfp_rolling_window
from dfp.modules import dfp_split_users
from dfp.modules import dfp_training
from dfp.modules import dfp_inference_pipe
from dfp.modules import dfp_training_pipe
from dfp.modules import dfp_deployment

__all__ = [
"dfp_monitor",
@@ -16,6 +16,7 @@

import mrc
from mrc.core import operators as ops
from sklearn.model_selection import train_test_split

import cudf

@@ -87,8 +88,16 @@ def on_data(control_message: ControlMessage):
# Only train on the feature columns
train_df = final_df[final_df.columns.intersection(feature_columns)]

validation_df = None
run_validation = False

# Split into training and validation sets
if validation_size > 0.0:
train_df, validation_df = train_test_split(train_df, test_size=validation_size, shuffle=False)
run_validation = True

logger.debug("Training AE model for user: '%s'...", user_id)
model.fit(train_df, epochs=epochs)
model.fit(train_df, epochs=epochs, val_data=validation_df, run_validation=run_validation)
logger.debug("Training AE model for user: '%s'... Complete.", user_id)

dfp_mm = DFPMessageMeta(cudf.from_pandas(final_df), user_id=user_id)
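
For context, the hunk above threads an optional validation split through the autoencoder training call. A minimal, self-contained sketch of that split pattern follows; it uses only pandas and scikit-learn, and the DataFrame contents and validation_size value are illustrative, not taken from the PR:

# Illustrative sketch of the time-ordered validation split shown above.
# The DataFrame and validation_size here are made up for the example.
import pandas as pd
from sklearn.model_selection import train_test_split

train_df = pd.DataFrame({"feat_a": range(100), "feat_b": range(100, 200)})
validation_size = 0.1

validation_df = None
run_validation = False

if validation_size > 0.0:
    # shuffle=False keeps rows in their original (time) order, so the
    # validation set is the trailing slice rather than a random sample.
    train_df, validation_df = train_test_split(train_df,
                                               test_size=validation_size,
                                               shuffle=False)
    run_validation = True

# The module above then passes the split to the model:
# model.fit(train_df, epochs=epochs, val_data=validation_df, run_validation=run_validation)
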
@@ -13,62 +13,24 @@
# limitations under the License.
"""Stage for converting fsspec file objects to a DataFrame."""

import hashlib
import json
import logging
import os
import time
import typing
from functools import partial

import fsspec
import mrc
import pandas as pd
from mrc.core import operators as ops

from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.io.deserializers import read_file_to_df
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.column_info import process_dataframe
from morpheus.utils.downloader import Downloader
from morpheus.utils.controllers.file_to_df_controller import FileToDFController

logger = logging.getLogger(f"morpheus.{__name__}")


def _single_object_to_dataframe(file_object: fsspec.core.OpenFile,
schema: DataFrameInputSchema,
file_type: FileTypes,
filter_null: bool,
parser_kwargs: dict) -> pd.DataFrame:
retries = 0
s3_df = None
while (retries < 2):
try:
with file_object as f:
s3_df = read_file_to_df(f,
file_type,
filter_nulls=filter_null,
df_type="pandas",
parser_kwargs=parser_kwargs)

break
except Exception as e:
if (retries < 2):
logger.warning("Error fetching %s: %s\nRetrying...", file_object, e)
retries += 1

# Optimistaclly prep the dataframe (Not necessary since this will happen again in process_dataframe, but it
# increases performance significantly)
if (schema.prep_dataframe is not None):
s3_df = schema.prep_dataframe(s3_df)

return s3_df


class DFPFileToDataFrameStage(PreallocatorMixin, SinglePortStage):
"""
Stage for converting fsspec file objects to a DataFrame, pre-processing the DataFrame according to `schema`, and
@@ -102,14 +64,13 @@ def __init__(self,
cache_dir: str = "./.cache/dfp"):
super().__init__(c)

self._schema = schema

self._file_type = file_type
self._filter_null = filter_null
self._parser_kwargs = {} if parser_kwargs is None else parser_kwargs
self._cache_dir = os.path.join(cache_dir, "file_cache")

self._downloader = Downloader()
timestamp_column_name = c.ae.timestamp_column_name
self._controller = FileToDFController(schema=schema,
filter_null=filter_null,
file_type=file_type,
parser_kwargs=parser_kwargs,
cache_dir=cache_dir,
timestamp_column_name=timestamp_column_name)

@property
def name(self) -> str:
@@ -124,103 +85,10 @@ def accepted_types(self) -> typing.Tuple:
"""Accepted input types."""
return (typing.Any, )

def _get_or_create_dataframe_from_s3_batch(
self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[pd.DataFrame, bool]:

if (not file_object_batch):
raise RuntimeError("No file objects to process")

file_list = file_object_batch[0]
batch_count = file_object_batch[1]

file_system: fsspec.AbstractFileSystem = file_list.fs

# Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just
# hashes all the output of `info()` which is perfect
hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list]

# Convert to base 64 encoding to remove - values
objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()

batch_cache_location = os.path.join(self._cache_dir, "batches", f"{objects_hash_hex}.pkl")

# Return the cache if it exists
if (os.path.exists(batch_cache_location)):
output_df = pd.read_pickle(batch_cache_location)
output_df["batch_count"] = batch_count
output_df["origin_hash"] = objects_hash_hex

return (output_df, True)

# Cache miss
download_method = partial(_single_object_to_dataframe,
schema=self._schema,
file_type=self._file_type,
filter_null=self._filter_null,
parser_kwargs=self._parser_kwargs)

download_buckets = file_list

# Loop over dataframes and concat into one
try:
dfs = self._downloader.download(download_buckets, download_method)
except Exception:
logger.exception("Failed to download logs. Error: ", exc_info=True)
raise

if (dfs is None or len(dfs) == 0):
raise ValueError("No logs were downloaded")

output_df: pd.DataFrame = pd.concat(dfs)
output_df = process_dataframe(df_in=output_df, input_schema=self._schema)

# Finally sort by timestamp and then reset the index
output_df.sort_values(by=[self._config.ae.timestamp_column_name], inplace=True)

output_df.reset_index(drop=True, inplace=True)

# Save dataframe to cache future runs
os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True)

try:
output_df.to_pickle(batch_cache_location)
except Exception:
logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True)

output_df["batch_count"] = batch_count
output_df["origin_hash"] = objects_hash_hex

return (output_df, False)

def convert_to_dataframe(self, s3_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]):
"""Converts a batch of S3 objects to a DataFrame."""
if (not s3_object_batch):
return None

start_time = time.time()

try:

output_df, cache_hit = self._get_or_create_dataframe_from_s3_batch(s3_object_batch)

duration = (time.time() - start_time) * 1000.0

if (output_df is not None and logger.isEnabledFor(logging.DEBUG)):
logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s",
len(output_df),
"hit" if cache_hit else "miss",
duration,
len(output_df) / (duration / 1000.0))

return output_df
except Exception:
logger.exception("Error while converting S3 buckets to DF.")
raise

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = builder.make_node(self.unique_name,
ops.map(self.convert_to_dataframe),
ops.on_completed(self._downloader.close))
ops.map(self._controller.convert_to_dataframe),
ops.on_completed(self._controller.close))
builder.make_edge(input_stream[0], stream)

return stream, pd.DataFrame
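
The refactor above illustrates the PR's overall pattern: the download, hash-based caching, and DataFrame conversion logic that previously lived inside the stage moves into a shared FileToDFController, and the stage becomes a thin wrapper that delegates to it. Below is a rough, hypothetical sketch of that delegation pattern; the class and method names are invented for illustration and are not the actual Morpheus API:

# Hypothetical sketch of the controller pattern this PR applies: logic that
# was duplicated between a stage and a module lives in a single controller,
# and the stage only wires it into the pipeline. All names are illustrative.
import pandas as pd


class BatchToDataFrameController:
    """Shared conversion logic, owned by one class instead of being copied."""

    def convert_to_dataframe(self, records: list[dict]) -> pd.DataFrame:
        # Stand-in for the real download/cache/concat logic.
        return pd.DataFrame(records)

    def close(self) -> None:
        # Stand-in for releasing downloader resources.
        pass


class ThinStage:
    """A stage that delegates all real work to the controller."""

    def __init__(self, controller: BatchToDataFrameController):
        self._controller = controller

    def on_data(self, records: list[dict]) -> pd.DataFrame:
        return self._controller.convert_to_dataframe(records)

    def on_complete(self) -> None:
        self._controller.close()


# Usage: a stage and a module can both construct (or share) a controller
# without duplicating the conversion code.
stage = ThinStage(BatchToDataFrameController())
df = stage.on_data([{"ts": 1, "user": "a"}, {"ts": 2, "user": "b"}])
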