From 50afa09e19dba2f8bbde73c94b583cfc06bd05cc Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 27 Oct 2022 09:59:48 -0700 Subject: [PATCH 1/7] wip --- scripts/validation/kafka_testing.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index e210ee825f..cd91eb71d8 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -1,4 +1,8 @@ ## Pre-reqs +1. Ensure that we have enough file descriptors: + ```bash + ulimit -n 1048576 + ``` 1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) 1. Populate an environment variable `BROKER_LIST` with the IP:Ports of the nodes in the Kafka cluster. Ensure this environment variable is set in all of the terminals where Morpheus is executed: ```bash From b39c1359ee91a5639837a89185b0a3016226eb73 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 27 Oct 2022 13:19:45 -0700 Subject: [PATCH 2/7] wip --- scripts/validation/kafka_testing.md | 148 ++++++++++++---------------- 1 file changed, 62 insertions(+), 86 deletions(-) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index cd91eb71d8..d431dc96ee 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -1,30 +1,46 @@ ## Pre-reqs -1. Ensure that we have enough file descriptors: - ```bash - ulimit -n 1048576 - ``` -1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) -1. Populate an environment variable `BROKER_LIST` with the IP:Ports of the nodes in the Kafka cluster. Ensure this environment variable is set in all of the terminals where Morpheus is executed: +1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) following steps 1-6. + +1. The testing steps below will require four seperate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replace these paths with the location of your checkouts. ```bash + export MORPHEUS_ROOT=~/work/morpheus export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') - export BROKER_LIST=$(HOST_IP=$KAFKA_ADVERTISED_HOST_NAME ./broker-list.sh) + export BROKER_LIST=$(HOST_IP=$KAFKA_ADVERTISED_HOST_NAME ~/work/kafka-docker/broker-list.sh) ``` - -## Simple Data Copying -### Checking KafkaSourceStage -#### Single Partition Topic Test -1. Open a new terminal and create a topic called "morpheus-src-copy-test" with only a single partition +1. Open two new terminals and start the Kafka docker container in each: ```bash docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash ``` + + Leave these terminals open the testing steps will refer to these as the "first Kafka terminal" and "second Kafka terminal", all commands executed from these terminals will be within the kafka container. + +1. Open two new terminals and navigate to the root of the Morpheus repo. The first terminal will be referred to as the "Morpheus terminal" and will be used for running Morpheus pipelines and verifying output. The second terminal will be referred to as the "Triton terminal" used for launching Triton. + +### File descriptors +If you receive errors from Kafka such as `Too many open files`, you may need to increase the maximum number of open file descriptors. To check the current file descriptor limit run: +```bash +ulimit -n +``` + +To increase the limit (in this example to `4096`): +```bash +ulimit -n 4096 +``` + + + +## Simple Data Copying +### Checking KafkaSourceStage +#### Single Partition Topic Test +1. From the first Kafka terminal, create a topic called "morpheus-src-copy-test" with only a single partition. ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test --partitions 1 --bootstrap-server `broker-list.sh` ``` Keep this shell & container open you will need it in later steps. -1. Open a new terminal and launch a pipeline to listen to Kafka, from the root of the Morpheus repo run: +1. From the Morpheus terminal launch a pipeline to listen to Kafka: ```bash morpheus --log_level=DEBUG run \ pipeline-nlp \ @@ -35,7 +51,7 @@ to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv --overwrite ``` -1. Return to the Kafka terminal and run: +1. Return to the first Kafka terminal and run: ```bash cat /workspace/tests/tests_data/filter_probs.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -50,12 +66,12 @@ ``` #### Partitioned Topic Test -1. From the Kafka terminal create a new topic named "morpheus-src-copy-test-p" with three partitions: +1. From the first Kafka terminal create a new topic named `morpheus-src-copy-test-p` with three partitions: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` ``` -1. Open a new terminal and launch a pipeline to listen to Kafka, from the root of the Morpheus repo run: +1. From the Morpheus terminal run: ```bash morpheus --log_level=DEBUG run \ pipeline-nlp \ @@ -66,7 +82,7 @@ to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test-p.csv --overwrite ``` -1. Return to the Kafka terminal and run: +1. Return to the first Kafka terminal and run: ```bash cat /workspace/tests/tests_data/filter_probs.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -83,12 +99,7 @@ ### Checking WriteToKafkaStage #### Single Partition Topic Test -1. Open a new terminal and create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic: - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the first Kafka terminal create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test --partitions 1 --bootstrap-server `broker-list.sh` @@ -96,7 +107,7 @@ --bootstrap-server `broker-list.sh` > /workspace/.tmp/morpheus-sink-copy-test.jsonlines ``` -1. Open a new terminal and from the Morpheus root run: +1. From the Morpheus terminal run: ```bash morpheus --log_level=DEBUG run \ pipeline-nlp \ @@ -118,10 +129,10 @@ ``` Note the usage of `jq --sort-keys` which will reformat the json outut, sorting the keys, this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivelant. -1. Stop the consumer in the Kafka terminal. +1. Stop the consumer in the first Kafka terminal. #### Partitioned Topic Test -1. From the Kafka terminal create a new topic named "morpheus-sink-copy-test-p" with three partitions, and start a consumer on that topic: +1. From the first Kafka terminal create a new topic named "morpheus-sink-copy-test-p" with three partitions, and start a consumer on that topic: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` @@ -129,7 +140,7 @@ --bootstrap-server `broker-list.sh` > /workspace/.tmp/morpheus-sink-copy-test-p.jsonlines ``` -1. Open a new terminal and from the Morpheus root run: +1. From the Morpheus terminal run: ```bash morpheus --log_level=DEBUG run \ pipeline-nlp \ @@ -151,18 +162,13 @@ ``` Note due to the multiple partitions the consumer most likely receieved records out of order, so we are comparing the sorted output of both files. -1. Stop the consumer in the Kafka terminal. +1. Stop the consumer in the first Kafka terminal. ## ABP Validation Pipeline For this test we are going to replace the from & to file stages from the ABP validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-abp-pre" and writing results to a topic named "morpheus-abp-post" -1. Create two Kafka topics both with only a single partition, and launch a consumer listening to the morpheus-abp-post topic. - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the first Kafka terminal create two topics both with only a single partition, and launch a consumer listening to the `morpheus-abp-post` topic. ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-abp-pre --partitions 1 --bootstrap-server `broker-list.sh` @@ -172,7 +178,7 @@ For this test we are going to replace the from & to file stages from the ABP val --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines ``` -1. In a new terminal launch Triton: +1. From the Triton terminal run: ```bash docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ nvcr.io/nvidia/tritonserver:22.08-py3 \ @@ -182,7 +188,7 @@ For this test we are going to replace the from & to file stages from the ABP val --load-model abp-nvsmi-xgb ``` -1. Open a new terminal and launch the inference pipeline which will both listen and write to kafka: +1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: ```bash morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ pipeline-fil \ @@ -198,13 +204,7 @@ For this test we are going to replace the from & to file stages from the ABP val monitor --description "Kafka Write" ``` -1. Open a new terminal and launch a Kafka producer to feed the morpheus-abp-pre topic with the input data: - ```bash - export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the second Kafka terminal launch a producer to feed the `morpheus-abp-pre` topic with the input data: ```bash cat /workspace/models/datasets/validation-data/abp-validation-data.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -222,18 +222,14 @@ For this test we are going to replace the from & to file stages from the ABP val diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/models/datasets/validation-data/abp-validation-data.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines | jq --sort-keys) ``` -1. Stop the consumer in the first Kafka terminal, and stop Triton. +1. Return to the first Kafka terminal and stop the consumer. +1. Return to the Triton Terminal and stop Triton. ## DFP (Hammah) Validation Pipeline ### User123 For this test we are going to replace to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-user123". Note: this pipeline requires a custom `UserMessageMeta` class which the from-kafka stage is currently unable to generate, for that reason the `CloudTrailSourceStage` remains in-place. -1. Create the Kafka topic, and launch a consumer listening to . - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the first Kafka terminal create the `morpheus-hammah-user123` topic, and launch a consumer listening to it: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-user123 --partitions 1 --bootstrap-server `broker-list.sh` @@ -241,7 +237,7 @@ For this test we are going to replace to-file stage from the Hammah validation p --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-user123-pytorch.jsonlines ``` -1. Open a new terminal and launch the pipeline which will write results to kafka: +1. From the Morpheus terminal launch the pipeline which will write results to kafka: ```bash morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ pipeline-ae --userid_filter="user123" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ @@ -264,7 +260,7 @@ For this test we are going to replace to-file stage from the Hammah validation p wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines ``` -1. Once all `847` rows have been written, return to the Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `847` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. 1. Verify the output with, expect to see `38` unmatched rows: ```bash @@ -277,12 +273,7 @@ For this test we are going to replace to-file stage from the Hammah validation p ### Role-g Similar to the Hammah User123 test, we are going to replace to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-role-g". -1. Create the Kafka topic, and launch a consumer listening to . - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the first Kafka terminal create the `morpheus-hammah-role-g` topic, and launch a consumer listening to it: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-role-g --partitions 1 --bootstrap-server `broker-list.sh` @@ -290,7 +281,7 @@ Similar to the Hammah User123 test, we are going to replace to-file stage from t --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines ``` -1. Open a new terminal and launch the pipeline which will write results to kafka: +1. From the Morpheus terminal launch the pipeline which will write results to kafka: ```bash morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ pipeline-ae --userid_filter="role-g" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ @@ -313,7 +304,7 @@ Similar to the Hammah User123 test, we are going to replace to-file stage from t wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines ``` -1. Once all `314` rows have been written, return to the Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `314` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. 1. Verify the output with, all rows should match: ```bash @@ -326,12 +317,7 @@ Similar to the Hammah User123 test, we are going to replace to-file stage from t ## Phishing Validation Pipeline For this test we are going to replace the from & to file stages from the Phishing validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-phishing-pre" and writing results to a topic named "morpheus-phishing-post" -1. Create two Kafka topics both with only a single partition, and launch a consumer listening to the morpheus-phishing-post topic. - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the first Kafka terminal create the two topics both with only a single partition, and launch a consumer listening to the `morpheus-phishing-post` topic. ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-pre --partitions 1 --bootstrap-server `broker-list.sh` $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-post --partitions 1 --bootstrap-server `broker-list.sh` @@ -339,7 +325,7 @@ For this test we are going to replace the from & to file stages from the Phishin --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_phishing.jsonlines ``` -1. In a new terminal launch Triton: +1. From the Triton terminal launch Triton with: ```bash docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ nvcr.io/nvidia/tritonserver:22.08-py3 \ @@ -349,7 +335,7 @@ For this test we are going to replace the from & to file stages from the Phishin --load-model phishing-bert-onnx ``` -1. Open a new terminal and launch the inference pipeline which will both listen and write to kafka: +1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: ```bash morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ pipeline-nlp --model_seq_length=128 --labels_file=${MORPHEUS_ROOT}/morpheus/data/labels_phishing.txt \ @@ -366,13 +352,7 @@ For this test we are going to replace the from & to file stages from the Phishin monitor --description "Kafka Write" ``` -1. Open a new terminal and launch a Kafka producer to feed the morpheus-phishing-pre topic with the input data: - ```bash - export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the second Kafka terminal launch a Kafka producer to feed the `morpheus-phishing-pre` topic with the input data: ```bash cat /workspace/models/datasets/validation-data/phishing-email-validation-data.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -385,7 +365,7 @@ For this test we are going to replace the from & to file stages from the Phishin wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines ``` -1. Once all `1010` rows have been written, return to the Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `1010` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. 1. Verify the output with, expect to see `43` un-matched rows: ```bash @@ -400,19 +380,14 @@ For this test we are going to replace the from & to file stages from the Phishin For this test we are going to replace the file stage from the Sid validation pipeline with the to-kafka stage writing results to a topic named "morpheus-sid-post". Note: Due to the complexity of the input data and a limitation of the cudf reader we will need to keep the from-file source reading data as CSV. -1. Create two Kafka topic and launch a consumer listening to the morpheus-sid-post topic. - ```bash - docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ - -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ - -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash - ``` +1. From the Kafka terminal create a topic named `morpheus-sid-post` and launch a consumer listening to the topic. ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sid-post --partitions 1 --bootstrap-server `broker-list.sh` $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sid-post \ --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_sid.jsonlines ``` -1. In a new terminal launch Triton: +1. From the Triton terminal launch Triton: ```bash docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ nvcr.io/nvidia/tritonserver:22.08-py3 \ @@ -422,7 +397,7 @@ Note: Due to the complexity of the input data and a limitation of the cudf reade --load-model sid-minibert-onnx ``` -1. Open a new terminal and launch the inference pipeline which will both listen and write to kafka: +1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: ```bash morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ pipeline-nlp --model_seq_length=256 \ @@ -443,7 +418,7 @@ Note: Due to the complexity of the input data and a limitation of the cudf reade wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines ``` -1. Once all `2000` rows have been written, return to the Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `2000` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. 1. Verify the output with, expect to see `25` un-matched rows: ```bash @@ -456,12 +431,13 @@ Note: Due to the complexity of the input data and a limitation of the cudf reade ## Optional Cleanup ### Delete all topics -1. Return to the Kafka terminal and run: +1. Return to the first Kafka terminal and within the container run: ```bash $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh` | xargs -I'{}' $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server `broker-list.sh` --topic='{}' ``` ### Shutdown Kafka +1. Exit from both Kafka terminals. 1. From the root of the `kafka-docker` repo run (in the host OS not inside a container): ```bash docker-compose stop From d02baa33ab92617ac340caca5c61b83932662f66 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 27 Oct 2022 14:09:27 -0700 Subject: [PATCH 3/7] Add note about automated testing, and fix spelling errors --- scripts/validation/kafka_testing.md | 46 +++++++++++++++++++---------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index d431dc96ee..02c8e7f703 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -1,7 +1,22 @@ +This document walks through manual testing of the Kafka functionality in Morpheus. There are also several automated tests which are run as part of the CI process. To run the tests locally first install `pytest-kafka` with: +```bash +mamba install -c conda-forge "openjdk=11.0.15" +mkdir -p ${MORPHEUS_ROOT}/.cache +git clone https://gitlab.com/karolinepauls/pytest-kafka.git ${MORPHEUS_ROOT}/.cache/pytest-kafka +cd ${MORPHEUS_ROOT}/.cache/pytest-kafka +python setup.py develop +cd ${MORPHEUS_ROOT} +``` + +Then run the Kafka tests with: +```bash +pytest --run_slow --run_kafka +``` + ## Pre-reqs 1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) following steps 1-6. -1. The testing steps below will require four seperate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replace these paths with the location of your checkouts. +1. The testing steps below will require four separate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replacing these paths with the location of your checkouts. ```bash export MORPHEUS_ROOT=~/work/morpheus export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') @@ -38,7 +53,6 @@ ulimit -n 4096 ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test --partitions 1 --bootstrap-server `broker-list.sh` ``` - Keep this shell & container open you will need it in later steps. 1. From the Morpheus terminal launch a pipeline to listen to Kafka: ```bash @@ -58,9 +72,9 @@ ulimit -n 4096 --topic=morpheus-src-copy-test --broker-list=`broker-list.sh` - ``` -1. Return to the Morpheus terminal, and once the monitor stage has recorded: `read: 20 messages` shut down the pipeline with Cntrl-C. +1. Return to the Morpheus terminal, and once the monitor stage has recorded: `read: 20 messages` shut down the pipeline with Ctrl-C. -1. If successful the output file `.tmp/morpheus-src-copy-test.csv` should be identicle to `tests/tests_data/filter_probs.csv`. Verify: +1. If successful the output file `.tmp/morpheus-src-copy-test.csv` should be identical to `tests/tests_data/filter_probs.csv`. Verify: ```bash diff -q --ignore-all-space ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv ${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv ``` @@ -89,7 +103,7 @@ ulimit -n 4096 --topic=morpheus-src-copy-test-p --broker-list=`broker-list.sh` - ``` -1. Return to the Morpheus terminal, and once the monitor stage has recorded: `read: 20 messages` shut down the pipeline with Cntrl-C. +1. Return to the Morpheus terminal, and once the monitor stage has recorded: `read: 20 messages` shut down the pipeline with Ctrl-C. 1. If successful the output file `.tmp/morpheus-src-copy-test-p.csv` should contain the same records as those in `tests/tests_data/filter_probs.csv` however they are most likely out of order. To verify the output we will compare the sorted outputs: ```bash @@ -127,7 +141,7 @@ ulimit -n 4096 ```bash diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.jsonlines | jq --sort-keys) ``` - Note the usage of `jq --sort-keys` which will reformat the json outut, sorting the keys, this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivelant. + Note the usage of `jq --sort-keys` which will reformat the json output, sorting the keys, this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivalent. 1. Stop the consumer in the first Kafka terminal. @@ -160,7 +174,7 @@ ulimit -n 4096 ```bash diff -q --ignore-all-space <(sort ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test-p.jsonlines | jq --sort-keys) <(sort ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.jsonlines | jq --sort-keys) ``` - Note due to the multiple partitions the consumer most likely receieved records out of order, so we are comparing the sorted output of both files. + Note due to the multiple partitions the consumer most likely received records out of order, so we are comparing the sorted output of both files. 1. Stop the consumer in the first Kafka terminal. @@ -212,7 +226,7 @@ For this test we are going to replace the from & to file stages from the ABP val ``` This command should execute quickly writing `1242` records and should complete in less than 5 seconds. -1. Return to the Morpheus terminal. Once the `Kafka Write` monitor has reported that `1242` messages has been written shutdown Morpheus with Cntrl-C. We can check the number of lines in the outut file: +1. Return to the Morpheus terminal. Once the `Kafka Write` monitor reports that `1242` messages have been written, shutdown Morpheus with Ctrl-C. We can check the number of lines in the output file: ```bash wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines ``` @@ -227,7 +241,7 @@ For this test we are going to replace the from & to file stages from the ABP val ## DFP (Hammah) Validation Pipeline ### User123 -For this test we are going to replace to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-user123". Note: this pipeline requires a custom `UserMessageMeta` class which the from-kafka stage is currently unable to generate, for that reason the `CloudTrailSourceStage` remains in-place. +For this test we are going to replace the `to-file` stage from the Hammah validation pipeline with the `to-kafka` stage using a topic named "morpheus-hammah-user123". Note: this pipeline requires a custom `UserMessageMeta` class which the `from-kafka` stage is currently unable to generate, for that reason the `CloudTrailSourceStage` remains in-place. 1. From the first Kafka terminal create the `morpheus-hammah-user123` topic, and launch a consumer listening to it: ```bash @@ -260,7 +274,7 @@ For this test we are going to replace to-file stage from the Hammah validation p wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines ``` -1. Once all `847` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `847` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. 1. Verify the output with, expect to see `38` unmatched rows: ```bash @@ -271,7 +285,7 @@ For this test we are going to replace to-file stage from the Hammah validation p ``` ### Role-g -Similar to the Hammah User123 test, we are going to replace to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-role-g". +Similar to the Hammah User123 test, we are going to replace the `to-file` stage from the Hammah validation pipeline with the `to-kafka` stage using a topic named "morpheus-hammah-role-g". 1. From the first Kafka terminal create the `morpheus-hammah-role-g` topic, and launch a consumer listening to it: ```bash @@ -304,7 +318,7 @@ Similar to the Hammah User123 test, we are going to replace to-file stage from t wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines ``` -1. Once all `314` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `314` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. 1. Verify the output with, all rows should match: ```bash @@ -360,12 +374,12 @@ For this test we are going to replace the from & to file stages from the Phishin ``` This command should execute quickly writing `1010` records and should complete in less than 5 seconds. -1. Return to the Morpheus terminal. The pipeline will take anywhere from 2 to 5 minutes to complete. Once the `Kafka Write` monitor has reported that `1010` messages has been written shutdown Morpheus with Cntrl-C. We can check the number of lines in the outut file: +1. Return to the Morpheus terminal. The pipeline will take anywhere from 2 to 5 minutes to complete. Once the `Kafka Write` monitor has reported that `1010` messages have been written, shutdown Morpheus with Ctrl-C. We can check the number of lines in the output file: ```bash wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines ``` -1. Once all `1010` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `1010` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. 1. Verify the output with, expect to see `43` un-matched rows: ```bash @@ -413,12 +427,12 @@ Note: Due to the complexity of the input data and a limitation of the cudf reade monitor --description "Kafka Write" ``` -1. The pipeline will take aproximately 2 minutes to complete. We can check the number of lines in the outut file: +1. The pipeline will take approximately 2 minutes to complete. We can check the number of lines in the output file: ```bash wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines ``` -1. Once all `2000` rows have been written, return to the first Kafka terminal and stop the consumer with Cntrl-C. +1. Once all `2000` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. 1. Verify the output with, expect to see `25` un-matched rows: ```bash From fd344e6388b52e0f01a719f03746abedf420915a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 9 Nov 2022 12:32:39 -0800 Subject: [PATCH 4/7] Include camouflage dep to instructions --- scripts/validation/kafka_testing.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index 02c8e7f703..4bd43c3488 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -1,6 +1,7 @@ -This document walks through manual testing of the Kafka functionality in Morpheus. There are also several automated tests which are run as part of the CI process. To run the tests locally first install `pytest-kafka` with: +This document walks through manual testing of the Kafka functionality in Morpheus. There are also several automated tests which are run as part of the CI process. To run the tests locally we will need to install a few dependencies needed for the tests: ```bash mamba install -c conda-forge "openjdk=11.0.15" +npm install -g camouflage-server@0.9 mkdir -p ${MORPHEUS_ROOT}/.cache git clone https://gitlab.com/karolinepauls/pytest-kafka.git ${MORPHEUS_ROOT}/.cache/pytest-kafka cd ${MORPHEUS_ROOT}/.cache/pytest-kafka From 19f841a78faf0f9cd0400f4d62952fc9198ac2fe Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 9 Nov 2022 12:39:19 -0800 Subject: [PATCH 5/7] Create the .tmp dir --- scripts/validation/kafka_testing.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index 4bd43c3488..86b6bddc24 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -15,6 +15,10 @@ pytest --run_slow --run_kafka ``` ## Pre-reqs +1. Create the `${MORPHEUS_ROOT}/.tmp` dir (this dir is already listed in the `.gitignore` file). + ```bash + mkdir -p ${MORPHEUS_ROOT}/.tmp + ``` 1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) following steps 1-6. 1. The testing steps below will require four separate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replacing these paths with the location of your checkouts. From befef1e63d6e8feea02ef560dd406a48c3a47d31 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 9 Nov 2022 13:37:55 -0800 Subject: [PATCH 6/7] Remove end-to-end pipeline instructions, add a note about topics persisting --- scripts/validation/kafka_testing.md | 296 ++-------------------------- 1 file changed, 16 insertions(+), 280 deletions(-) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index 86b6bddc24..a9592a583a 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -21,22 +21,22 @@ pytest --run_slow --run_kafka ``` 1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) following steps 1-6. -1. The testing steps below will require four separate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replacing these paths with the location of your checkouts. +1. The testing steps below will require two separate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replacing these paths with the location of your checkouts. ```bash export MORPHEUS_ROOT=~/work/morpheus export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') export BROKER_LIST=$(HOST_IP=$KAFKA_ADVERTISED_HOST_NAME ~/work/kafka-docker/broker-list.sh) ``` -1. Open two new terminals and start the Kafka docker container in each: +1. Open a new terminala and start the Kafka docker container: ```bash docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash ``` - Leave these terminals open the testing steps will refer to these as the "first Kafka terminal" and "second Kafka terminal", all commands executed from these terminals will be within the kafka container. + Leave this terminal open the testing steps will refer to these as the "Kafka terminal", and commands executed from this terminal will be within the kafka container. -1. Open two new terminals and navigate to the root of the Morpheus repo. The first terminal will be referred to as the "Morpheus terminal" and will be used for running Morpheus pipelines and verifying output. The second terminal will be referred to as the "Triton terminal" used for launching Triton. +1. Open a new terminal and navigate to the root of the Morpheus repo, this will be referred to as the "Morpheus terminal" and will be used for running Morpheus pipelines and verifying output. ### File descriptors If you receive errors from Kafka such as `Too many open files`, you may need to increase the maximum number of open file descriptors. To check the current file descriptor limit run: @@ -49,12 +49,10 @@ To increase the limit (in this example to `4096`): ulimit -n 4096 ``` - - ## Simple Data Copying ### Checking KafkaSourceStage #### Single Partition Topic Test -1. From the first Kafka terminal, create a topic called "morpheus-src-copy-test" with only a single partition. +1. From the Kafka terminal, create a topic called "morpheus-src-copy-test" with only a single partition. ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test --partitions 1 --bootstrap-server `broker-list.sh` ``` @@ -70,7 +68,7 @@ ulimit -n 4096 to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv --overwrite ``` -1. Return to the first Kafka terminal and run: +1. Return to the Kafka terminal and run: ```bash cat /workspace/tests/tests_data/filter_probs.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -85,7 +83,7 @@ ulimit -n 4096 ``` #### Partitioned Topic Test -1. From the first Kafka terminal create a new topic named `morpheus-src-copy-test-p` with three partitions: +1. From the Kafka terminal create a new topic named `morpheus-src-copy-test-p` with three partitions: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` ``` @@ -101,7 +99,7 @@ ulimit -n 4096 to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test-p.csv --overwrite ``` -1. Return to the first Kafka terminal and run: +1. Return to the Kafka terminal and run: ```bash cat /workspace/tests/tests_data/filter_probs.jsonlines | \ $KAFKA_HOME/bin/kafka-console-producer.sh \ @@ -118,7 +116,7 @@ ulimit -n 4096 ### Checking WriteToKafkaStage #### Single Partition Topic Test -1. From the first Kafka terminal create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic: +1. From the Kafka terminal create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test --partitions 1 --bootstrap-server `broker-list.sh` @@ -148,10 +146,10 @@ ulimit -n 4096 ``` Note the usage of `jq --sort-keys` which will reformat the json output, sorting the keys, this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivalent. -1. Stop the consumer in the first Kafka terminal. +1. Stop the consumer in the Kafka terminal. #### Partitioned Topic Test -1. From the first Kafka terminal create a new topic named "morpheus-sink-copy-test-p" with three partitions, and start a consumer on that topic: +1. From the Kafka terminal create a new topic named "morpheus-sink-copy-test-p" with three partitions, and start a consumer on that topic: ```bash $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` @@ -181,282 +179,20 @@ ulimit -n 4096 ``` Note due to the multiple partitions the consumer most likely received records out of order, so we are comparing the sorted output of both files. -1. Stop the consumer in the first Kafka terminal. - - -## ABP Validation Pipeline -For this test we are going to replace the from & to file stages from the ABP validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-abp-pre" and writing results to a topic named "morpheus-abp-post" - -1. From the first Kafka terminal create two topics both with only a single partition, and launch a consumer listening to the `morpheus-abp-post` topic. - ```bash - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-abp-pre --partitions 1 --bootstrap-server `broker-list.sh` - - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-abp-post --partitions 1 --bootstrap-server `broker-list.sh` - - $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-abp-post \ - --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines - ``` - -1. From the Triton terminal run: - ```bash - docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ - nvcr.io/nvidia/tritonserver:22.08-py3 \ - tritonserver --model-repository=/models/triton-model-repo \ - --exit-on-error=false \ - --model-control-mode=explicit \ - --load-model abp-nvsmi-xgb - ``` - -1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: - ```bash - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ - pipeline-fil \ - from-kafka --input_topic morpheus-abp-pre --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Read" \ - deserialize \ - preprocess \ - inf-triton --model_name=abp-nvsmi-xgb --server_url="localhost:8000" --force_convert_inputs=True \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - add-class \ - serialize \ - to-kafka --output_topic morpheus-abp-post --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Write" - ``` - -1. From the second Kafka terminal launch a producer to feed the `morpheus-abp-pre` topic with the input data: - ```bash - cat /workspace/models/datasets/validation-data/abp-validation-data.jsonlines | \ - $KAFKA_HOME/bin/kafka-console-producer.sh \ - --topic=morpheus-abp-pre --broker-list=`broker-list.sh` - - ``` - This command should execute quickly writing `1242` records and should complete in less than 5 seconds. - -1. Return to the Morpheus terminal. Once the `Kafka Write` monitor reports that `1242` messages have been written, shutdown Morpheus with Ctrl-C. We can check the number of lines in the output file: - ```bash - wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines - ``` - -1. Once all `1242` lines have been written to the output file, verify the contents with: - ```bash - diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/models/datasets/validation-data/abp-validation-data.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines | jq --sort-keys) - ``` - -1. Return to the first Kafka terminal and stop the consumer. -1. Return to the Triton Terminal and stop Triton. - -## DFP (Hammah) Validation Pipeline -### User123 -For this test we are going to replace the `to-file` stage from the Hammah validation pipeline with the `to-kafka` stage using a topic named "morpheus-hammah-user123". Note: this pipeline requires a custom `UserMessageMeta` class which the `from-kafka` stage is currently unable to generate, for that reason the `CloudTrailSourceStage` remains in-place. - -1. From the first Kafka terminal create the `morpheus-hammah-user123` topic, and launch a consumer listening to it: - ```bash - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-user123 --partitions 1 --bootstrap-server `broker-list.sh` - - $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-hammah-user123 \ - --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-user123-pytorch.jsonlines - ``` - -1. From the Morpheus terminal launch the pipeline which will write results to kafka: - ```bash - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ - pipeline-ae --userid_filter="user123" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ - from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-*.csv" \ - train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/hammah-*.csv" --seed 42 \ - preprocess \ - inf-pytorch \ - add-scores \ - timeseries --resolution=1m --zscore_threshold=8.0 --hot_start \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - serialize --exclude='event_dt|tlsDetailsclientProvidedHostHeader' \ - to-kafka --output_topic morpheus-hammah-user123 --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Write" - ``` - - This pipeline should complete in approximately 10 seconds, with the Kafka monitor stage recording `847` messages written to Kafka. - -1. The Kafka consumer we started in step #1 won't give us any sort of indication as to how many records have been consumed, we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes check the number of lines in the output: - ```bash - wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines - ``` - -1. Once all `847` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. - -1. Verify the output with, expect to see `38` unmatched rows: - ```bash - ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ - ${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-user123-validation-data.csv \ - ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines \ - --index_col="_index_" --exclude "event_dt" --rel_tol=0.1 - ``` - -### Role-g -Similar to the Hammah User123 test, we are going to replace the `to-file` stage from the Hammah validation pipeline with the `to-kafka` stage using a topic named "morpheus-hammah-role-g". - -1. From the first Kafka terminal create the `morpheus-hammah-role-g` topic, and launch a consumer listening to it: - ```bash - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-role-g --partitions 1 --bootstrap-server `broker-list.sh` - - $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-hammah-role-g \ - --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines - ``` - -1. From the Morpheus terminal launch the pipeline which will write results to kafka: - ```bash - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ - pipeline-ae --userid_filter="role-g" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ - from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-*.csv" \ - train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/hammah-*.csv" --seed 42 \ - preprocess \ - inf-pytorch \ - add-scores \ - timeseries --resolution=10m --zscore_threshold=8.0 \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - serialize --exclude='event_dt|tlsDetailsclientProvidedHostHeader' \ - to-kafka --output_topic morpheus-hammah-role-g --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Write" - ``` +1. Stop the consumer in the Kafka terminal. - This pipeline should complete in approximately 10 seconds, with the Kafka monitor stage recording `314` messages written to Kafka. - -1. The Kafka consumer we started in step #1 won't give us any sort of indication as to how many records have been consumed, we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes check the number of lines in the output: - ```bash - wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines - ``` - -1. Once all `314` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. - -1. Verify the output with, all rows should match: - ```bash - ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ - ${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-role-g-validation-data.csv \ - ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines \ - --index_col="_index_" --exclude "event_dt" --rel_tol=0.15 - ``` - -## Phishing Validation Pipeline -For this test we are going to replace the from & to file stages from the Phishing validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-phishing-pre" and writing results to a topic named "morpheus-phishing-post" - -1. From the first Kafka terminal create the two topics both with only a single partition, and launch a consumer listening to the `morpheus-phishing-post` topic. - ```bash - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-pre --partitions 1 --bootstrap-server `broker-list.sh` - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-post --partitions 1 --bootstrap-server `broker-list.sh` - $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-phishing-post \ - --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_phishing.jsonlines - ``` - -1. From the Triton terminal launch Triton with: - ```bash - docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ - nvcr.io/nvidia/tritonserver:22.08-py3 \ - tritonserver --model-repository=/models/triton-model-repo \ - --exit-on-error=false \ - --model-control-mode=explicit \ - --load-model phishing-bert-onnx - ``` - -1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: - ```bash - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ - pipeline-nlp --model_seq_length=128 --labels_file=${MORPHEUS_ROOT}/morpheus/data/labels_phishing.txt \ - from-kafka --input_topic morpheus-phishing-pre --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Read" \ - deserialize \ - preprocess --vocab_hash_file=${MORPHEUS_ROOT}/morpheus/data/bert-base-uncased-hash.txt \ - --truncation=True --do_lower_case=True --add_special_tokens=False \ - inf-triton --model_name=phishing-bert-onnx --server_url="localhost:8000" --force_convert_inputs=True \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - add-class --label=pred --threshold=0.7 \ - serialize \ - to-kafka --output_topic morpheus-phishing-post --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Write" - ``` - -1. From the second Kafka terminal launch a Kafka producer to feed the `morpheus-phishing-pre` topic with the input data: - ```bash - cat /workspace/models/datasets/validation-data/phishing-email-validation-data.jsonlines | \ - $KAFKA_HOME/bin/kafka-console-producer.sh \ - --topic=morpheus-phishing-pre --broker-list=`broker-list.sh` - - ``` - This command should execute quickly writing `1010` records and should complete in less than 5 seconds. - -1. Return to the Morpheus terminal. The pipeline will take anywhere from 2 to 5 minutes to complete. Once the `Kafka Write` monitor has reported that `1010` messages have been written, shutdown Morpheus with Ctrl-C. We can check the number of lines in the output file: - ```bash - wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines - ``` - -1. Once all `1010` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. - -1. Verify the output with, expect to see `43` un-matched rows: - ```bash - ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ - ${MORPHEUS_ROOT}/models/datasets/validation-data/phishing-email-validation-data.jsonlines \ - ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines - ``` - -1. Stop Triton - -## Sid Validation Pipeline -For this test we are going to replace the file stage from the Sid validation pipeline with the to-kafka stage writing results to a topic named "morpheus-sid-post". -Note: Due to the complexity of the input data and a limitation of the cudf reader we will need to keep the from-file source reading data as CSV. - -1. From the Kafka terminal create a topic named `morpheus-sid-post` and launch a consumer listening to the topic. - ```bash - $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sid-post --partitions 1 --bootstrap-server `broker-list.sh` - $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sid-post \ - --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_sid.jsonlines - ``` - -1. From the Triton terminal launch Triton: - ```bash - docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ - nvcr.io/nvidia/tritonserver:22.08-py3 \ - tritonserver --model-repository=/models/triton-model-repo \ - --exit-on-error=false \ - --model-control-mode=explicit \ - --load-model sid-minibert-onnx - ``` - -1. From the Morpheus terminal launch the inference pipeline which will both listen and write to kafka: - ```bash - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ - pipeline-nlp --model_seq_length=256 \ - from-file --filename=${MORPHEUS_ROOT}/models/datasets/validation-data/sid-validation-data.csv \ - deserialize \ - preprocess --vocab_hash_file=${MORPHEUS_ROOT}/morpheus/data/bert-base-uncased-hash.txt \ - --truncation=True --do_lower_case=True --add_special_tokens=False \ - inf-triton --model_name=sid-minibert-onnx --server_url="localhost:8000" --force_convert_inputs=True \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - add-class --prefix="si_" \ - serialize --exclude "id" --exclude "^_ts_" \ - to-kafka --output_topic morpheus-sid-post --bootstrap_servers "${BROKER_LIST}" \ - monitor --description "Kafka Write" - ``` - -1. The pipeline will take approximately 2 minutes to complete. We can check the number of lines in the output file: - ```bash - wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines - ``` - -1. Once all `2000` rows have been written, return to the first Kafka terminal and stop the consumer with Ctrl-C. - -1. Verify the output with, expect to see `25` un-matched rows: - ```bash - ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ - ${MORPHEUS_ROOT}/models/datasets/validation-data/sid-validation-data.csv \ - ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines - ``` - -1. Stop Triton ## Optional Cleanup ### Delete all topics -1. Return to the first Kafka terminal and within the container run: +1. Return to the Kafka terminal and within the container run: ```bash $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh` | xargs -I'{}' $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server `broker-list.sh` --topic='{}' ``` + Note: The Kafka containers are using a persistent volume, and the topics will persist after a restart of the docker containers. + ### Shutdown Kafka -1. Exit from both Kafka terminals. +1. Exit the Kafka terminal. 1. From the root of the `kafka-docker` repo run (in the host OS not inside a container): ```bash docker-compose stop From d644e516dc18445456b528a861c40dbcbb630ae7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 9 Nov 2022 13:41:20 -0800 Subject: [PATCH 7/7] Add a bit about jq --- scripts/validation/kafka_testing.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md index a9592a583a..7a7e7dbae8 100644 --- a/scripts/validation/kafka_testing.md +++ b/scripts/validation/kafka_testing.md @@ -19,6 +19,10 @@ pytest --run_slow --run_kafka ```bash mkdir -p ${MORPHEUS_ROOT}/.tmp ``` +1. To help validate the data we will be using the `jq` command, if this is not already installed on your system it can be installed with: + ```bash + mamba install -c conda-forge jq + ``` 1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) following steps 1-6. 1. The testing steps below will require two separate terminal windows. Each will need to have the `KAFKA_ADVERTISED_HOST_NAME`, `BROKER_LIST` and `MORPHEUS_ROOT` environment variables set. In the example below both morpheus and kafka-docker repositories have been checked out into the `~work` directory, replacing these paths with the location of your checkouts.