Skip to content

Commit

Permalink
Pre-allocate needed columns in abp_pcap_detection example (#820)
Browse files Browse the repository at this point in the history
* `AbpPcapPreprocessingStage` requests needed columns to be pre-allocated
* Add an end-to-end test for the `AbpPcapPreprocessingStage` (part of #849)
* Add `tests/tests_data/abp_pcap.jsonlines` which is the first 20 lines of the much larger `examples/data/abp_pcap_dump.jsonlines` (200MB) which is not downloaded in the test CI stage.

fixes #797

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #820
  • Loading branch information
dagardner-nv authored Apr 10, 2023
1 parent efc8d6e commit 423e7bb
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 17 deletions.
30 changes: 17 additions & 13 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import InferenceMemoryFIL
Expand Down Expand Up @@ -67,6 +68,12 @@ def __init__(self, c: Config):
self.features
), f"Number of features in preprocessing {len(self.features)}, does not match configuration {self._fea_length}"

# columns required to be added to input message meta
self.req_cols = ["flow_id", "rollup_time"]

for req_col in self.req_cols:
self._needed_columns[req_col] = TypeId.STRING

@property
def name(self) -> str:
return "preprocess-anomaly"
Expand All @@ -75,7 +82,8 @@ def supports_cpp_node(self):
return False

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str]) -> MultiInferenceFILMessage:
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> MultiInferenceFILMessage:
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

Expand Down Expand Up @@ -166,31 +174,27 @@ def round_time_kernel(timestamp, rollup_time, secs):
data = cp.asarray(merged_df[fea_cols].to_cupy())
count = data.shape[0]

# columns required to be added to input message meta
req_cols = ["flow_id", "rollup_time"]

for col in req_cols:
x.set_meta(col, merged_df[col])

del merged_df

seg_ids = cp.zeros((count, 3), dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1
seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seq_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seg_ids)
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

infer_message = MultiInferenceFILMessage.from_message(x, memory=memory)

return infer_message

def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]:
return partial(
AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
)
return partial(AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
req_cols=self.req_cols)

def _get_preprocess_node(self, builder: mrc.Builder):
return _stages.AbpPcapPreprocessingStage(builder, self.unique_name)
47 changes: 43 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import signal
import subprocess
import sys
import time
import typing
import warnings
Expand Down Expand Up @@ -359,18 +360,56 @@ def restore_environ():
del (os.environ[key])


@pytest.fixture(scope="function")
def restore_sys_path():
orig_vars = sys.path.copy()
yield sys.path
sys.path = orig_vars


@pytest.fixture(scope="function")
def import_mod(request: pytest.FixtureRequest, restore_sys_path):
marker = request.node.get_closest_marker("import_mod")
if marker is not None:
mod_paths = marker.args[0]
if not isinstance(mod_paths, list):
mod_paths = [mod_paths]

modules = []
for mod_path in mod_paths:
mod_dir, mod_fname = os.path.split(mod_path)
mod_name, _ = os.path.splitext(mod_fname)

sys.path.append(mod_dir)
mod = importlib.import_module(mod_name)
assert mod.__file__ == mod_path

modules.append(mod)

yield modules

else:
raise ValueError("import_mod fixture requires setting paths in markers: "
"`@pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'log_parsing/messages.py')])`")


def _reload_modules(modules: typing.List[typing.Any]):
for mod in modules:
importlib.reload(mod)


@pytest.fixture(scope="function")
def reload_modules(request: pytest.FixtureRequest):
marker = request.node.get_closest_marker("reload_modules")
yield

modules = []
if marker is not None:
modules = marker.args[0]
if not isinstance(modules, list):
modules = [modules]

for mod in modules:
importlib.reload(mod)
_reload_modules(modules)
yield
_reload_modules(modules)


@pytest.fixture(scope="function")
Expand Down
122 changes: 122 additions & 0 deletions tests/examples/test_abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import typing

import cupy as cp
import numpy as np
import pytest

import cudf

from morpheus.common import TypeId
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiMessage
from utils import TEST_DIRS


