diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..ea4b0b3d3a --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "examples/sid_visualization/viz"] + path = examples/sid_visualization/viz + url = ../../nv-morpheus/morpheus-visualizations.git + branch = branch-22.09 diff --git a/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines b/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines new file mode 100644 index 0000000000..e854903f0a --- /dev/null +++ b/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7bbd3c8c318a17a99b2f873fb09f1195e60c1337e95a003192b780acada0754c +size 960569 diff --git a/examples/data/sid_visualization/group1-benign-2nodes.jsonlines b/examples/data/sid_visualization/group1-benign-2nodes.jsonlines new file mode 100644 index 0000000000..e854903f0a --- /dev/null +++ b/examples/data/sid_visualization/group1-benign-2nodes.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7bbd3c8c318a17a99b2f873fb09f1195e60c1337e95a003192b780acada0754c +size 960569 diff --git a/examples/data/sid_visualization/group2-benign-50nodes.jsonlines b/examples/data/sid_visualization/group2-benign-50nodes.jsonlines new file mode 100644 index 0000000000..a26aa6c615 --- /dev/null +++ b/examples/data/sid_visualization/group2-benign-50nodes.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5cb0b68592646a63a9687acacdc5a1a56ebd9a2f91041be477b23ad6e7ca8a24 +size 45001515 diff --git a/examples/data/sid_visualization/group3-si-50nodes.jsonlines b/examples/data/sid_visualization/group3-si-50nodes.jsonlines new file mode 100644 index 0000000000..ecd6232f6f --- /dev/null +++ b/examples/data/sid_visualization/group3-si-50nodes.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:663cbe7c7f5c7af9d988cdad4b0ca7a1f6d70c25bd06106c30fc5332a320c883 +size 45466635 diff --git a/examples/data/sid_visualization/group4-benign-49nodes.jsonlines b/examples/data/sid_visualization/group4-benign-49nodes.jsonlines new file mode 100644 index 0000000000..c8bc8edadc --- /dev/null +++ b/examples/data/sid_visualization/group4-benign-49nodes.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5371af3baf386f3d58672dca57dc179db6f8a9478e7e0b0dca5e3c0e8b1584c3 +size 82345519 diff --git a/examples/sid_visualization/README.md b/examples/sid_visualization/README.md new file mode 100644 index 0000000000..9b5a1d3b14 --- /dev/null +++ b/examples/sid_visualization/README.md @@ -0,0 +1,153 @@ +# SID Visualization Example + +## Prerequisites + +To run the demo you will need the following: +- Docker +- `docker-compose` (Tested with version 1.29) + +## Setup + +To run this demo, ensure all submodules are checked out: +```bash +git submodule update --init --recursive +``` + +### Build Morpheus Dev Container + +Before launching the demo, we need the dev container for Morpheus to be created: +```bash +export DOCKER_IMAGE_TAG="sid-viz" +``` + +# Build the dev container +```bash +./docker/build_container_dev.sh +``` + +### Launch User Interface + +We will use docker-compose to build and run the entire demo. To launch everything, run the following from the repo root: + +Save the Morpheus repo directory: +```bash +export MORPHEUS_HOME=$(git rev-parse --show-toplevel) +``` + +Ensure SID model is downloaded for deployment to Triton: +```bash +./scripts/fetch_data.py fetch models +``` + +Change to the example directory: +```bash +cd ${MORPHEUS_HOME}/examples/sid_visualization +``` + +Launch the containers: +```bash +DOCKER_BUILDKIT=1 docker-compose up --build -d +``` + +The following GUI should be displayed when all containers have completed launching: + + + +### Build Morpheus + +Once docker-compose command has completed and GUI is displayed, exec into the container to build and run Morpheus: + +Exec into the morpheus container: +```bash +docker-compose exec morpheus bash +``` + +Inside the container, compile morpheus: +```bash +BUILD_DIR=build-docker ./scripts/compile.sh +``` + +Install morpheus with an extra dependency: +```bash +pip install -e . && pip install websockets +``` + +Verify Morpheus is installed: +```bash +morpheus --version +``` + +Ensure the data has been downloaded: +```bash +./scripts/fetch_data.py fetch examples +``` + +***Keep this shell in the Morpheus Dev container running. It will be used later to start Morpheus.*** + +## Running the Demo + +### Running Morpheus + +After the GUI has been launched, Morpheus now needs to be started. In the same shell used to build Morpheus (the one running the Morpheus Dev container), run the following: +```bash +python examples/sid_visualization/run.py \ + --debug --use_cpp=False --num_threads=1 \ + --triton_server_url=triton:8001 \ + --input_file=./examples/data/sid_visualization/group1-benign-2nodes.jsonlines \ + --input_file=./examples/data/sid_visualization/group2-benign-50nodes.jsonlines \ + --input_file=./examples/data/sid_visualization/group3-si-50nodes.jsonlines \ + --input_file=./examples/data/sid_visualization/group4-benign-49nodes.jsonlines +``` + +**Note:** The first run of this script will take a few minutes to allow Triton to convert the deployed ONNX model to TensorRT. Subsequent runs will not include this conversion step so will be much quicker. + +This launch will use all of the available datasets. Each dataset will show up as one batch in the visualization. Here is a description of each dataset: + +- `examples/data/sid_visualization/group1-benign-2nodes.jsonlines` + - Small scale with 2 nodes, no SID +- `examples/data/sid_visualization/group2-benign-50nodes.jsonlines` + - Scale up to 50 nodes, no SID +- `examples/data/sid_visualization/group3-si-50nodes.jsonlines` + - 50 nodes, with SID from a single node +- `examples/data/sid_visualization/group4-benign-49nodes.jsonlines` + - Isolate bad node leaving 49 nodes, no SID + +The following is a screenshot after all four batches have been processed: + + + +Use the slider or the following buttons to step through the inferences batches in the visualization: +| | | +| ---------------------------- | ------------------------------------------------- | +| | Step to previous inference batch | +| | Step to next inference batch | +| | Step through all inference batches from beginning | +| | Pause animation | + +The visualization on the right shows nodes in the current inference batch represented as +green spheres. White (benign) and red (SI) packets are shown flowing between the node connections. +While the animation is running, you can click the pause button or toggle off `Simulating`. Once paused, +you will be able to hover over an individual packet to view its contents. + +Changing the dataset does not require relaunching the GUI. Simply re-run Morpheus with the new dataset and the GUI will be updated. + +It's also possible to launch the demo using the Morpheus CLI using the following: + +```bash +DEMO_DATASET="examples/data/sid_visualization/group1-benign-2nodes.jsonlines" +``` + +```bash +morpheus --log_level=DEBUG \ + run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --edge_buffer_size=4 --use_cpp=False \ + pipeline-nlp --model_seq_length=256 \ + from-file --filename=${DEMO_DATASET} \ + deserialize \ + preprocess --vocab_hash_file=morpheus/data/bert-base-uncased-hash.txt --truncation=True --do_lower_case=True --add_special_tokens=False \ + inf-triton --model_name=sid-minibert-onnx --server_url=triton:8001 --force_convert_inputs=True \ + monitor --description Inference\ Rate --unit=inf \ + add-class \ + gen-viz +``` + +Note, this launch method is more useful for showing performance than showing capability. diff --git a/examples/sid_visualization/docker-compose.yml b/examples/sid_visualization/docker-compose.yml new file mode 100644 index 0000000000..f123f0d867 --- /dev/null +++ b/examples/sid_visualization/docker-compose.yml @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3" + +x-with-gpus: &with_gpus + deploy: + resources: + reservations: + devices: + - capabilities: + - gpu + +services: + triton: + image: nvcr.io/nvidia/tritonserver:22.06-py3 + <<: *with_gpus + command: "tritonserver --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx --model-repository=/models/triton-model-repo" + environment: + NVIDIA_VISIBLE_DEVICES: "${NVIDIA_VISIBLE_DEVICES:-all}" + ports: + - "8000" + - "8001" + - "8002" + runtime: nvidia + volumes: + - "${MORPHEUS_HOME:-../..}/models:/models" + + gui: + image: sid-viz:latest + <<: *with_gpus + build: + context: viz + args: + RAPIDSAI_GPU_ARCH: "${RAPIDSAI_GPU_ARCH:-}" # 60 | 70 | 75 | 80 | 86 + cap_add: + - SYS_ADMIN + - SYS_PTRACE + security_opt: + - apparmor=unconfined + environment: + NVIDIA_DRIVER_CAPABILITIES: all + MORPHEUS_SOCKET_URL: "morpheus:8765" + # Colorize the terminal in the container if possible + TERM: "${TERM:-}" + # Use the host's X11 display + DISPLAY: "${DISPLAY:-}" + XAUTHORITY: "${XAUTHORITY:-}" + XDG_SESSION_TYPE: "${XDG_SESSION_TYPE:-}" + XDG_RUNTIME_DIR: "${XDG_RUNTIME_DIR:?XDG_RUNTIME_DIR must be set}" + DBUS_SESSION_BUS_ADDRESS: "${DBUS_SESSION_BUS_ADDRESS:?DBUS_SESSION_BUS_ADDRESS must be set}" + runtime: nvidia + volumes: + - "/etc/fonts:/etc/fonts:ro" + - "/etc/timezone:/etc/timezone:ro" + - "/etc/localtime:/etc/localtime:ro" + - "/tmp/.X11-unix:/tmp/.X11-unix:rw" + - "/usr/share/fonts:/usr/share/fonts:ro" + - "/usr/share/icons:/usr/share/icons:ro" + - "${XDG_RUNTIME_DIR}:${XDG_RUNTIME_DIR}" + - "/run/dbus/system_bus_socket:/run/dbus/system_bus_socket" + + morpheus: + image: morpheus:sid-viz + <<: *with_gpus + command: bash + cap_add: + - SYS_NICE + depends_on: + - gui + - triton + environment: + BUILD_DIR: build-docker # Avoid conflicting with the host default build + NVIDIA_VISIBLE_DEVICES: "${NVIDIA_VISIBLE_DEVICES:-all}" + ports: + - "8765" + stdin_open: true + tty: true + runtime: nvidia + volumes: + - "${MORPHEUS_HOME:-../../}:/workspace" diff --git a/examples/sid_visualization/img/full_win.png b/examples/sid_visualization/img/full_win.png new file mode 100644 index 0000000000..54486c1e46 Binary files /dev/null and b/examples/sid_visualization/img/full_win.png differ diff --git a/examples/sid_visualization/img/initial_win.png b/examples/sid_visualization/img/initial_win.png new file mode 100644 index 0000000000..7a18a61397 Binary files /dev/null and b/examples/sid_visualization/img/initial_win.png differ diff --git a/examples/sid_visualization/img/left.png b/examples/sid_visualization/img/left.png new file mode 100644 index 0000000000..83fcee62a2 Binary files /dev/null and b/examples/sid_visualization/img/left.png differ diff --git a/examples/sid_visualization/img/pause.png b/examples/sid_visualization/img/pause.png new file mode 100644 index 0000000000..85ef8af8c4 Binary files /dev/null and b/examples/sid_visualization/img/pause.png differ diff --git a/examples/sid_visualization/img/replay.png b/examples/sid_visualization/img/replay.png new file mode 100644 index 0000000000..7997996693 Binary files /dev/null and b/examples/sid_visualization/img/replay.png differ diff --git a/examples/sid_visualization/img/right.png b/examples/sid_visualization/img/right.png new file mode 100644 index 0000000000..b6982549e8 Binary files /dev/null and b/examples/sid_visualization/img/right.png differ diff --git a/examples/sid_visualization/run.py b/examples/sid_visualization/run.py new file mode 100644 index 0000000000..83a1726aa5 --- /dev/null +++ b/examples/sid_visualization/run.py @@ -0,0 +1,222 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import typing + +import click +import srf + +from morpheus._lib.file_types import FileTypes +from morpheus._lib.messages import MessageMeta +from morpheus.config import Config +from morpheus.config import CppConfig +from morpheus.config import PipelineModes +from morpheus.io.deserializers import read_file_to_df +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage +from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage +from morpheus.stages.postprocess.generate_viz_frames_stage import GenerateVizFramesStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage +from morpheus.utils.file_utils import get_data_file_path +from morpheus.utils.file_utils import load_labels_file +from morpheus.utils.logger import configure_logging + + +class NLPVizFileSource(SingleOutputSource): + """ + Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for + testing performance and accuracy of a pipeline. + + Parameters + ---------- + c : `morpheus.config.Config` + Pipeline configuration instance. + filename : str + Name of the file from which the messages will be read. + iterative: boolean + Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. Iterative mode is + good for interleaving source stages. + file_type : `morpheus._lib.file_types.FileTypes`, default = 'auto' + Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. + Supported extensions: 'json', 'csv' + repeat: int, default = 1 + Repeats the input dataset multiple times. Useful to extend small datasets for debugging. + filter_null: bool, default = True + Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down + the line with processing. Setting this to True is recommended. + cudf_kwargs: dict, default=None + keyword args passed to underlying cuDF I/O function. See the cuDF documentation for `cudf.read_csv()` and + `cudf.read_json()` for the available options. With `file_type` == 'json', this defaults to ``{ "lines": True }`` + and with `file_type` == 'csv', this defaults to ``{}``. + """ + + def __init__(self, + c: Config, + filenames: typing.List[str], + file_type: FileTypes = FileTypes.Auto, + cudf_kwargs: dict = None): + + super().__init__(c) + + self._batch_size = c.pipeline_batch_size + + self._filenames = filenames + self._file_type = file_type + self._cudf_kwargs = {} if cudf_kwargs is None else cudf_kwargs + + self._input_count = None + self._max_concurrent = c.num_threads + + @property + def name(self) -> str: + return "from-multi-file" + + @property + def input_count(self) -> int: + """Return None for no max intput count""" + return self._input_count + + def supports_cpp_node(self): + return False + + def _build_source(self, builder: srf.Builder) -> StreamPair: + + if self._build_cpp_node(): + raise RuntimeError("Does not support C++ nodes") + else: + out_stream = builder.make_source(self.unique_name, self._generate_frames()) + + out_type = MessageMeta + + return out_stream, out_type + + def _generate_frames(self): + + for f in self._filenames: + + # Read the dataframe into memory + df = read_file_to_df( + f, + self._file_type, + filter_nulls=True, + df_type="cudf", + ) + + # Truncate it down to the max size + df = df.head(self._batch_size) + + x = MessageMeta(df) + + yield x + + +@click.command() +@click.option("--debug/--no-debug", default=False) +@click.option('--use_cpp', default=False) +@click.option( + "--num_threads", + default=os.cpu_count(), + type=click.IntRange(min=1), + help="Number of internal pipeline threads to use", +) +@click.option( + "--input_file", + "-f", + type=click.Path(exists=True, dir_okay=False), + multiple=True, + required=True, + default=[ + "examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines", + "examples/data/sid_visualization/group2-benign-50nodes.jsonlines" + ], + help="List of files to send to the visualization, in order", +) +@click.option('--max_batch_size', + default=50000, + type=click.IntRange(min=1), + help=("For each input_file, truncate the number of rows to this size.")) +@click.option( + "--model_name", + default="sid-minibert-onnx", + help="The name of the model that is deployed on Tritonserver", +) +@click.option("--triton_server_url", default="localhost:8001", required=True, help="Tritonserver url") +def run_pipeline(debug, use_cpp, num_threads, input_file, max_batch_size, model_name, triton_server_url): + + if debug: + configure_logging(log_level=logging.DEBUG) + else: + configure_logging(log_level=logging.INFO) + + CppConfig.set_should_use_cpp(use_cpp) + + # Its necessary to get the global config object and configure it for FIL mode + config = Config() + config.mode = PipelineModes.NLP + + # Below properties are specified by the command line + config.num_threads = num_threads + config.pipeline_batch_size = max_batch_size + config.feature_length = 256 + config.class_labels = load_labels_file(get_data_file_path("data/labels_nlp.txt")) + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + # Set source stage + # This stage reads raw data from the required plugins and merge all the plugins data into a single dataframe + # for a given source. + pipeline.set_source(NLPVizFileSource(config, filenames=input_file)) + + pipeline.add_stage(DeserializeStage(config)) + + pipeline.add_stage( + PreprocessNLPStage(config, + vocab_hash_file=get_data_file_path("data/bert-base-uncased-hash.txt"), + truncation=True, + do_lower_case=True, + add_special_tokens=False)) + + # Add a inference stage + pipeline.add_stage( + TritonInferenceStage( + config, + model_name=model_name, + server_url=triton_server_url, + force_convert_inputs=True, + )) + + # Add a monitor stage + pipeline.add_stage(MonitorStage(config, description="Inference rate")) + + pipeline.add_stage(AddClassificationsStage(config, threshold=0.8)) + + pipeline.add_stage(GenerateVizFramesStage(config, server_url="0.0.0.0", server_port=8765)) + + # Build pipeline + pipeline.build() + + # Run the pipeline + pipeline.run() + + +# Execution starts here +if __name__ == "__main__": + run_pipeline() diff --git a/examples/sid_visualization/viz b/examples/sid_visualization/viz new file mode 160000 index 0000000000..a7ca78106c --- /dev/null +++ b/examples/sid_visualization/viz @@ -0,0 +1 @@ +Subproject commit a7ca78106c5439e4e0f8687ea297b44646317311 diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py index 300b7812ea..d7bd8fd4be 100644 --- a/morpheus/pipeline/pipeline.py +++ b/morpheus/pipeline/pipeline.py @@ -226,7 +226,7 @@ async def join(self): for s in list(self._stages): await s.join() - def build_and_start(self): + async def build_and_start(self): if (not self.is_built): try: @@ -235,8 +235,16 @@ def build_and_start(self): logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True) return + await self.async_start() + self.start() + async def async_start(self): + + # Loop over all stages and call on_start if it exists + for s in self._stages: + await s.start_async() + def _on_start(self): # Only execute this once @@ -417,7 +425,7 @@ def term_signal(): loop.add_signal_handler(s, term_signal) try: - self.build_and_start() + await self.build_and_start() # Wait for completion await self.join() diff --git a/morpheus/pipeline/stage.py b/morpheus/pipeline/stage.py index d32309d2ca..a6d4ca52df 100644 --- a/morpheus/pipeline/stage.py +++ b/morpheus/pipeline/stage.py @@ -52,6 +52,13 @@ def on_start(self): """ pass + async def start_async(self): + """ + This function is called along with on_start during stage initialization. Allows stages to utilize the + asyncio loop if needed. + """ + pass + def _on_complete(self, stream): logger.info("Stage Complete: {}".format(self.name)) diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py index 2fd2b9e3e3..8289c023cc 100644 --- a/morpheus/stages/postprocess/generate_viz_frames_stage.py +++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py @@ -12,15 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import json +import logging import os -import shutil +import sys import typing -import warnings import numpy as np import pandas as pd +import pyarrow as pa import srf +import srf.core.operators as ops +import websockets.legacy.server +from websockets.server import serve + +import cudf from morpheus.cli.register_stage import register_stage from morpheus.config import Config @@ -28,6 +35,10 @@ from morpheus.messages import MultiResponseProbsMessage from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.producer_consumer_queue import AsyncIOProducerConsumerQueue +from morpheus.utils.producer_consumer_queue import Closed + +logger = logging.getLogger(__name__) @register_stage("gen-viz", modes=[PipelineModes.NLP], command_args={"deprecated": True}) @@ -37,32 +48,26 @@ class GenerateVizFramesStage(SinglePortStage): Parameters ---------- - c : `morpheus.config.Config` - Pipeline configuration instance. + c : morpheus.config.Config + Pipeline configuration instance out_dir : str - Output directory to write visualization frames. + Output directory to write visualization frames overwrite : bool - Overwrite file if exists. + Overwrite file if exists """ - def __init__(self, c: Config, out_dir: str = "./viz_frames", overwrite: bool = False): + def __init__(self, c: Config, server_url: str = "0.0.0.0", server_port: int = 8765): super().__init__(c) - self._out_dir = out_dir - self._overwrite = overwrite - - if (os.path.exists(self._out_dir)): - if (self._overwrite): - shutil.rmtree(self._out_dir) - elif (len(list(os.listdir(self._out_dir))) > 0): - warnings.warn(("Viz output directory '{}' already exists. " - "Errors will occur if frames try to be written over existing files. " - "Suggest emptying the directory or setting `overwrite=True`").format(self._out_dir)) - - os.makedirs(self._out_dir, exist_ok=True) + self._server_url = server_url + self._server_port = server_port self._first_timestamp = -1 + self._buffers = [] + self._buffer_queue: AsyncIOProducerConsumerQueue = None + + self._replay_buffer = [] @property def name(self) -> str: @@ -74,8 +79,8 @@ def accepted_types(self) -> typing.Tuple: Returns ------- - typing.Tuple[`morpheus.pipeline.messages.MultiResponseProbsMessage`, ] - Accepted input types. + typing.Tuple[morpheus.pipeline.messages.MultiResponseProbsMessage, ] + Accepted input types """ return (MultiResponseProbsMessage, ) @@ -86,17 +91,17 @@ def supports_cpp_node(self): @staticmethod def round_to_sec(x): """ - Round to even seconds second. + Round to even seconds second Parameters ---------- x : int/float - Rounding up the value. + Rounding up the value Returns ------- int - Value rounded up. + Value rounded up """ return int(round(x / 1000.0) * 1000) @@ -148,8 +153,6 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]): in_df = pd.concat([df for _, df in x], ignore_index=True).sort_values(by=["timestamp"]) - # curr_timestamp = GenerateVizFramesStage.round_to_sec(in_df["timestamp"].iloc[0]) - if (self._first_timestamp == -1): self._first_timestamp = curr_timestamp @@ -161,21 +164,115 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]): in_df.to_csv(fn, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"]) - def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair: + async def start_async(self): + + loop = asyncio.get_event_loop() + self._loop = loop + + self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2, loop=loop) + + async def client_connected(websocket: websockets.legacy.server.WebSocketServerProtocol): + + logger.info("Got connection from: {}:{}".format(*websocket.remote_address)) + + while True: + try: + next_buffer = await self._buffer_queue.get() + await websocket.send(next_buffer.to_pybytes()) + except Closed: + break + except Exception as ex: + logger.exception("Error occurred trying to send message over socket", exc_info=ex) + + logger.info("Disconnected from: {}:{}".format(*websocket.remote_address)) + + async def run_server(): + + try: + + async with serve(client_connected, self._server_url, self._server_port) as server: + + listening_on = [":".join([str(y) for y in x.getsockname()]) for x in server.sockets] + listening_on_str = [f"'{x}'" for x in listening_on] + + logger.info("Websocket server listening at: {}".format(", ".join(listening_on_str))) + + await self._server_close_event.wait() + + logger.info("Server shut down") + + logger.info("Server shut down. Is queue empty: {}".format(self._buffer_queue.empty())) + except Exception as e: + logger.error("Error during serve", exc_info=e) + raise + + self._server_task = loop.create_task(run_server()) + + self._server_close_event = asyncio.Event(loop=loop) + + await asyncio.sleep(1.0) + + return await super().start_async() + + async def _stop_server(self): + + logger.info("Shutting down queue") + + await self._buffer_queue.close() + + self._server_close_event.set() + + # Wait for it to + await self._server_task + + def _build_single(self, seg: srf.Builder, input_stream: StreamPair) -> StreamPair: stream = input_stream[0] - # Convert stream to dataframes - stream = stream.map(self._to_vis_df) # Convert group to dataframe + def node_fn(input, output): + + def write_batch(x: MultiResponseProbsMessage): + + sink = pa.BufferOutputStream() + + # This is the timestamp of the earliest message + t0 = x.get_meta("timestamp").min() + + df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"]) + + out_df = cudf.DataFrame() + + out_df["dt"] = (df["timestamp"] - t0).astype(np.int32) + out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32) + out_df["dst"] = df["dest_ip"].str.ip_to_int().astype(np.int32) + out_df["lvl"] = df["secret_keys"].astype(np.int32) + out_df["data"] = df["data"] + + array_table = out_df.to_arrow() + + with pa.ipc.new_stream(sink, array_table.schema) as writer: + writer.write(array_table) + + out_buf = sink.getvalue() + + # Enqueue the buffer and block until that completes + asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result() + + input.pipe(ops.map(write_batch)).subscribe(output) + + logger.info("Gen-viz stage completed. Waiting for shutdown") + + shutdown_future = asyncio.run_coroutine_threadsafe(self._stop_server(), loop=self._loop) - # Flatten the list of tuples - stream = stream.flatten() + # Wait for shutdown. Unless we have a debugger attached + shutdown_future.result(timeout=2.0 if sys.gettrace() is None else None) - # Partition by group times - stream = stream.partition(10000, timeout=10, key=lambda x: x[0]) # Group - # stream = stream.filter(lambda x: len(x) > 0) + logger.info("Gen-viz shutdown complete") - stream.sink(self._write_viz_file) + # Sink to file + to_file = seg.make_node_full(self.unique_name, node_fn) + seg.make_edge(stream, to_file) + stream = to_file - # Return input unchanged + # Return input unchanged to allow passthrough return input_stream diff --git a/morpheus/utils/file_utils.py b/morpheus/utils/file_utils.py new file mode 100644 index 0000000000..c5251527e7 --- /dev/null +++ b/morpheus/utils/file_utils.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import typing + +import morpheus + + +def get_data_file_path(data_filename: str) -> str: + """ + Get data file path. Also handles paths relative to Morpheus root. + + Parameters + ---------- + data_filename : str + Absolute or relative path of data file. + + Returns + ------- + str + Data file path. + """ + # First check if the path is relative + if (os.path.isabs(data_filename)): + # Already absolute, nothing to do + return data_filename + + # See if the file exists. + does_exist = os.path.exists(data_filename) + + if (not does_exist): + # If it doesn't exist, then try to make it relative to the morpheus library root + morpheus_root = os.path.dirname(morpheus.__file__) + + value_abs_to_root = os.path.join(morpheus_root, data_filename) + + # If the file relative to our package exists, use that instead + if (os.path.exists(value_abs_to_root)): + + return value_abs_to_root + + return data_filename + + +def load_labels_file(labels_filename: str) -> typing.List[str]: + """ + Get list of labels from file. + + Parameters + ---------- + labels_filename : str + Labels file path + + Returns + ------- + typing.List[str] + List of labels + """ + + with open(labels_filename, "r") as lf: + return [x.strip() for x in lf.readlines()]