diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000000..ea4b0b3d3a
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "examples/sid_visualization/viz"]
+ path = examples/sid_visualization/viz
+ url = ../../nv-morpheus/morpheus-visualizations.git
+ branch = branch-22.09
diff --git a/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines b/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines
new file mode 100644
index 0000000000..e854903f0a
--- /dev/null
+++ b/examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:7bbd3c8c318a17a99b2f873fb09f1195e60c1337e95a003192b780acada0754c
+size 960569
diff --git a/examples/data/sid_visualization/group1-benign-2nodes.jsonlines b/examples/data/sid_visualization/group1-benign-2nodes.jsonlines
new file mode 100644
index 0000000000..e854903f0a
--- /dev/null
+++ b/examples/data/sid_visualization/group1-benign-2nodes.jsonlines
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:7bbd3c8c318a17a99b2f873fb09f1195e60c1337e95a003192b780acada0754c
+size 960569
diff --git a/examples/data/sid_visualization/group2-benign-50nodes.jsonlines b/examples/data/sid_visualization/group2-benign-50nodes.jsonlines
new file mode 100644
index 0000000000..a26aa6c615
--- /dev/null
+++ b/examples/data/sid_visualization/group2-benign-50nodes.jsonlines
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:5cb0b68592646a63a9687acacdc5a1a56ebd9a2f91041be477b23ad6e7ca8a24
+size 45001515
diff --git a/examples/data/sid_visualization/group3-si-50nodes.jsonlines b/examples/data/sid_visualization/group3-si-50nodes.jsonlines
new file mode 100644
index 0000000000..ecd6232f6f
--- /dev/null
+++ b/examples/data/sid_visualization/group3-si-50nodes.jsonlines
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:663cbe7c7f5c7af9d988cdad4b0ca7a1f6d70c25bd06106c30fc5332a320c883
+size 45466635
diff --git a/examples/data/sid_visualization/group4-benign-49nodes.jsonlines b/examples/data/sid_visualization/group4-benign-49nodes.jsonlines
new file mode 100644
index 0000000000..c8bc8edadc
--- /dev/null
+++ b/examples/data/sid_visualization/group4-benign-49nodes.jsonlines
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:5371af3baf386f3d58672dca57dc179db6f8a9478e7e0b0dca5e3c0e8b1584c3
+size 82345519
diff --git a/examples/sid_visualization/README.md b/examples/sid_visualization/README.md
new file mode 100644
index 0000000000..9b5a1d3b14
--- /dev/null
+++ b/examples/sid_visualization/README.md
@@ -0,0 +1,153 @@
+# SID Visualization Example
+
+## Prerequisites
+
+To run the demo you will need the following:
+- Docker
+- `docker-compose` (Tested with version 1.29)
+
+## Setup
+
+To run this demo, ensure all submodules are checked out:
+```bash
+git submodule update --init --recursive
+```
+
+### Build Morpheus Dev Container
+
+Before launching the demo, we need the dev container for Morpheus to be created:
+```bash
+export DOCKER_IMAGE_TAG="sid-viz"
+```
+
+# Build the dev container
+```bash
+./docker/build_container_dev.sh
+```
+
+### Launch User Interface
+
+We will use docker-compose to build and run the entire demo. To launch everything, run the following from the repo root:
+
+Save the Morpheus repo directory:
+```bash
+export MORPHEUS_HOME=$(git rev-parse --show-toplevel)
+```
+
+Ensure SID model is downloaded for deployment to Triton:
+```bash
+./scripts/fetch_data.py fetch models
+```
+
+Change to the example directory:
+```bash
+cd ${MORPHEUS_HOME}/examples/sid_visualization
+```
+
+Launch the containers:
+```bash
+DOCKER_BUILDKIT=1 docker-compose up --build -d
+```
+
+The following GUI should be displayed when all containers have completed launching:
+
+
+
+### Build Morpheus
+
+Once docker-compose command has completed and GUI is displayed, exec into the container to build and run Morpheus:
+
+Exec into the morpheus container:
+```bash
+docker-compose exec morpheus bash
+```
+
+Inside the container, compile morpheus:
+```bash
+BUILD_DIR=build-docker ./scripts/compile.sh
+```
+
+Install morpheus with an extra dependency:
+```bash
+pip install -e . && pip install websockets
+```
+
+Verify Morpheus is installed:
+```bash
+morpheus --version
+```
+
+Ensure the data has been downloaded:
+```bash
+./scripts/fetch_data.py fetch examples
+```
+
+***Keep this shell in the Morpheus Dev container running. It will be used later to start Morpheus.***
+
+## Running the Demo
+
+### Running Morpheus
+
+After the GUI has been launched, Morpheus now needs to be started. In the same shell used to build Morpheus (the one running the Morpheus Dev container), run the following:
+```bash
+python examples/sid_visualization/run.py \
+ --debug --use_cpp=False --num_threads=1 \
+ --triton_server_url=triton:8001 \
+ --input_file=./examples/data/sid_visualization/group1-benign-2nodes.jsonlines \
+ --input_file=./examples/data/sid_visualization/group2-benign-50nodes.jsonlines \
+ --input_file=./examples/data/sid_visualization/group3-si-50nodes.jsonlines \
+ --input_file=./examples/data/sid_visualization/group4-benign-49nodes.jsonlines
+```
+
+**Note:** The first run of this script will take a few minutes to allow Triton to convert the deployed ONNX model to TensorRT. Subsequent runs will not include this conversion step so will be much quicker.
+
+This launch will use all of the available datasets. Each dataset will show up as one batch in the visualization. Here is a description of each dataset:
+
+- `examples/data/sid_visualization/group1-benign-2nodes.jsonlines`
+ - Small scale with 2 nodes, no SID
+- `examples/data/sid_visualization/group2-benign-50nodes.jsonlines`
+ - Scale up to 50 nodes, no SID
+- `examples/data/sid_visualization/group3-si-50nodes.jsonlines`
+ - 50 nodes, with SID from a single node
+- `examples/data/sid_visualization/group4-benign-49nodes.jsonlines`
+ - Isolate bad node leaving 49 nodes, no SID
+
+The following is a screenshot after all four batches have been processed:
+
+
+
+Use the slider or the following buttons to step through the inferences batches in the visualization:
+| | |
+| ---------------------------- | ------------------------------------------------- |
+| | Step to previous inference batch |
+| | Step to next inference batch |
+| | Step through all inference batches from beginning |
+| | Pause animation |
+
+The visualization on the right shows nodes in the current inference batch represented as
+green spheres. White (benign) and red (SI) packets are shown flowing between the node connections.
+While the animation is running, you can click the pause button or toggle off `Simulating`. Once paused,
+you will be able to hover over an individual packet to view its contents.
+
+Changing the dataset does not require relaunching the GUI. Simply re-run Morpheus with the new dataset and the GUI will be updated.
+
+It's also possible to launch the demo using the Morpheus CLI using the following:
+
+```bash
+DEMO_DATASET="examples/data/sid_visualization/group1-benign-2nodes.jsonlines"
+```
+
+```bash
+morpheus --log_level=DEBUG \
+ run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --edge_buffer_size=4 --use_cpp=False \
+ pipeline-nlp --model_seq_length=256 \
+ from-file --filename=${DEMO_DATASET} \
+ deserialize \
+ preprocess --vocab_hash_file=morpheus/data/bert-base-uncased-hash.txt --truncation=True --do_lower_case=True --add_special_tokens=False \
+ inf-triton --model_name=sid-minibert-onnx --server_url=triton:8001 --force_convert_inputs=True \
+ monitor --description Inference\ Rate --unit=inf \
+ add-class \
+ gen-viz
+```
+
+Note, this launch method is more useful for showing performance than showing capability.
diff --git a/examples/sid_visualization/docker-compose.yml b/examples/sid_visualization/docker-compose.yml
new file mode 100644
index 0000000000..f123f0d867
--- /dev/null
+++ b/examples/sid_visualization/docker-compose.yml
@@ -0,0 +1,93 @@
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+x-with-gpus: &with_gpus
+ deploy:
+ resources:
+ reservations:
+ devices:
+ - capabilities:
+ - gpu
+
+services:
+ triton:
+ image: nvcr.io/nvidia/tritonserver:22.06-py3
+ <<: *with_gpus
+ command: "tritonserver --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx --model-repository=/models/triton-model-repo"
+ environment:
+ NVIDIA_VISIBLE_DEVICES: "${NVIDIA_VISIBLE_DEVICES:-all}"
+ ports:
+ - "8000"
+ - "8001"
+ - "8002"
+ runtime: nvidia
+ volumes:
+ - "${MORPHEUS_HOME:-../..}/models:/models"
+
+ gui:
+ image: sid-viz:latest
+ <<: *with_gpus
+ build:
+ context: viz
+ args:
+ RAPIDSAI_GPU_ARCH: "${RAPIDSAI_GPU_ARCH:-}" # 60 | 70 | 75 | 80 | 86
+ cap_add:
+ - SYS_ADMIN
+ - SYS_PTRACE
+ security_opt:
+ - apparmor=unconfined
+ environment:
+ NVIDIA_DRIVER_CAPABILITIES: all
+ MORPHEUS_SOCKET_URL: "morpheus:8765"
+ # Colorize the terminal in the container if possible
+ TERM: "${TERM:-}"
+ # Use the host's X11 display
+ DISPLAY: "${DISPLAY:-}"
+ XAUTHORITY: "${XAUTHORITY:-}"
+ XDG_SESSION_TYPE: "${XDG_SESSION_TYPE:-}"
+ XDG_RUNTIME_DIR: "${XDG_RUNTIME_DIR:?XDG_RUNTIME_DIR must be set}"
+ DBUS_SESSION_BUS_ADDRESS: "${DBUS_SESSION_BUS_ADDRESS:?DBUS_SESSION_BUS_ADDRESS must be set}"
+ runtime: nvidia
+ volumes:
+ - "/etc/fonts:/etc/fonts:ro"
+ - "/etc/timezone:/etc/timezone:ro"
+ - "/etc/localtime:/etc/localtime:ro"
+ - "/tmp/.X11-unix:/tmp/.X11-unix:rw"
+ - "/usr/share/fonts:/usr/share/fonts:ro"
+ - "/usr/share/icons:/usr/share/icons:ro"
+ - "${XDG_RUNTIME_DIR}:${XDG_RUNTIME_DIR}"
+ - "/run/dbus/system_bus_socket:/run/dbus/system_bus_socket"
+
+ morpheus:
+ image: morpheus:sid-viz
+ <<: *with_gpus
+ command: bash
+ cap_add:
+ - SYS_NICE
+ depends_on:
+ - gui
+ - triton
+ environment:
+ BUILD_DIR: build-docker # Avoid conflicting with the host default build
+ NVIDIA_VISIBLE_DEVICES: "${NVIDIA_VISIBLE_DEVICES:-all}"
+ ports:
+ - "8765"
+ stdin_open: true
+ tty: true
+ runtime: nvidia
+ volumes:
+ - "${MORPHEUS_HOME:-../../}:/workspace"
diff --git a/examples/sid_visualization/img/full_win.png b/examples/sid_visualization/img/full_win.png
new file mode 100644
index 0000000000..54486c1e46
Binary files /dev/null and b/examples/sid_visualization/img/full_win.png differ
diff --git a/examples/sid_visualization/img/initial_win.png b/examples/sid_visualization/img/initial_win.png
new file mode 100644
index 0000000000..7a18a61397
Binary files /dev/null and b/examples/sid_visualization/img/initial_win.png differ
diff --git a/examples/sid_visualization/img/left.png b/examples/sid_visualization/img/left.png
new file mode 100644
index 0000000000..83fcee62a2
Binary files /dev/null and b/examples/sid_visualization/img/left.png differ
diff --git a/examples/sid_visualization/img/pause.png b/examples/sid_visualization/img/pause.png
new file mode 100644
index 0000000000..85ef8af8c4
Binary files /dev/null and b/examples/sid_visualization/img/pause.png differ
diff --git a/examples/sid_visualization/img/replay.png b/examples/sid_visualization/img/replay.png
new file mode 100644
index 0000000000..7997996693
Binary files /dev/null and b/examples/sid_visualization/img/replay.png differ
diff --git a/examples/sid_visualization/img/right.png b/examples/sid_visualization/img/right.png
new file mode 100644
index 0000000000..b6982549e8
Binary files /dev/null and b/examples/sid_visualization/img/right.png differ
diff --git a/examples/sid_visualization/run.py b/examples/sid_visualization/run.py
new file mode 100644
index 0000000000..83a1726aa5
--- /dev/null
+++ b/examples/sid_visualization/run.py
@@ -0,0 +1,222 @@
+# Copyright (c) 2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import typing
+
+import click
+import srf
+
+from morpheus._lib.file_types import FileTypes
+from morpheus._lib.messages import MessageMeta
+from morpheus.config import Config
+from morpheus.config import CppConfig
+from morpheus.config import PipelineModes
+from morpheus.io.deserializers import read_file_to_df
+from morpheus.pipeline.linear_pipeline import LinearPipeline
+from morpheus.pipeline.single_output_source import SingleOutputSource
+from morpheus.pipeline.stream_pair import StreamPair
+from morpheus.stages.general.monitor_stage import MonitorStage
+from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
+from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
+from morpheus.stages.postprocess.generate_viz_frames_stage import GenerateVizFramesStage
+from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
+from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
+from morpheus.utils.file_utils import get_data_file_path
+from morpheus.utils.file_utils import load_labels_file
+from morpheus.utils.logger import configure_logging
+
+
+class NLPVizFileSource(SingleOutputSource):
+ """
+ Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for
+ testing performance and accuracy of a pipeline.
+
+ Parameters
+ ----------
+ c : `morpheus.config.Config`
+ Pipeline configuration instance.
+ filename : str
+ Name of the file from which the messages will be read.
+ iterative: boolean
+ Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. Iterative mode is
+ good for interleaving source stages.
+ file_type : `morpheus._lib.file_types.FileTypes`, default = 'auto'
+ Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
+ Supported extensions: 'json', 'csv'
+ repeat: int, default = 1
+ Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
+ filter_null: bool, default = True
+ Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down
+ the line with processing. Setting this to True is recommended.
+ cudf_kwargs: dict, default=None
+ keyword args passed to underlying cuDF I/O function. See the cuDF documentation for `cudf.read_csv()` and
+ `cudf.read_json()` for the available options. With `file_type` == 'json', this defaults to ``{ "lines": True }``
+ and with `file_type` == 'csv', this defaults to ``{}``.
+ """
+
+ def __init__(self,
+ c: Config,
+ filenames: typing.List[str],
+ file_type: FileTypes = FileTypes.Auto,
+ cudf_kwargs: dict = None):
+
+ super().__init__(c)
+
+ self._batch_size = c.pipeline_batch_size
+
+ self._filenames = filenames
+ self._file_type = file_type
+ self._cudf_kwargs = {} if cudf_kwargs is None else cudf_kwargs
+
+ self._input_count = None
+ self._max_concurrent = c.num_threads
+
+ @property
+ def name(self) -> str:
+ return "from-multi-file"
+
+ @property
+ def input_count(self) -> int:
+ """Return None for no max intput count"""
+ return self._input_count
+
+ def supports_cpp_node(self):
+ return False
+
+ def _build_source(self, builder: srf.Builder) -> StreamPair:
+
+ if self._build_cpp_node():
+ raise RuntimeError("Does not support C++ nodes")
+ else:
+ out_stream = builder.make_source(self.unique_name, self._generate_frames())
+
+ out_type = MessageMeta
+
+ return out_stream, out_type
+
+ def _generate_frames(self):
+
+ for f in self._filenames:
+
+ # Read the dataframe into memory
+ df = read_file_to_df(
+ f,
+ self._file_type,
+ filter_nulls=True,
+ df_type="cudf",
+ )
+
+ # Truncate it down to the max size
+ df = df.head(self._batch_size)
+
+ x = MessageMeta(df)
+
+ yield x
+
+
+@click.command()
+@click.option("--debug/--no-debug", default=False)
+@click.option('--use_cpp', default=False)
+@click.option(
+ "--num_threads",
+ default=os.cpu_count(),
+ type=click.IntRange(min=1),
+ help="Number of internal pipeline threads to use",
+)
+@click.option(
+ "--input_file",
+ "-f",
+ type=click.Path(exists=True, dir_okay=False),
+ multiple=True,
+ required=True,
+ default=[
+ "examples/data/sid_visualization/group1-benign-2nodes-v2.jsonlines",
+ "examples/data/sid_visualization/group2-benign-50nodes.jsonlines"
+ ],
+ help="List of files to send to the visualization, in order",
+)
+@click.option('--max_batch_size',
+ default=50000,
+ type=click.IntRange(min=1),
+ help=("For each input_file, truncate the number of rows to this size."))
+@click.option(
+ "--model_name",
+ default="sid-minibert-onnx",
+ help="The name of the model that is deployed on Tritonserver",
+)
+@click.option("--triton_server_url", default="localhost:8001", required=True, help="Tritonserver url")
+def run_pipeline(debug, use_cpp, num_threads, input_file, max_batch_size, model_name, triton_server_url):
+
+ if debug:
+ configure_logging(log_level=logging.DEBUG)
+ else:
+ configure_logging(log_level=logging.INFO)
+
+ CppConfig.set_should_use_cpp(use_cpp)
+
+ # Its necessary to get the global config object and configure it for FIL mode
+ config = Config()
+ config.mode = PipelineModes.NLP
+
+ # Below properties are specified by the command line
+ config.num_threads = num_threads
+ config.pipeline_batch_size = max_batch_size
+ config.feature_length = 256
+ config.class_labels = load_labels_file(get_data_file_path("data/labels_nlp.txt"))
+
+ # Create a linear pipeline object
+ pipeline = LinearPipeline(config)
+
+ # Set source stage
+ # This stage reads raw data from the required plugins and merge all the plugins data into a single dataframe
+ # for a given source.
+ pipeline.set_source(NLPVizFileSource(config, filenames=input_file))
+
+ pipeline.add_stage(DeserializeStage(config))
+
+ pipeline.add_stage(
+ PreprocessNLPStage(config,
+ vocab_hash_file=get_data_file_path("data/bert-base-uncased-hash.txt"),
+ truncation=True,
+ do_lower_case=True,
+ add_special_tokens=False))
+
+ # Add a inference stage
+ pipeline.add_stage(
+ TritonInferenceStage(
+ config,
+ model_name=model_name,
+ server_url=triton_server_url,
+ force_convert_inputs=True,
+ ))
+
+ # Add a monitor stage
+ pipeline.add_stage(MonitorStage(config, description="Inference rate"))
+
+ pipeline.add_stage(AddClassificationsStage(config, threshold=0.8))
+
+ pipeline.add_stage(GenerateVizFramesStage(config, server_url="0.0.0.0", server_port=8765))
+
+ # Build pipeline
+ pipeline.build()
+
+ # Run the pipeline
+ pipeline.run()
+
+
+# Execution starts here
+if __name__ == "__main__":
+ run_pipeline()
diff --git a/examples/sid_visualization/viz b/examples/sid_visualization/viz
new file mode 160000
index 0000000000..a7ca78106c
--- /dev/null
+++ b/examples/sid_visualization/viz
@@ -0,0 +1 @@
+Subproject commit a7ca78106c5439e4e0f8687ea297b44646317311
diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py
index 300b7812ea..d7bd8fd4be 100644
--- a/morpheus/pipeline/pipeline.py
+++ b/morpheus/pipeline/pipeline.py
@@ -226,7 +226,7 @@ async def join(self):
for s in list(self._stages):
await s.join()
- def build_and_start(self):
+ async def build_and_start(self):
if (not self.is_built):
try:
@@ -235,8 +235,16 @@ def build_and_start(self):
logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True)
return
+ await self.async_start()
+
self.start()
+ async def async_start(self):
+
+ # Loop over all stages and call on_start if it exists
+ for s in self._stages:
+ await s.start_async()
+
def _on_start(self):
# Only execute this once
@@ -417,7 +425,7 @@ def term_signal():
loop.add_signal_handler(s, term_signal)
try:
- self.build_and_start()
+ await self.build_and_start()
# Wait for completion
await self.join()
diff --git a/morpheus/pipeline/stage.py b/morpheus/pipeline/stage.py
index d32309d2ca..a6d4ca52df 100644
--- a/morpheus/pipeline/stage.py
+++ b/morpheus/pipeline/stage.py
@@ -52,6 +52,13 @@ def on_start(self):
"""
pass
+ async def start_async(self):
+ """
+ This function is called along with on_start during stage initialization. Allows stages to utilize the
+ asyncio loop if needed.
+ """
+ pass
+
def _on_complete(self, stream):
logger.info("Stage Complete: {}".format(self.name))
diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py
index 2fd2b9e3e3..8289c023cc 100644
--- a/morpheus/stages/postprocess/generate_viz_frames_stage.py
+++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py
@@ -12,15 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
import json
+import logging
import os
-import shutil
+import sys
import typing
-import warnings
import numpy as np
import pandas as pd
+import pyarrow as pa
import srf
+import srf.core.operators as ops
+import websockets.legacy.server
+from websockets.server import serve
+
+import cudf
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
@@ -28,6 +35,10 @@
from morpheus.messages import MultiResponseProbsMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
+from morpheus.utils.producer_consumer_queue import AsyncIOProducerConsumerQueue
+from morpheus.utils.producer_consumer_queue import Closed
+
+logger = logging.getLogger(__name__)
@register_stage("gen-viz", modes=[PipelineModes.NLP], command_args={"deprecated": True})
@@ -37,32 +48,26 @@ class GenerateVizFramesStage(SinglePortStage):
Parameters
----------
- c : `morpheus.config.Config`
- Pipeline configuration instance.
+ c : morpheus.config.Config
+ Pipeline configuration instance
out_dir : str
- Output directory to write visualization frames.
+ Output directory to write visualization frames
overwrite : bool
- Overwrite file if exists.
+ Overwrite file if exists
"""
- def __init__(self, c: Config, out_dir: str = "./viz_frames", overwrite: bool = False):
+ def __init__(self, c: Config, server_url: str = "0.0.0.0", server_port: int = 8765):
super().__init__(c)
- self._out_dir = out_dir
- self._overwrite = overwrite
-
- if (os.path.exists(self._out_dir)):
- if (self._overwrite):
- shutil.rmtree(self._out_dir)
- elif (len(list(os.listdir(self._out_dir))) > 0):
- warnings.warn(("Viz output directory '{}' already exists. "
- "Errors will occur if frames try to be written over existing files. "
- "Suggest emptying the directory or setting `overwrite=True`").format(self._out_dir))
-
- os.makedirs(self._out_dir, exist_ok=True)
+ self._server_url = server_url
+ self._server_port = server_port
self._first_timestamp = -1
+ self._buffers = []
+ self._buffer_queue: AsyncIOProducerConsumerQueue = None
+
+ self._replay_buffer = []
@property
def name(self) -> str:
@@ -74,8 +79,8 @@ def accepted_types(self) -> typing.Tuple:
Returns
-------
- typing.Tuple[`morpheus.pipeline.messages.MultiResponseProbsMessage`, ]
- Accepted input types.
+ typing.Tuple[morpheus.pipeline.messages.MultiResponseProbsMessage, ]
+ Accepted input types
"""
return (MultiResponseProbsMessage, )
@@ -86,17 +91,17 @@ def supports_cpp_node(self):
@staticmethod
def round_to_sec(x):
"""
- Round to even seconds second.
+ Round to even seconds second
Parameters
----------
x : int/float
- Rounding up the value.
+ Rounding up the value
Returns
-------
int
- Value rounded up.
+ Value rounded up
"""
return int(round(x / 1000.0) * 1000)
@@ -148,8 +153,6 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):
in_df = pd.concat([df for _, df in x], ignore_index=True).sort_values(by=["timestamp"])
- # curr_timestamp = GenerateVizFramesStage.round_to_sec(in_df["timestamp"].iloc[0])
-
if (self._first_timestamp == -1):
self._first_timestamp = curr_timestamp
@@ -161,21 +164,115 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):
in_df.to_csv(fn, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])
- def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
+ async def start_async(self):
+
+ loop = asyncio.get_event_loop()
+ self._loop = loop
+
+ self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2, loop=loop)
+
+ async def client_connected(websocket: websockets.legacy.server.WebSocketServerProtocol):
+
+ logger.info("Got connection from: {}:{}".format(*websocket.remote_address))
+
+ while True:
+ try:
+ next_buffer = await self._buffer_queue.get()
+ await websocket.send(next_buffer.to_pybytes())
+ except Closed:
+ break
+ except Exception as ex:
+ logger.exception("Error occurred trying to send message over socket", exc_info=ex)
+
+ logger.info("Disconnected from: {}:{}".format(*websocket.remote_address))
+
+ async def run_server():
+
+ try:
+
+ async with serve(client_connected, self._server_url, self._server_port) as server:
+
+ listening_on = [":".join([str(y) for y in x.getsockname()]) for x in server.sockets]
+ listening_on_str = [f"'{x}'" for x in listening_on]
+
+ logger.info("Websocket server listening at: {}".format(", ".join(listening_on_str)))
+
+ await self._server_close_event.wait()
+
+ logger.info("Server shut down")
+
+ logger.info("Server shut down. Is queue empty: {}".format(self._buffer_queue.empty()))
+ except Exception as e:
+ logger.error("Error during serve", exc_info=e)
+ raise
+
+ self._server_task = loop.create_task(run_server())
+
+ self._server_close_event = asyncio.Event(loop=loop)
+
+ await asyncio.sleep(1.0)
+
+ return await super().start_async()
+
+ async def _stop_server(self):
+
+ logger.info("Shutting down queue")
+
+ await self._buffer_queue.close()
+
+ self._server_close_event.set()
+
+ # Wait for it to
+ await self._server_task
+
+ def _build_single(self, seg: srf.Builder, input_stream: StreamPair) -> StreamPair:
stream = input_stream[0]
- # Convert stream to dataframes
- stream = stream.map(self._to_vis_df) # Convert group to dataframe
+ def node_fn(input, output):
+
+ def write_batch(x: MultiResponseProbsMessage):
+
+ sink = pa.BufferOutputStream()
+
+ # This is the timestamp of the earliest message
+ t0 = x.get_meta("timestamp").min()
+
+ df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"])
+
+ out_df = cudf.DataFrame()
+
+ out_df["dt"] = (df["timestamp"] - t0).astype(np.int32)
+ out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32)
+ out_df["dst"] = df["dest_ip"].str.ip_to_int().astype(np.int32)
+ out_df["lvl"] = df["secret_keys"].astype(np.int32)
+ out_df["data"] = df["data"]
+
+ array_table = out_df.to_arrow()
+
+ with pa.ipc.new_stream(sink, array_table.schema) as writer:
+ writer.write(array_table)
+
+ out_buf = sink.getvalue()
+
+ # Enqueue the buffer and block until that completes
+ asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
+
+ input.pipe(ops.map(write_batch)).subscribe(output)
+
+ logger.info("Gen-viz stage completed. Waiting for shutdown")
+
+ shutdown_future = asyncio.run_coroutine_threadsafe(self._stop_server(), loop=self._loop)
- # Flatten the list of tuples
- stream = stream.flatten()
+ # Wait for shutdown. Unless we have a debugger attached
+ shutdown_future.result(timeout=2.0 if sys.gettrace() is None else None)
- # Partition by group times
- stream = stream.partition(10000, timeout=10, key=lambda x: x[0]) # Group
- # stream = stream.filter(lambda x: len(x) > 0)
+ logger.info("Gen-viz shutdown complete")
- stream.sink(self._write_viz_file)
+ # Sink to file
+ to_file = seg.make_node_full(self.unique_name, node_fn)
+ seg.make_edge(stream, to_file)
+ stream = to_file
- # Return input unchanged
+ # Return input unchanged to allow passthrough
return input_stream
diff --git a/morpheus/utils/file_utils.py b/morpheus/utils/file_utils.py
new file mode 100644
index 0000000000..c5251527e7
--- /dev/null
+++ b/morpheus/utils/file_utils.py
@@ -0,0 +1,74 @@
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import typing
+
+import morpheus
+
+
+def get_data_file_path(data_filename: str) -> str:
+ """
+ Get data file path. Also handles paths relative to Morpheus root.
+
+ Parameters
+ ----------
+ data_filename : str
+ Absolute or relative path of data file.
+
+ Returns
+ -------
+ str
+ Data file path.
+ """
+ # First check if the path is relative
+ if (os.path.isabs(data_filename)):
+ # Already absolute, nothing to do
+ return data_filename
+
+ # See if the file exists.
+ does_exist = os.path.exists(data_filename)
+
+ if (not does_exist):
+ # If it doesn't exist, then try to make it relative to the morpheus library root
+ morpheus_root = os.path.dirname(morpheus.__file__)
+
+ value_abs_to_root = os.path.join(morpheus_root, data_filename)
+
+ # If the file relative to our package exists, use that instead
+ if (os.path.exists(value_abs_to_root)):
+
+ return value_abs_to_root
+
+ return data_filename
+
+
+def load_labels_file(labels_filename: str) -> typing.List[str]:
+ """
+ Get list of labels from file.
+
+ Parameters
+ ----------
+ labels_filename : str
+ Labels file path
+
+ Returns
+ -------
+ typing.List[str]
+ List of labels
+ """
+
+ with open(labels_filename, "r") as lf:
+ return [x.strip() for x in lf.readlines()]