def check_inf_message(msg: MultiInferenceFILMessage,
expected_meta: MessageMeta,
expected_mess_offset: int,
expected_mess_count: int,
expected_offset: int,
expected_count: int,
expected_feature_length: int,
expected_flow_ids: cudf.Series,
expected_rollup_time: str,
expected_input__0: cp.ndarray):
assert isinstance(msg, MultiInferenceFILMessage)
assert msg.meta is expected_meta
assert msg.mess_offset == expected_mess_offset
assert msg.mess_count == expected_mess_count
assert msg.offset == expected_offset
assert msg.count == expected_count

df = msg.get_meta()
assert 'flow_id' in df
assert 'rollup_time' in df

assert (df.flow_id == expected_flow_ids).all()
assert (df.rollup_time == expected_rollup_time).all()

assert msg.memory.has_tensor('input__0')
assert msg.memory.has_tensor('seq_ids')

input__0 = msg.memory.get_tensor('input__0')
assert input__0.shape == (expected_count, expected_feature_length)
assert (input__0 == expected_input__0).all()

seq_ids = msg.memory.get_tensor('seq_ids')
assert seq_ids.shape == (expected_count, 3)
assert (seq_ids[:, 0] == cp.arange(expected_mess_offset,
expected_mess_offset + expected_mess_count,
dtype=cp.uint32)).all()
assert (seq_ids[:, 1] == 0).all()
assert (seq_ids[:, 2] == expected_feature_length - 1).all()


@pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'abp_pcap_detection/abp_pcap_preprocessing.py')])
def test_abp_pcap_preprocessing(config, import_mod: typing.List[typing.Any]):
# Setup the config
config.mode = PipelineModes.FIL
config.feature_length = 13

abp_pcap_preprocessing = import_mod[0]

# Get our input data, should contain the first 20 lines of the production data
input_file = os.path.join(TEST_DIRS.tests_data_dir, 'abp_pcap.jsonlines')
input_df = read_file_to_df(input_file, df_type='cudf', filter_nulls=False)

expected_flow_ids = input_df.src_ip + ":" + input_df.src_port + "=" + input_df.dest_ip + ":" + input_df.dest_port
expected_input__0 = cp.asarray(
np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'abp_pcap_expected_input_0.csv'), delimiter=",", skiprows=0))

assert len(input_df) == 20

meta = MessageMeta(input_df)
mm1 = MultiMessage(meta=meta, mess_offset=0, mess_count=10)
mm2 = MultiMessage(meta=meta, mess_offset=10, mess_count=10)

stage = abp_pcap_preprocessing.AbpPcapPreprocessingStage(config)
assert stage.get_needed_columns() == {'flow_id': TypeId.STRING, 'rollup_time': TypeId.STRING}

inf1 = stage.pre_process_batch(mm1, config.feature_length, stage.features, stage.req_cols)
check_inf_message(inf1,
expected_meta=meta,
expected_mess_offset=0,
expected_mess_count=10,
expected_offset=0,
expected_count=10,
expected_feature_length=config.feature_length,
expected_flow_ids=expected_flow_ids[0:10],
expected_rollup_time='2021-04-07 15:55',
expected_input__0=expected_input__0[0:10])

inf2 = stage.pre_process_batch(mm2, config.feature_length, stage.features, stage.req_cols)
check_inf_message(inf2,
expected_meta=meta,
expected_mess_offset=10,
expected_mess_count=10,
expected_offset=0,
expected_count=10,
expected_feature_length=config.feature_length,
expected_flow_ids=expected_flow_ids[10:],
expected_rollup_time='2021-04-07 15:55',
expected_input__0=expected_input__0[10:])
3 changes: 3 additions & 0 deletions tests/tests_data/abp_pcap.jsonlines
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/tests_data/abp_pcap_expected_input_0.csv
Git LFS file not shown
1 change: 1 addition & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, cur_file=__file__) -> None:
self.tests_dir = os.path.dirname(cur_file)
self.morpheus_root = os.environ.get('MORPHEUS_ROOT', os.path.dirname(self.tests_dir))
self.data_dir = morpheus.DATA_DIR
self.examples_dir = os.path.join(self.morpheus_root, 'examples')
self.models_dir = os.path.join(self.morpheus_root, 'models')
self.datasets_dir = os.path.join(self.models_dir, 'datasets')
self.training_data_dir = os.path.join(self.datasets_dir, 'training-data')
Expand Down

0 comments on commit 423e7bb

Please sign in to comment.