diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml new file mode 100644 index 0000000000..eadfb9ed8d --- /dev/null +++ b/.github/workflows/ci_pipe.yml @@ -0,0 +1,154 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: CI Pipeline + +on: + workflow_call: + inputs: + aws_region: + default: 'us-west-2' + type: string + run_check: + required: true + type: boolean + container: + required: true + type: string + test_container: + required: true + type: string + secrets: + GHA_AWS_ACCESS_KEY_ID: + required: true + GHA_AWS_SECRET_ACCESS_KEY: + required: true + NGC_API_KEY: + required: true + +env: + AWS_DEFAULT_REGION: ${{ inputs.aws_region }} + AWS_ACCESS_KEY_ID: "${{ secrets.GHA_AWS_ACCESS_KEY_ID }}" + AWS_SECRET_ACCESS_KEY: "${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }}" + CHANGE_TARGET: "${{ github.base_ref }}" + CUDA_PATH: "/usr/local/cuda/" + CUDA_VER: "11.5" + GH_TOKEN: "${{ github.token }}" + GIT_COMMIT: "${{ github.sha }}" + MORPHEUS_ROOT: "${{ github.workspace }}/morpheus" + WORKSPACE: "${{ github.workspace }}/morpheus" + WORKSPACE_TMP: "${{ github.workspace }}/tmp" + + +jobs: + check: + if: ${{ inputs.run_check }} + name: Check + runs-on: [self-hosted, linux, amd64, cpu4] + timeout-minutes: 60 + container: + credentials: + username: '$oauthtoken' + password: ${{ secrets.NGC_API_KEY }} + image: ${{ inputs.container }} + strategy: + fail-fast: true + + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + lfs: false + path: 'morpheus' + fetch-depth: 0 + + - name: Check + shell: bash + run: ./morpheus/ci/scripts/github/checks.sh + + build: + name: Build + runs-on: [self-hosted, linux, amd64, cpu16] + timeout-minutes: 60 + container: + credentials: + username: '$oauthtoken' + password: ${{ secrets.NGC_API_KEY }} + image: ${{ inputs.container }} + strategy: + fail-fast: true + + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + lfs: false + path: 'morpheus' + + - name: Build:linux:x86_64:gcc + shell: bash + run: ./morpheus/ci/scripts/github/build.sh + + test: + name: Test + needs: [build] + runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] + timeout-minutes: 60 + container: + credentials: + username: '$oauthtoken' + password: ${{ secrets.NGC_API_KEY }} + image: ${{ inputs.test_container }} + options: --cap-add=sys_nice + env: + NVIDIA_VISIBLE_DEVICES: ${{ env.NVIDIA_VISIBLE_DEVICES }} + PARALLEL_LEVEL: '10' + strategy: + fail-fast: true + + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + lfs: false + path: 'morpheus' + + - name: Test:linux:x86_64:gcc + shell: bash + run: ./morpheus/ci/scripts/github/test.sh + + documentation: + name: Documentation + needs: [build] + runs-on: [self-hosted, linux, amd64, cpu4] + timeout-minutes: 60 + container: + credentials: + username: '$oauthtoken' + password: ${{ secrets.NGC_API_KEY }} + image: ${{ inputs.container }} + strategy: + fail-fast: true + + 
steps: + - name: Checkout + uses: actions/checkout@v3 + with: + lfs: false + path: 'morpheus' + + - name: build_docs + shell: bash + run: ./morpheus/ci/scripts/github/docs.sh diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index d9c4d8a36d..25ae269801 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -26,105 +26,14 @@ concurrency: group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}' cancel-in-progress: true -env: - AWS_DEFAULT_REGION: us-west-2 - AWS_ACCESS_KEY_ID: "${{ secrets.GHA_AWS_ACCESS_KEY_ID }}" - AWS_SECRET_ACCESS_KEY: "${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }}" - CHANGE_TARGET: "${{ github.base_ref }}" - CUDA_PATH: "/usr/local/cuda/" - CUDA_VER: "11.5" - GH_TOKEN: "${{ github.token }}" - GIT_COMMIT: "${{ github.sha }}" - MORPHEUS_ROOT: "${{ github.workspace }}/morpheus" - WORKSPACE: "${{ github.workspace }}/morpheus" - WORKSPACE_TMP: "${{ github.workspace }}/tmp" - - jobs: - check: - if: ${{ startsWith(github.ref_name, 'pull-request/') }} - name: Check - runs-on: [self-hosted, linux, amd64, cpu4] - timeout-minutes: 60 - container: - image: rapidsai/ci:cuda11.5.1-ubuntu20.04-py3.8 - strategy: - fail-fast: true - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - lfs: false - path: 'morpheus' - fetch-depth: 0 - - - name: Check - shell: bash - run: ./morpheus/ci/scripts/github/checks.sh - - build: - name: Build - runs-on: [self-hosted, linux, amd64, cpu16] - timeout-minutes: 60 - container: - image: rapidsai/ci:cuda11.5.1-ubuntu20.04-py3.8 - strategy: - fail-fast: true - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - lfs: false - path: 'morpheus' - - - name: Build:linux:x86_64:gcc - shell: bash - run: ./morpheus/ci/scripts/github/build.sh - - test: - name: Test - needs: [build] - runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] - timeout-minutes: 60 - container: - image: rapidsai/ci:cuda11.5.1-ubuntu20.04-py3.8 - options: --cap-add=sys_nice - env: - NVIDIA_VISIBLE_DEVICES: ${{ env.NVIDIA_VISIBLE_DEVICES }} - PARALLEL_LEVEL: '10' - strategy: - fail-fast: true - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - lfs: false - path: 'morpheus' - - - name: Test:linux:x86_64:gcc - shell: bash - run: ./morpheus/ci/scripts/github/test.sh - - documentation: - name: Documentation - needs: [build] - runs-on: [self-hosted, linux, amd64, cpu4] - timeout-minutes: 60 - container: - image: rapidsai/ci:cuda11.5.1-ubuntu20.04-py3.8 - strategy: - fail-fast: true - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - lfs: false - path: 'morpheus' - - - name: build_docs - shell: bash - run: ./morpheus/ci/scripts/github/docs.sh + ci_pipe: + uses: ./.github/workflows/ci_pipe.yml + with: + run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-driver-221102 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-221102 + secrets: + GHA_AWS_ACCESS_KEY_ID: ${{ secrets.GHA_AWS_ACCESS_KEY_ID }} + GHA_AWS_SECRET_ACCESS_KEY: ${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }} + NGC_API_KEY: ${{ secrets.NGC_API_KEY }} diff --git a/ci/runner/Dockerfile b/ci/runner/Dockerfile new file mode 100644 index 0000000000..1df5971b91 --- /dev/null +++ b/ci/runner/Dockerfile @@ -0,0 +1,94 @@ +# syntax=docker/dockerfile:1.3 + +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Args used in FROM commands must come first +ARG FROM_IMAGE="rapidsai/ci" +ARG CUDA_PKG_VER=11-5 +ARG CUDA_SHORT_VER=11.5 +ARG CUDA_VER=11.5.1 +ARG LINUX_DISTRO=ubuntu +ARG LINUX_VER=20.04 +ARG PROJ_NAME=morpheus +ARG PYTHON_VER=3.8 + +# Configure the base docker image +FROM ${FROM_IMAGE}:cuda${CUDA_VER}-${LINUX_DISTRO}${LINUX_VER}-py${PYTHON_VER} AS base + +ARG PROJ_NAME +ARG CUDA_SHORT_VER + +SHELL ["/bin/bash", "-c"] + +# OS deps +RUN apt update && \ + DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ + apt install --no-install-recommends -y \ + libnuma1 && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* + +# Create conda environment +COPY ./docker/conda/environments/* /tmp/conda/ + +RUN CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env create -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_dev.yml && \ + sed -i "s/conda activate base/conda activate ${PROJ_NAME}/g" ~/.bashrc && \ + conda clean -afy && \ + rm -rf /tmp/conda + + +# ============ driver ================== +FROM base as driver + +ARG CUDA_PKG_VER + +RUN apt update && \ + DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ + apt install --no-install-recommends -y \ + libcublas-dev-${CUDA_PKG_VER} \ + libcufft-dev-${CUDA_PKG_VER} \ + libcurand-dev-${CUDA_PKG_VER} \ + libcusolver-dev-${CUDA_PKG_VER} \ + libnvidia-compute-495 && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* + + +# ============ test ================== +FROM base as test + +ARG PROJ_NAME + +RUN apt update && \ + DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ + apt install --no-install-recommends -y \ + nodejs \ + npm \ + openjdk-11-jdk && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* + +# Install camouflage, which is needed by the unit tests to mock a Triton server +RUN npm install -g camouflage-server@0.9 && \ + npm cache clean --force + +# Install pytest-kafka +# Installing pytest-kafka from source instead of conda/pip as the setup.py includes helper methods for downloading Kafka +# https://gitlab.com/karolinepauls/pytest-kafka/-/issues/9 +RUN git clone https://gitlab.com/karolinepauls/pytest-kafka.git /opt/pytest-kafka && \ + cd /opt/pytest-kafka && \ + source activate ${PROJ_NAME} && \ + python setup.py develop diff --git a/ci/runner/README.md b/ci/runner/README.md new file mode 100644 index 0000000000..7257cb7376 --- /dev/null +++ b/ci/runner/README.md @@ -0,0 +1,43 @@ + + +The `Dockerfile` in this directory defines the images used by the CI runners, not the images for Morpheus itself. + +# Building CI images +The `Dockerfile` defines two targets: `driver` and `test`. The `driver` target includes the NVIDIA driver and several other libraries needed to build Morpheus on a machine without access to a GPU. The `test` target includes extra packages required to run the Morpheus integration and unit tests.
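+ +Both targets extend the shared `base` stage, which creates the `morpheus` conda environment, so the two images differ only in the packages layered on top of it. For reference, a single target can also be built directly with `docker build` (an illustrative sketch only, not the supported entry point; the local tag name here is hypothetical): +```bash +# Build just the driver stage from the repo root, under an arbitrary local tag +docker build --target driver -t morpheus-ci-driver:local -f ci/runner/Dockerfile . +``` +The `build_and_push.sh` script described below wraps this same command and applies the registry naming convention used by CI.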
+ +To build the images from the root of the Morpheus repo run: +```bash +SKIP_PUSH=1 ci/runner/build_and_push.sh +``` + +# Build and push CI images +This will require being a member of the `Morpheus Early Access CI` group in [NGC](https://catalog.ngc.nvidia.com) and logging into the `nvcr.io` registry prior to running. + +From the root of the Morpheus repo run: +```bash +ci/runner/build_and_push.sh +``` + +If the images are already built, the build step can be skipped by setting `SKIP_BUILD=1`. + +# Updating CI to use the new images +Update `.github/workflows/pull_request.yml`, changing these two lines to use the new image names: +```yaml + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-driver-221102 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-221102 +``` diff --git a/ci/runner/build_and_push.sh b/ci/runner/build_and_push.sh new file mode 100755 index 0000000000..5e128de34b --- /dev/null +++ b/ci/runner/build_and_push.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +DOCKER_TARGET=${DOCKER_TARGET:-"driver" "test"} +DOCKER_BUILDKIT=${DOCKER_BUILDKIT:-1} +DOCKER_REGISTRY_SERVER=${DOCKER_REGISTRY_SERVER:-"nvcr.io"} +DOCKER_REGISTRY_PATH=${DOCKER_REGISTRY_PATH:-"/ea-nvidia-morpheus/morpheus"} +DOCKER_TAG_PREFIX=${DOCKER_TAG_PREFIX:-"morpheus-ci"} +DOCKER_TAG_POSTFIX=${DOCKER_TAG_POSTFIX:-"$(date +'%y%m%d')"} +DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""} + +SKIP_BUILD=${SKIP_BUILD:-""} +SKIP_PUSH=${SKIP_PUSH:-""} + +set -e + +function get_image_full_name() { + echo "${DOCKER_REGISTRY_SERVER}${DOCKER_REGISTRY_PATH}:${DOCKER_TAG_PREFIX}-${build_target}-${DOCKER_TAG_POSTFIX}" +} + +if [[ "${SKIP_BUILD}" == "" ]]; then + for build_target in ${DOCKER_TARGET[@]}; do + FULL_NAME=$(get_image_full_name) + echo "Building target \"${build_target}\" as ${FULL_NAME}"; + docker build --network=host ${DOCKER_EXTRA_ARGS} --target ${build_target} -t ${FULL_NAME} -f ci/runner/Dockerfile . + done +fi + +if [[ "${SKIP_PUSH}" == "" ]]; then + for build_target in ${DOCKER_TARGET[@]}; do + FULL_NAME=$(get_image_full_name) + echo "Pushing ${FULL_NAME}"; + docker push ${FULL_NAME} + done +fi diff --git a/ci/scripts/github/build.sh b/ci/scripts/github/build.sh index de9c2e3170..eda5f87a9a 100755 --- a/ci/scripts/github/build.sh +++ b/ci/scripts/github/build.sh @@ -18,10 +18,7 @@ set -e source ${WORKSPACE}/ci/scripts/github/common.sh -install_deb_deps -install_build_deps - -create_conda_env +update_conda_env rapids-logger "Check versions" python3 --version gcc --version g++ --version cmake --version ninja --version -rapids-logger "Env at build time:" -print_env_vars rapids-logger "Configuring cmake for Morpheus" cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} \ -DCCACHE_PROGRAM_PATH=$(which sccache) .
@@ -41,18 +36,14 @@ cmake --build build --parallel ${PARALLEL_LEVEL} rapids-logger "sccache usage for morpheus build:" sccache --show-stats -sccache --zero-stats &> /dev/null rapids-logger "Installing Morpheus" cmake -DCOMPONENT=Wheel -P ${MORPHEUS_ROOT}/build/cmake_install.cmake -pip install ${MORPHEUS_ROOT}/build/wheel rapids-logger "Archiving results" -mamba pack --quiet --force --ignore-missing-files --n-threads ${PARALLEL_LEVEL} -n morpheus -o ${WORKSPACE_TMP}/conda_env.tar.gz tar cfj "${WORKSPACE_TMP}/wheel.tar.bz" build/wheel rapids-logger "Pushing results to ${DISPLAY_ARTIFACT_URL}" -aws s3 cp --no-progress "${WORKSPACE_TMP}/conda_env.tar.gz" "${ARTIFACT_URL}/conda_env.tar.gz" aws s3 cp --no-progress "${WORKSPACE_TMP}/wheel.tar.bz" "${ARTIFACT_URL}/wheel.tar.bz" rapids-logger "Success" diff --git a/ci/scripts/github/checks.sh b/ci/scripts/github/checks.sh index 2aadd59f5b..6151690ebb 100755 --- a/ci/scripts/github/checks.sh +++ b/ci/scripts/github/checks.sh @@ -18,13 +18,10 @@ set -e source ${WORKSPACE}/ci/scripts/github/common.sh -install_deb_deps -install_build_deps +update_conda_env fetch_base_branch -create_conda_env - rapids-logger "Checking copyright headers" python ${MORPHEUS_ROOT}/ci/scripts/copyright.py --verify-apache-v2 --git-diff-commits ${CHANGE_TARGET} ${GIT_COMMIT} diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index fa5c5a11df..89598e7cbc 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -51,7 +51,7 @@ export S3_URL="s3://rapids-downloads/ci/morpheus" export DISPLAY_URL="https://downloads.rapids.ai/ci/morpheus" export ARTIFACT_ENDPOINT="/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}" export ARTIFACT_URL="${S3_URL}${ARTIFACT_ENDPOINT}" -export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}/" +export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/" # Set sccache env vars export SCCACHE_S3_KEY_PREFIX=morpheus-${NVARCH} @@ -66,36 +66,14 @@ export FETCH_STATUS=0 print_env_vars -function install_deb_deps() { - apt -q -y update - apt -q -y install libnuma1 -} - -function install_build_deps() { - apt -q -y install libcublas-dev-11-5 \ - libcufft-dev-11-5 \ - libcurand-dev-11-5 \ - libcusolver-dev-11-5 \ - libnvidia-compute-495 -} - -function create_conda_env() { - rapids-logger "Creating conda env" - conda config --add pkgs_dirs /opt/conda/pkgs - conda config --env --add channels conda-forge - conda config --env --set channel_alias ${CONDA_CHANNEL_ALIAS:-"https://conda.anaconda.org"} - mamba env create -q -n morpheus -f ${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml - +function update_conda_env() { + rapids-logger "Checking for updates to conda env" + rapids-mamba-retry env update -n morpheus -q --file ${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml + conda deactivate conda activate morpheus - - rapids-logger "Installing CI dependencies" - mamba env update -q -f ${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_ci.yml - conda deactivate && conda activate morpheus - - rapids-logger "Final Conda Environment" - show_conda_info } + function fetch_base_branch() { rapids-logger "Retrieving base branch from GitHub API" [[ -n "$GH_TOKEN" ]] && CURL_HEADERS=('-H' "Authorization: token ${GH_TOKEN}") diff --git a/ci/scripts/github/docs.sh b/ci/scripts/github/docs.sh index 2083b684e0..043ef2885a 100755 --- a/ci/scripts/github/docs.sh +++ b/ci/scripts/github/docs.sh @@ -17,10 +17,13 @@ set -e source ${WORKSPACE}/ci/scripts/github/common.sh 
-install_deb_deps -install_build_deps -restore_conda_env +update_conda_env + +aws s3 cp --no-progress "${ARTIFACT_URL}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" + +tar xf "${WORKSPACE_TMP}/wheel.tar.bz" + pip install ${MORPHEUS_ROOT}/build/wheel rapids-logger "Pulling LFS assets" diff --git a/ci/scripts/github/test.sh b/ci/scripts/github/test.sh index 841cd3d533..5b801ce7bb 100755 --- a/ci/scripts/github/test.sh +++ b/ci/scripts/github/test.sh @@ -19,31 +19,17 @@ set -e source ${WORKSPACE}/ci/scripts/github/common.sh /usr/bin/nvidia-smi -install_deb_deps +update_conda_env -# Restore the environment and then ensure we have the CI dependencies -restore_conda_env +aws s3 cp --no-progress "${ARTIFACT_URL}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" + +tar xf "${WORKSPACE_TMP}/wheel.tar.bz" # Install the built Morpheus python package pip install ${MORPHEUS_ROOT}/build/wheel CPP_TESTS=($(find ${MORPHEUS_ROOT}/build/wheel -name "*.x")) -rapids-logger "Installing test dependencies" -npm install --silent -g camouflage-server@0.9 - -# Kafka tests need Java, since this stage is the only one that needs it, installing it here rather than adding it to -# the ci.yaml file -mamba install -c conda-forge "openjdk=11.0.15" -export PYTEST_KAFKA_DIR=${WORKSPACE_TMP}/pytest-kafka - -# Installing pytest-kafka from source instead of conda/pip as the setup.py includes helper methods for downloading Kafka -# https://gitlab.com/karolinepauls/pytest-kafka/-/issues/9 -git clone https://gitlab.com/karolinepauls/pytest-kafka.git ${PYTEST_KAFKA_DIR} -pushd ${PYTEST_KAFKA_DIR} -python setup.py develop -popd - rapids-logger "Pulling LFS assets" cd ${MORPHEUS_ROOT} diff --git a/docker/conda/environments/cuda11.5_ci.yml b/docker/conda/environments/cuda11.5_ci.yml deleted file mode 100644 index 0db501d83c..0000000000 --- a/docker/conda/environments/cuda11.5_ci.yml +++ /dev/null @@ -1,29 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# Morpheus dependencies only needed during a CI build -name: morpheus -channels: - - conda-forge -dependencies: - - clangdev=14 # Needed for IWYU to actually compile the code - - conda-pack=0.7 - - git-lfs=3.2 - - include-what-you-use=0.18 - - pip - - pkg-config=0.29 - - sccache=0.3 - - pip: - - yapf==0.32 diff --git a/docker/conda/environments/cuda11.5_dev.yml b/docker/conda/environments/cuda11.5_dev.yml index 097b223da7..f1cd8899f4 100644 --- a/docker/conda/environments/cuda11.5_dev.yml +++ b/docker/conda/environments/cuda11.5_dev.yml @@ -47,6 +47,7 @@ dependencies: - gcc_linux-64=9.4 - gflags=2.2 - git>=2.35.3 # Needed for wildcards on safe.directory + - git-lfs=3.2 - glog=0.6 - gmock=1.10 - gputil diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md index 1cd1212ea1..2bdbe2f1ea 100644 --- a/examples/gnn_fraud_detection_pipeline/README.md +++ b/examples/gnn_fraud_detection_pipeline/README.md @@ -21,7 +21,7 @@ limitations under the License. Prior to running the GNN fraud detection pipeline, additional requirements must be installed into your conda environment. A supplemental requirements file has been provided in this example directory. ```bash -mamba install -c rapidsai -c nvidia -c stellargraph -c conda-forge chardet cuml stellargraph tensorflow +mamba env update -n ${CONDA_DEFAULT_ENV} -f examples/gnn_fraud_detection_pipeline/requirements.yml ``` ## Running diff --git a/examples/gnn_fraud_detection_pipeline/requirements.yml b/examples/gnn_fraud_detection_pipeline/requirements.yml index 44c44e950d..65ccde42f2 100644 --- a/examples/gnn_fraud_detection_pipeline/requirements.yml +++ b/examples/gnn_fraud_detection_pipeline/requirements.yml @@ -16,9 +16,12 @@ channels: - rapidsai - nvidia + - nvidia/label/dev # For pre-releases of SRF. Should still default to full releases if available + - nvidia/label/cuda-11.5.2 # For cuda-nvml-dev=11.5, which is not published under the nvidia channel yet. - stellargraph - conda-forge dependencies: - - cuml - - stellargraph - - tensorflow + - chardet=5.0.0 + - cuml=22.08 + - stellargraph=1.2.1 + - tensorflow=2.9.1 diff --git a/morpheus/_lib/cmake/libraries/morpheus.cmake b/morpheus/_lib/cmake/libraries/morpheus.cmake index 61c0acd71b..e422791379 100644 --- a/morpheus/_lib/cmake/libraries/morpheus.cmake +++ b/morpheus/_lib/cmake/libraries/morpheus.cmake @@ -16,6 +16,7 @@ message(STATUS "Adding library: morpheus") add_library(morpheus # Keep these sorted! + ${MORPHEUS_LIB_ROOT}/src/io/deserializers.cpp ${MORPHEUS_LIB_ROOT}/src/io/serializers.cpp ${MORPHEUS_LIB_ROOT}/src/messages/memory/inference_memory.cpp ${MORPHEUS_LIB_ROOT}/src/messages/memory/inference_memory_fil.cpp @@ -30,6 +31,7 @@ add_library(morpheus ${MORPHEUS_LIB_ROOT}/src/messages/multi_inference_nlp.cpp ${MORPHEUS_LIB_ROOT}/src/messages/multi_response.cpp ${MORPHEUS_LIB_ROOT}/src/messages/multi_response_probs.cpp + ${MORPHEUS_LIB_ROOT}/src/messages/multi_tensor.cpp ${MORPHEUS_LIB_ROOT}/src/objects/fiber_queue.cpp ${MORPHEUS_LIB_ROOT}/src/objects/file_types.cpp ${MORPHEUS_LIB_ROOT}/src/objects/wrapped_tensor.cpp diff --git a/morpheus/_lib/include/morpheus/io/deserializers.hpp b/morpheus/_lib/include/morpheus/io/deserializers.hpp new file mode 100644 index 0000000000..e2d3231e24 --- /dev/null +++ b/morpheus/_lib/include/morpheus/io/deserializers.hpp @@ -0,0 +1,55 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +namespace morpheus { +#pragma GCC visibility push(default) + +/** + * @brief Loads a cudf table from either a CSV or JSON file + * + * @param filename + * @return cudf::io::table_with_metadata + */ +cudf::io::table_with_metadata load_table_from_file(const std::string& filename); + +/** + * @brief Loads a cudf table from a json source, replacing any escape characters in the source data that cudf can't + * handle + * + * @param json_options + * @return cudf::io::table_with_metadata + */ +cudf::io::table_with_metadata load_json_table(cudf::io::json_reader_options&& json_options); + +/** + * @brief Return the number of index columns in `data_table`; in practice this will be `0` or `1`. + * If `data_table` contains a column named "Unnamed: 0" it will be renamed to "". + * + * @param data_table + * @return int + */ +int get_index_col_count(cudf::io::table_with_metadata& data_table); + +#pragma GCC visibility pop +} // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp b/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp index 966d225c90..ac560cb4b3 100644 --- a/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp +++ b/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp @@ -26,6 +26,8 @@ namespace morpheus { /** * TODO(Documentation) */ + +#pragma GCC visibility push(default) class InferenceMemory : public TensorMemory { public: @@ -43,7 +45,6 @@ class InferenceMemory : public TensorMemory }; /****** InferenceMemoryInterfaceProxy *************************/ -#pragma GCC visibility push(default) /** * @brief Interface proxy, used to insulate python bindings. */ diff --git a/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp b/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp index 62c716e952..2dbe4628a9 100644 --- a/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp +++ b/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp @@ -18,6 +18,7 @@ #pragma once #include "morpheus/messages/memory/response_memory.hpp" +#include "morpheus/messages/memory/tensor_memory.hpp" #include "morpheus/objects/tensor_object.hpp" #include @@ -36,14 +37,19 @@ class ResponseMemoryProbs : public ResponseMemory { public: ResponseMemoryProbs(size_t count, TensorObject probs); + ResponseMemoryProbs(size_t count, tensor_map_t &&tensors); /** - * TODO(Documentation) + * @brief Return the tensor named 'probs'; throws a `std::runtime_error` if it does not exist.
+ * + * @return const TensorObject& */ const TensorObject &get_probs() const; /** - * TODO(Documentation) + * @brief Update the tensor named 'probs' + * + * @param probs */ void set_probs(TensorObject probs); }; diff --git a/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp b/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp index 7f6fe90abc..2cb283ab60 100644 --- a/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp +++ b/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp @@ -41,6 +41,7 @@ class TensorMemory TensorMemory(size_t count); TensorMemory(size_t count, tensor_map_t &&tensors); + virtual ~TensorMemory() = default; size_t count{0}; tensor_map_t tensors; diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference.hpp index 13c3b9ce37..a36209d5c0 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_inference.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_inference.hpp @@ -20,6 +20,7 @@ #include "morpheus/messages/memory/inference_memory.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" +#include "morpheus/messages/multi_tensor.hpp" #include "morpheus/objects/tensor_object.hpp" #include @@ -38,7 +39,7 @@ namespace morpheus { * TODO(Documentation) */ #pragma GCC visibility push(default) -class MultiInferenceMessage : public DerivedMultiMessage +class MultiInferenceMessage : public DerivedMultiMessage { public: MultiInferenceMessage(const MultiInferenceMessage &other) = default; @@ -49,32 +50,26 @@ class MultiInferenceMessage : public DerivedMultiMessage memory; - std::size_t offset{0}; - std::size_t count{0}; - /** - * TODO(Documentation) + * @brief Return the input tensor for the given `name`. Will halt on a fatal error if the tensor does not exist. + * + * @param name + * @return const TensorObject */ const TensorObject get_input(const std::string &name) const; /** - * TODO(Documentation) + * @brief Return the input tensor for the given `name`. Will halt on a fatal error if the tensor does not exist. + * + * @param name + * @return TensorObject */ - const void set_input(const std::string &name, const TensorObject &value); + TensorObject get_input(const std::string &name); - protected: /** - * TODO(Documentation) + * Update the value of an input tensor. The tensor must already exist, otherwise this will halt on a fatal error. */ - void get_slice_impl(std::shared_ptr new_message, std::size_t start, std::size_t stop) const override; - - void copy_ranges_impl(std::shared_ptr new_message, - const std::vector> &ranges, - size_t num_selected_rows) const override; - - std::shared_ptr copy_input_ranges(const std::vector> &ranges, - size_t num_selected_rows) const; + void set_input(const std::string &name, const TensorObject &value); }; /****** MultiInferenceMessageInterfaceProxy****************/ diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp index 73bb8b5068..711df6614c 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp @@ -45,22 +45,32 @@ class MultiInferenceFILMessage : public MultiInferenceMessage size_t count); /** - * TODO(Documentation) + * @brief Return the 'input__0' tensor; throws a `std::runtime_error` if it does not exist.
+ + * @return const TensorObject */ const TensorObject get_input__0() const; /** - * TODO(Documentation) + * @brief Sets a tensor named 'input__0'. + * + * @param input__0 */ void set_input__0(const TensorObject& input__0); /** - * TODO(Documentation) + * @brief Return the 'seq_ids' tensor; throws a `std::runtime_error` if it does not exist. + * + * @return const TensorObject */ const TensorObject get_seq_ids() const; /** - * TODO(Documentation) + * @brief Sets a tensor named 'seq_ids'. + * + * @param seq_ids */ void set_seq_ids(const TensorObject& seq_ids); }; diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp index debbd6b384..cfa7ec3fef 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp @@ -46,32 +46,47 @@ class MultiInferenceNLPMessage : public MultiInferenceMessage std::size_t count); /** - * TODO(Documentation) + * @brief Return the 'input_ids' tensor; throws a `std::runtime_error` if it does not exist. + * + * @return const TensorObject */ const TensorObject get_input_ids() const; /** - * TODO(Documentation) + * @brief Sets a tensor named 'input_ids'. + * + * @param input_ids */ void set_input_ids(const TensorObject& input_ids); /** - * TODO(Documentation) + * @brief Return the 'input_mask' tensor; throws a `std::runtime_error` if it does not exist. + * + * @return const TensorObject */ const TensorObject get_input_mask() const; /** - * TODO(Documentation) + * @brief Sets a tensor named 'input_mask'. + * + * @param input_mask */ void set_input_mask(const TensorObject& input_mask); /** - * TODO(Documentation) + * @brief Return the 'seq_ids' tensor; throws a `std::runtime_error` if it does not exist. + * + * @return const TensorObject */ const TensorObject get_seq_ids() const; /** - * TODO(Documentation) + * @brief Sets a tensor named 'seq_ids'. + * + * @param seq_ids */ void set_seq_ids(const TensorObject& seq_ids); }; diff --git a/morpheus/_lib/include/morpheus/messages/multi_response.hpp b/morpheus/_lib/include/morpheus/messages/multi_response.hpp index 1730b09718..a2e2a5a42c 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_response.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_response.hpp @@ -20,6 +20,7 @@ #include "morpheus/messages/memory/response_memory.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" +#include "morpheus/messages/multi_tensor.hpp" #include "morpheus/objects/tensor_object.hpp" #include @@ -38,7 +39,7 @@ namespace morpheus { * TODO(Documentation) */ #pragma GCC visibility push(default) -class MultiResponseMessage : public DerivedMultiMessage +class MultiResponseMessage : public DerivedMultiMessage { public: MultiResponseMessage(const MultiResponseMessage &other) = default; @@ -49,37 +50,30 @@ class MultiResponseMessage : public DerivedMultiMessage memory; - std::size_t offset{0}; - std::size_t count{0}; - - /** - * TODO(Documentation) - */ - TensorObject get_output(const std::string &name); - /** - * TODO(Documentation) + * @brief Returns the output tensor with the given name.
Will halt on a fatal error if the tensor does not exist. + * + * @param name + * @return TensorObject */ - const void set_output(const std::string &name, const TensorObject &value); + TensorObject get_output(const std::string &name); - protected: /** - * TODO(Documentation) + * @brief Update the value of a given output tensor. The tensor must already exist, otherwise this will halt on a + * fatal error. + * + * @param name + * @param value */ - void get_slice_impl(std::shared_ptr new_message, std::size_t start, std::size_t stop) const override; - - void copy_ranges_impl(std::shared_ptr new_message, - const std::vector> &ranges, - size_t num_selected_rows) const override; - - std::shared_ptr copy_output_ranges(const std::vector> &ranges, - size_t num_selected_rows) const; + void set_output(const std::string &name, const TensorObject &value); }; /****** MultiResponseMessageInterfaceProxy *************************/ diff --git a/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp b/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp index 67e4164259..a454f89f06 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp @@ -17,7 +17,7 @@ #pragma once -#include "morpheus/messages/memory/response_memory.hpp" +#include "morpheus/messages/memory/response_memory_probs.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" #include "morpheus/messages/multi_response.hpp" @@ -43,17 +43,21 @@ class MultiResponseProbsMessage : public DerivedMultiMessage meta, size_t mess_offset, size_t mess_count, - std::shared_ptr memory, + std::shared_ptr memory, size_t offset, size_t count); /** - * TODO(Documentation) + * @brief Return the `probs` (probabilities) output tensor + * + * @return const TensorObject */ const TensorObject get_probs() const; /** - * TODO(Documentation) + * @brief Update the `probs` output tensor. Will halt on a fatal error if the `probs` output tensor does not exist. + * + * @param probs */ void set_probs(const TensorObject &probs); }; @@ -67,14 +71,14 @@ struct MultiResponseProbsMessageInterfaceProxy static std::shared_ptr init(std::shared_ptr meta, cudf::size_type mess_offset, cudf::size_type mess_count, - std::shared_ptr memory, + std::shared_ptr memory, cudf::size_type offset, cudf::size_type count); /** * TODO(Documentation) */ - static std::shared_ptr memory(MultiResponseProbsMessage &self); + static std::shared_ptr memory(MultiResponseProbsMessage &self); /** * TODO(Documentation) diff --git a/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp b/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp new file mode 100644 index 0000000000..03f6d733ca --- /dev/null +++ b/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp @@ -0,0 +1,101 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "morpheus/messages/memory/tensor_memory.hpp" +#include "morpheus/messages/meta.hpp" +#include "morpheus/messages/multi.hpp" +#include "morpheus/objects/tensor_object.hpp" + +#include +#include +#include +#include // for pair +#include + +namespace morpheus { +#pragma GCC visibility push(default) + +/****** MultiTensorMessage*******************************/ +/** + * Base class for MultiInferenceMessage & MultiResponseMessage + * Contains a pointer to an instance of TensorMemory along with an + * offset & count to those tensors. + * + * mess_offset & mess_count refer to the range of records in meta. + * offset & count refer to the range of records in TensorMemory. + * + * While TensorMemory can contain multiple tensors, it is a requirement that + * they are all of the same length and that element N in each tensor refers + * to the same record. + */ +class MultiTensorMessage : public DerivedMultiMessage +{ + public: + MultiTensorMessage(const MultiTensorMessage &other) = default; + MultiTensorMessage(std::shared_ptr meta, + std::size_t mess_offset, + std::size_t mess_count, + std::shared_ptr memory, + std::size_t offset, + std::size_t count); + + std::shared_ptr memory; + std::size_t offset{0}; + std::size_t count{0}; + + /** + * @brief Returns a tensor with the given name. Will halt on a fatal error if the tensor does not exist. + * + * @param name + * @return const TensorObject + */ + const TensorObject get_tensor(const std::string &name) const; + + /** + * @brief Returns a tensor with the given name. Will halt on a fatal error if the tensor does not exist. + * + * @param name + * @return TensorObject + */ + TensorObject get_tensor(const std::string &name); + + /** + * @brief Update the value of a given tensor. The tensor must already exist, otherwise this will halt on a fatal + * error. 
+ * + * @param name + * @param value + */ + void set_tensor(const std::string &name, const TensorObject &value); + + protected: + void get_slice_impl(std::shared_ptr new_message, std::size_t start, std::size_t stop) const override; + + void copy_ranges_impl(std::shared_ptr new_message, + const std::vector> &ranges, + size_t num_selected_rows) const override; + + std::shared_ptr copy_input_ranges( + const std::vector> &ranges, std::size_t num_selected_rows) const; + + TensorObject get_tensor_impl(const std::string &name) const; +}; + +#pragma GCC visibility pop +} // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/objects/tensor.hpp b/morpheus/_lib/include/morpheus/objects/tensor.hpp index e66752c7b7..a84fecff5c 100644 --- a/morpheus/_lib/include/morpheus/objects/tensor.hpp +++ b/morpheus/_lib/include/morpheus/objects/tensor.hpp @@ -34,6 +34,8 @@ namespace morpheus { /** * TODO(Documentation) */ + +#pragma GCC visibility push(default) class Tensor { public: @@ -80,4 +82,6 @@ class Tensor size_t m_offset; std::shared_ptr m_device_buffer; }; + +#pragma GCC visibility pop } // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index 7dc4c54f15..a3e8ff72c1 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -49,10 +49,6 @@ class FileSourceStage : public srf::pysrf::PythonSource inout_mapping = {}); + static void reset_request_id(size_t value); + private: /** * TODO(Documentation) */ @@ -110,6 +112,8 @@ struct InferenceClientStageInterfaceProxy bool use_shared_memory, bool needs_logits, std::map inout_mapping); + + static void reset_request_id(size_t value); }; #pragma GCC visibility pop } // namespace morpheus diff --git a/morpheus/_lib/src/io/deserializers.cpp b/morpheus/_lib/src/io/deserializers.cpp new file mode 100644 index 0000000000..c64052af74 --- /dev/null +++ b/morpheus/_lib/src/io/deserializers.cpp @@ -0,0 +1,114 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/io/deserializers.hpp" + +#include +#include +#include // for string_scalar +#include +#include // IWYU pragma: keep +#include // for cudf::type_id +#include + +#include +#include // needed for logging +#include + +namespace morpheus { + +cudf::io::table_with_metadata load_json_table(cudf::io::json_reader_options&& json_options) +{ + auto tbl = cudf::io::read_json(json_options); + + auto found = std::find(tbl.metadata.column_names.begin(), tbl.metadata.column_names.end(), "data"); + + if (found == tbl.metadata.column_names.end()) + return tbl; + + // Super ugly but cudf can't handle newlines and adds extra escapes.
So we need to convert + // \\n -> \n + // \\/ -> \/ + auto columns = tbl.tbl->release(); + + size_t idx = found - tbl.metadata.column_names.begin(); + + auto updated_data = cudf::strings::replace( + cudf::strings_column_view{columns[idx]->view()}, cudf::string_scalar("\\n"), cudf::string_scalar("\n")); + + updated_data = cudf::strings::replace( + cudf::strings_column_view{updated_data->view()}, cudf::string_scalar("\\/"), cudf::string_scalar("/")); + + columns[idx] = std::move(updated_data); + + tbl.tbl = std::move(std::make_unique(std::move(columns))); + + return tbl; +} + +cudf::io::table_with_metadata load_table_from_file(const std::string& filename) +{ + auto file_path = std::filesystem::path(filename); + + if (file_path.extension() == ".json" || file_path.extension() == ".jsonlines") + { + // First, load the file into json + auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(true); + return load_json_table(options.build()); + } + else if (file_path.extension() == ".csv") + { + auto options = cudf::io::csv_reader_options::builder(cudf::io::source_info{filename}); + return cudf::io::read_csv(options.build()); + } + else + { + LOG(FATAL) << "Unknown extension for file: " << filename; + throw std::runtime_error("Unknown extension"); + } +} + +int get_index_col_count(cudf::io::table_with_metadata& data_table) +{ + int index_col_count = 0; + + // Check if we have a first column with INT64 data type + if (data_table.metadata.column_names.size() >= 1 && + data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64) + { + std::regex index_regex(R"((unnamed: 0|id))", std::regex_constants::ECMAScript | std::regex_constants::icase); + + // Get the column name + auto col_name = data_table.metadata.column_names[0]; + + // Check it against some common terms + if (std::regex_search(col_name, index_regex)) + { + // Also, if it's the hideous 'Unnamed: 0', then just use an empty string + if (col_name == "Unnamed: 0") + { + data_table.metadata.column_names[0] = ""; + } + + index_col_count = 1; + } + } + + return index_col_count; +} + +} // namespace morpheus diff --git a/morpheus/_lib/src/messages/memory/response_memory_probs.cpp b/morpheus/_lib/src/messages/memory/response_memory_probs.cpp index 575182788e..fe24d41cc0 100644 --- a/morpheus/_lib/src/messages/memory/response_memory_probs.cpp +++ b/morpheus/_lib/src/messages/memory/response_memory_probs.cpp @@ -17,8 +17,6 @@ #include "morpheus/messages/memory/response_memory_probs.hpp" -#include "morpheus/messages/memory/response_memory.hpp" -#include "morpheus/messages/memory/tensor_memory.hpp" #include "morpheus/utilities/cupy_util.hpp" #include @@ -38,6 +36,12 @@ ResponseMemoryProbs::ResponseMemoryProbs(size_t count, TensorObject probs) : Res this->tensors["probs"] = std::move(probs); } +ResponseMemoryProbs::ResponseMemoryProbs(size_t count, tensor_map_t &&tensors) : + ResponseMemory(count, std::move(tensors)) +{ + CHECK(has_tensor("probs")) << "Tensor: 'probs' not found in memory"; +} + const TensorObject &ResponseMemoryProbs::get_probs() const { auto found = this->tensors.find("probs"); diff --git a/morpheus/_lib/src/messages/multi_inference.cpp b/morpheus/_lib/src/messages/multi_inference.cpp index c5fb6379e8..48944f4bb4 100644 --- a/morpheus/_lib/src/messages/multi_inference.cpp +++ b/morpheus/_lib/src/messages/multi_inference.cpp @@ -42,78 +42,22 @@ MultiInferenceMessage::MultiInferenceMessage(std::shared_ptr memory, std::size_t offset, std::size_t count) : - DerivedMultiMessage(meta, mess_offset,
mess_count), - memory(std::move(memory)), - offset(offset), - count(count) + DerivedMultiMessage(meta, mess_offset, mess_count, memory, offset, count) {} const TensorObject MultiInferenceMessage::get_input(const std::string &name) const { - CHECK(this->memory->has_input(name)) << "Cound not find input: " << name; - - // check if we are getting the entire input - if (this->offset == 0 && this->count == this->memory->count) - { - return this->memory->tensors[name]; - } - - // TODO(MDD): This really needs to return the slice of the tensor - return this->memory->tensors[name].slice({static_cast(this->offset), 0}, - {static_cast(this->offset + this->count), -1}); -} - -const void MultiInferenceMessage::set_input(const std::string &name, const TensorObject &value) -{ - // Get the input slice first - auto slice = this->get_input(name); - - // Set the value to use assignment - slice = value; -} - -void MultiInferenceMessage::get_slice_impl(std::shared_ptr new_message, - std::size_t start, - std::size_t stop) const -{ - auto sliced_message = DCHECK_NOTNULL(std::dynamic_pointer_cast(new_message)); - - sliced_message->offset = start; - sliced_message->count = stop - start; - - // If we have more inference rows than message rows, we need to use the seq_ids to figure out the slicing. This - // will be slow and should be avoided at all costs - if (this->count != this->mess_count && this->memory->has_input("seq_ids")) - { - auto seq_ids = this->get_input("seq_ids"); - - // Determine the new start and stop before passing onto the base - start = seq_ids.read_element({(TensorIndex)start, 0}); - stop = seq_ids.read_element({(TensorIndex)stop - 1, 0}) + 1; - } - - // Pass onto the base - DerivedMultiMessage::get_slice_impl(new_message, start, stop); + return get_tensor(name); } -void MultiInferenceMessage::copy_ranges_impl(std::shared_ptr new_message, - const std::vector> &ranges, - size_t num_selected_rows) const +TensorObject MultiInferenceMessage::get_input(const std::string &name) { - auto copied_message = DCHECK_NOTNULL(std::dynamic_pointer_cast(new_message)); - DerivedMultiMessage::copy_ranges_impl(copied_message, ranges, num_selected_rows); - - copied_message->offset = 0; - copied_message->count = num_selected_rows; - copied_message->memory = copy_input_ranges(ranges, num_selected_rows); + return get_tensor(name); } -std::shared_ptr MultiInferenceMessage::copy_input_ranges( - const std::vector> &ranges, size_t num_selected_rows) const +void MultiInferenceMessage::set_input(const std::string &name, const TensorObject &value) { - auto offset_ranges = apply_offset_to_ranges(offset, ranges); - auto tensors = memory->copy_tensor_ranges(offset_ranges, num_selected_rows); - return std::make_shared(num_selected_rows, std::move(tensors)); + set_tensor(name, value); } /****** InterfaceProxy *************************/ @@ -131,7 +75,8 @@ std::shared_ptr MultiInferenceMessageInterfaceProxy::init std::shared_ptr MultiInferenceMessageInterfaceProxy::memory(MultiInferenceMessage &self) { - return self.memory; + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + return std::static_pointer_cast(self.memory); } std::size_t MultiInferenceMessageInterfaceProxy::offset(MultiInferenceMessage &self) diff --git a/morpheus/_lib/src/messages/multi_inference_fil.cpp b/morpheus/_lib/src/messages/multi_inference_fil.cpp index e22a1650a3..39deecf52f 100644 --- a/morpheus/_lib/src/messages/multi_inference_fil.cpp +++ b/morpheus/_lib/src/messages/multi_inference_fil.cpp @@ -73,7 +73,8 @@ std::shared_ptr 
MultiInferenceFILMessageInterfaceProxy std::shared_ptr MultiInferenceFILMessageInterfaceProxy::memory( MultiInferenceFILMessage &self) { - return self.memory; + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + return std::static_pointer_cast(self.memory); } std::size_t MultiInferenceFILMessageInterfaceProxy::offset(MultiInferenceFILMessage &self) diff --git a/morpheus/_lib/src/messages/multi_inference_nlp.cpp b/morpheus/_lib/src/messages/multi_inference_nlp.cpp index 16945b6702..fa60a60bae 100644 --- a/morpheus/_lib/src/messages/multi_inference_nlp.cpp +++ b/morpheus/_lib/src/messages/multi_inference_nlp.cpp @@ -86,7 +86,8 @@ std::shared_ptr MultiInferenceNLPMessageInterfaceProxy std::shared_ptr MultiInferenceNLPMessageInterfaceProxy::memory( MultiInferenceNLPMessage &self) { - return self.memory; + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + return std::static_pointer_cast(self.memory); } std::size_t MultiInferenceNLPMessageInterfaceProxy::offset(MultiInferenceNLPMessage &self) diff --git a/morpheus/_lib/src/messages/multi_response.cpp b/morpheus/_lib/src/messages/multi_response.cpp index 7ffbf59469..90912ecfd6 100644 --- a/morpheus/_lib/src/messages/multi_response.cpp +++ b/morpheus/_lib/src/messages/multi_response.cpp @@ -36,95 +36,28 @@ namespace morpheus { /****** Component public implementations *******************/ -/****** MultiResponseMessage****************************************/ -MultiResponseMessage::MultiResponseMessage(std::shared_ptr meta, +MultiResponseMessage::MultiResponseMessage(std::shared_ptr meta, std::size_t mess_offset, std::size_t mess_count, std::shared_ptr memory, std::size_t offset, std::size_t count) : - DerivedMultiMessage(meta, mess_offset, mess_count), - memory(std::move(memory)), - offset(offset), - count(count) + DerivedMultiMessage(meta, mess_offset, mess_count, memory, offset, count) {} -TensorObject MultiResponseMessage::get_output(const std::string &name) -{ - CHECK(this->memory->has_output(name)) << "Could not find output: " << name; - - // check if we are getting the entire input - if (this->offset == 0 && this->count == this->memory->count) - { - return this->memory->tensors[name]; - } - - // TODO(MDD): This really needs to return the slice of the tensor - return this->memory->tensors[name].slice({static_cast(this->offset), 0}, - {static_cast(this->offset + this->count), -1}); -} - const TensorObject MultiResponseMessage::get_output(const std::string &name) const { - CHECK(this->memory->has_output(name)) << "Could not find output: " << name; - - // check if we are getting the entire input - if (this->offset == 0 && this->count == this->memory->count) - { - return this->memory->tensors[name]; - } - - // TODO(MDD): This really needs to return the slice of the tensor - return this->memory->tensors[name].slice({static_cast(this->offset), 0}, - {static_cast(this->offset + this->count), -1}); + return get_tensor(name); } -const void MultiResponseMessage::set_output(const std::string &name, const TensorObject &value) -{ - // Get the input slice first - auto slice = this->get_output(name); - - // Set the value to use assignment - slice = value; -} - -void MultiResponseMessage::get_slice_impl(std::shared_ptr new_message, - std::size_t start, - std::size_t stop) const -{ - auto sliced_message = DCHECK_NOTNULL(std::dynamic_pointer_cast(new_message)); - - sliced_message->offset = start; - sliced_message->count = stop - start; - - // Currently our output lengths should always match mess_count, and even if they didn't we wouldn't 
have any way - // to associate rows in the output with rows in the dataframe. Note on the input side we have the seq_ids array - // to but we don't have any equivelant for the output. - DCHECK(this->count == this->mess_count) - << "Number of rows in response output does not match number of messages in DF"; - - // Pass onto the base - DerivedMultiMessage::get_slice_impl(new_message, start, stop); -} - -void MultiResponseMessage::copy_ranges_impl(std::shared_ptr new_message, - const std::vector> &ranges, - size_t num_selected_rows) const +TensorObject MultiResponseMessage::get_output(const std::string &name) { - auto copied_message = DCHECK_NOTNULL(std::dynamic_pointer_cast(new_message)); - DerivedMultiMessage::copy_ranges_impl(copied_message, ranges, num_selected_rows); - - copied_message->offset = 0; - copied_message->count = num_selected_rows; - copied_message->memory = copy_output_ranges(ranges, num_selected_rows); + return get_tensor(name); } -std::shared_ptr MultiResponseMessage::copy_output_ranges( - const std::vector> &ranges, size_t num_selected_rows) const +void MultiResponseMessage::set_output(const std::string &name, const TensorObject &value) { - auto offset_ranges = apply_offset_to_ranges(offset, ranges); - auto tensors = memory->copy_tensor_ranges(offset_ranges, num_selected_rows); - return std::make_shared(num_selected_rows, std::move(tensors)); + set_tensor(name, value); } /****** MultiResponseMessageInterfaceProxy *************************/ @@ -141,7 +74,9 @@ std::shared_ptr MultiResponseMessageInterfaceProxy::init(s std::shared_ptr MultiResponseMessageInterfaceProxy::memory(MultiResponseMessage &self) { - return self.memory; + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + + return std::static_pointer_cast(self.memory); } std::size_t MultiResponseMessageInterfaceProxy::offset(MultiResponseMessage &self) diff --git a/morpheus/_lib/src/messages/multi_response_probs.cpp b/morpheus/_lib/src/messages/multi_response_probs.cpp index cdc1171f02..fb854dce2f 100644 --- a/morpheus/_lib/src/messages/multi_response_probs.cpp +++ b/morpheus/_lib/src/messages/multi_response_probs.cpp @@ -32,7 +32,7 @@ namespace morpheus { MultiResponseProbsMessage::MultiResponseProbsMessage(std::shared_ptr meta, size_t mess_offset, size_t mess_count, - std::shared_ptr memory, + std::shared_ptr memory, size_t offset, size_t count) : DerivedMultiMessage(meta, mess_offset, mess_count, memory, offset, count) @@ -56,7 +56,7 @@ std::shared_ptr MultiResponseProbsMessageInterfacePro std::shared_ptr meta, cudf::size_type mess_offset, cudf::size_type mess_count, - std::shared_ptr memory, + std::shared_ptr memory, cudf::size_type offset, cudf::size_type count) { @@ -64,10 +64,12 @@ std::shared_ptr MultiResponseProbsMessageInterfacePro std::move(meta), mess_offset, mess_count, std::move(memory), offset, count); } -std::shared_ptr MultiResponseProbsMessageInterfaceProxy::memory( +std::shared_ptr MultiResponseProbsMessageInterfaceProxy::memory( MultiResponseProbsMessage &self) { - return self.memory; + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + + return std::static_pointer_cast(self.memory); } std::size_t MultiResponseProbsMessageInterfaceProxy::offset(MultiResponseProbsMessage &self) diff --git a/morpheus/_lib/src/messages/multi_tensor.cpp b/morpheus/_lib/src/messages/multi_tensor.cpp new file mode 100644 index 0000000000..ffdf93be03 --- /dev/null +++ b/morpheus/_lib/src/messages/multi_tensor.cpp @@ -0,0 +1,122 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION 
& AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/messages/multi_tensor.hpp" + +#include "morpheus/utilities/cupy_util.hpp" + +#include // for cudf::size_type> +#include + +#include // for int32_t +#include // needed for logging + +namespace morpheus { +/****** Component public implementations *******************/ +/****** ****************************************/ +MultiTensorMessage::MultiTensorMessage(std::shared_ptr meta, + std::size_t mess_offset, + std::size_t mess_count, + std::shared_ptr memory, + std::size_t offset, + std::size_t count) : + DerivedMultiMessage(meta, mess_offset, mess_count), + memory(std::move(memory)), + offset(offset), + count(count) +{} + +const TensorObject MultiTensorMessage::get_tensor(const std::string &name) const +{ + return get_tensor_impl(name); +} + +TensorObject MultiTensorMessage::get_tensor(const std::string &name) +{ + return get_tensor_impl(name); +} + +TensorObject MultiTensorMessage::get_tensor_impl(const std::string &name) const +{ + CHECK(this->memory->has_tensor(name)) << "Could not find tensor: " << name; + + // check if we are getting the entire input + if (this->offset == 0 && this->count == this->memory->count) + { + return this->memory->tensors[name]; + } + + return this->memory->tensors[name].slice({static_cast(this->offset), 0}, + {static_cast(this->offset + this->count), -1}); +} + +void MultiTensorMessage::set_tensor(const std::string &name, const TensorObject &value) +{ + // Get the input slice first + auto slice = this->get_tensor(name); + + // Set the value to use assignment + slice = value; +} + +void MultiTensorMessage::get_slice_impl(std::shared_ptr new_message, + std::size_t start, + std::size_t stop) const +{ + DCHECK(std::dynamic_pointer_cast(new_message) != nullptr); + auto sliced_message = std::static_pointer_cast(new_message); + + sliced_message->offset = start; + sliced_message->count = stop - start; + + // If we have more inference rows than message rows, we need to use the seq_ids to figure out the slicing.
This + // will be slow and should be avoided at all costs + if (this->count != this->mess_count && this->memory->has_tensor("seq_ids")) + { + auto seq_ids = this->get_tensor("seq_ids"); + + // Determine the new start and stop before passing onto the base + start = seq_ids.read_element<TensorIndex>({(TensorIndex)start, 0}); + stop = seq_ids.read_element<TensorIndex>({(TensorIndex)stop - 1, 0}) + 1; + } + + // Pass onto the base + DerivedMultiMessage::get_slice_impl(new_message, start, stop); +} + +void MultiTensorMessage::copy_ranges_impl(std::shared_ptr<MultiMessage> new_message, + const std::vector<std::pair<size_t, size_t>> &ranges, + size_t num_selected_rows) const +{ + DCHECK(std::dynamic_pointer_cast<MultiTensorMessage>(new_message) != nullptr); + auto copied_message = std::static_pointer_cast<MultiTensorMessage>(new_message); + DerivedMultiMessage::copy_ranges_impl(copied_message, ranges, num_selected_rows); + + copied_message->offset = 0; + copied_message->count = num_selected_rows; + copied_message->memory = copy_input_ranges(ranges, num_selected_rows); +} + +std::shared_ptr<TensorMemory> MultiTensorMessage::copy_input_ranges( + const std::vector<std::pair<size_t, size_t>> &ranges, size_t num_selected_rows) const +{ + auto offset_ranges = apply_offset_to_ranges(offset, ranges); + auto tensors = memory->copy_tensor_ranges(offset_ranges, num_selected_rows); + return std::make_shared<TensorMemory>(num_selected_rows, std::move(tensors)); +} + +} // namespace morpheus diff --git a/morpheus/_lib/src/objects/tensor.cpp b/morpheus/_lib/src/objects/tensor.cpp index 0d0f56d651..e9eb146edb 100644 --- a/morpheus/_lib/src/objects/tensor.cpp +++ b/morpheus/_lib/src/objects/tensor.cpp @@ -19,6 +19,7 @@ #include "morpheus/objects/rmm_tensor.hpp" #include "morpheus/objects/tensor_object.hpp" +#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride #include "morpheus/utilities/type_util.hpp" #include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpyDeviceToHost @@ -79,6 +80,10 @@ TensorObject Tensor::create(std::shared_ptr<rmm::device_buffer> buffer, { auto md = nullptr; + if (!strides.empty()) + { + strides = TensorUtils::get_element_stride(strides); + } auto tensor = std::make_shared<RmmTensor>(buffer, offset, dtype, shape, strides); return TensorObject(md, tensor); diff --git a/morpheus/_lib/src/python_modules/stages.cpp b/morpheus/_lib/src/python_modules/stages.cpp index c12b2e5402..b34795fe28 100644 --- a/morpheus/_lib/src/python_modules/stages.cpp +++ b/morpheus/_lib/src/python_modules/stages.cpp @@ -117,7 +117,8 @@ PYBIND11_MODULE(stages, m) py::arg("force_convert_inputs"), py::arg("use_shared_memory"), py::arg("needs_logits"), - py::arg("inout_mapping") = py::dict()); + py::arg("inout_mapping") = py::dict()) + .def_static("reset_request_id", &InferenceClientStageInterfaceProxy::reset_request_id, py::arg("value") = 0); py::class_, srf::segment::ObjectProperties, diff --git a/morpheus/_lib/src/stages/add_classification.cpp b/morpheus/_lib/src/stages/add_classification.cpp index 97b1f0f871..270d16ddbd 100644 --- a/morpheus/_lib/src/stages/add_classification.cpp +++ b/morpheus/_lib/src/stages/add_classification.cpp @@ -21,6 +21,7 @@ #include "morpheus/objects/tensor.hpp" #include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject #include "morpheus/utilities/matx_util.hpp" +#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride #include "morpheus/utilities/type_util.hpp" // for DType #include "morpheus/utilities/type_util_detail.hpp" // for DataType @@ -77,16 +78,8 @@ AddClassificationsStage::subscribe_fn_t AddClassificationsStage::build_operator( SRF_CHECK_CUDA( cudaMemcpy(tmp_buffer->data(), probs.data(),
tmp_buffer->size(), cudaMemcpyDeviceToDevice)); - // Depending on the input the stride is given in bytes or elements, - // divide the stride elements by the smallest item to ensure tensor_stride is defined in - // terms of elements - std::vector tensor_stride(stride.size()); - auto min_stride = std::min_element(stride.cbegin(), stride.cend()); - - std::transform(stride.cbegin(), - stride.cend(), - tensor_stride.begin(), - std::bind(std::divides<>(), std::placeholders::_1, *min_stride)); + // Depending on the input the stride is given in bytes or elements, convert to elements + auto tensor_stride = TensorUtils::get_element_stride(stride); // Now call the threshold function auto thresh_bool_buffer = diff --git a/morpheus/_lib/src/stages/add_scores.cpp b/morpheus/_lib/src/stages/add_scores.cpp index b3e81c79af..46dfa12b7c 100644 --- a/morpheus/_lib/src/stages/add_scores.cpp +++ b/morpheus/_lib/src/stages/add_scores.cpp @@ -20,20 +20,13 @@ #include "morpheus/objects/tensor.hpp" #include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject -#include // for cudaMemcpy, cudaMemcpyDeviceToDevice #include -#include // for cuda_stream_per_thread -#include // for device_buffer -#include // for Status -#include // for SRF_CHECK_CUDA #include // for SinkProperties<>::sink_type_t #include // for SourceProperties<>::source_type_t #include // for Object -#include // for min_element, transform -#include // for size_t +#include // for size_t #include -#include // for divides, placeholders #include #include #include // for logging @@ -59,9 +52,8 @@ AddScoresStage::subscribe_fn_t AddScoresStage::build_operator() return [this](rxcpp::observable input, rxcpp::subscriber output) { return input.subscribe(rxcpp::make_observer( [this, &output](sink_type_t x) { - const auto& probs = x->get_probs(); - const auto& shape = probs.get_shape(); - const auto& stride = probs.get_stride(); + const auto& probs = x->get_probs(); + const auto& shape = probs.get_shape(); CHECK(shape.size() == 2 && shape[1] == m_num_class_labels) << "Label count does not match output of model. 
Label count: " << m_num_class_labels @@ -70,28 +62,6 @@ AddScoresStage::subscribe_fn_t AddScoresStage::build_operator() const std::size_t num_rows = shape[0]; const std::size_t num_columns = shape[1]; - auto tmp_buffer = std::make_shared(probs.bytes(), rmm::cuda_stream_per_thread); - - SRF_CHECK_CUDA( - cudaMemcpy(tmp_buffer->data(), probs.data(), tmp_buffer->size(), cudaMemcpyDeviceToDevice)); - - // Depending on the input the stride is given in bytes or elements, - // divide the stride elements by the smallest item to ensure tensor_stride is defined in - // terms of elements - std::vector tensor_stride(stride.size()); - auto min_stride = std::min_element(stride.cbegin(), stride.cend()); - - std::transform(stride.cbegin(), - stride.cend(), - tensor_stride.begin(), - std::bind(std::divides<>(), std::placeholders::_1, *min_stride)); - - auto tensor_obj = Tensor::create( - tmp_buffer, - probs.dtype(), - std::vector{static_cast(shape[0]), static_cast(shape[1])}, - tensor_stride); - std::vector columns(m_idx2label.size()); std::vector tensors(m_idx2label.size()); @@ -99,9 +69,9 @@ AddScoresStage::subscribe_fn_t AddScoresStage::build_operator() for (const auto& [column_num, column_name] : m_idx2label) { columns[i] = column_name; - tensors[i] = tensor_obj.slice(std::vector{0, static_cast(column_num)}, - std::vector{static_cast(num_rows), - static_cast(column_num + 1)}); + tensors[i] = probs.slice(std::vector{0, static_cast(column_num)}, + std::vector{static_cast(num_rows), + static_cast(column_num + 1)}); ++i; } @@ -122,8 +92,6 @@ std::shared_ptr> AddScoresStageInterfacePro std::size_t num_class_labels, std::map idx2label) { - auto stage = builder.construct_object(name, num_class_labels, std::move(idx2label)); - - return stage; + return builder.construct_object(name, num_class_labels, std::move(idx2label)); } } // namespace morpheus diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index 9db0dd895f..46eda28522 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -17,6 +17,8 @@ #include "morpheus/stages/file_source.hpp" +#include "morpheus/io/deserializers.hpp" + #include // for column #include #include @@ -55,44 +57,33 @@ FileSourceStage::FileSourceStage(std::string filename, int repeat) : FileSourceStage::subscriber_fn_t FileSourceStage::build() { return [this](rxcpp::subscriber output) { - auto data_table = this->load_table(); - - // Using 0 will default to creating a new range index - int index_col_count = 0; + auto data_table = load_table_from_file(m_filename); + int index_col_count = get_index_col_count(data_table); - // Check if we have a first column with INT64 data type - if (data_table.metadata.column_names.size() >= 1 && - data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64) - { - std::regex index_regex(R"((unnamed: 0|id))", - std::regex_constants::ECMAScript | std::regex_constants::icase); + // Next, create the message metadata. 
This gets reused for repeats + // When index_col_count is 0 this will cause a new range index to be created + auto meta = MessageMeta::create_from_cpp(std::move(data_table), index_col_count); - // Get the column name - auto col_name = data_table.metadata.column_names[0]; + // next_meta stores a copy of the upcoming meta + std::shared_ptr next_meta = nullptr; - // Check it against some common terms - if (std::regex_search(col_name, index_regex)) + for (cudf::size_type repeat_idx = 0; repeat_idx < m_repeat; ++repeat_idx) + { + if (!output.is_subscribed()) { - // Also, if its the hideous 'Unnamed: 0', then just use an empty string - if (col_name == "Unnamed: 0") - { - data_table.metadata.column_names[0] = ""; - } - - index_col_count = 1; - } - } + // Grab the GIL before disposing, just in case + pybind11::gil_scoped_acquire gil; - // Next, create the message metadata. This gets reused for repeats - auto meta = MessageMeta::create_from_cpp(std::move(data_table), index_col_count); + // Reset meta to allow the DCHECK after the loop to pass + meta.reset(); - // Always push at least 1 - output.on_next(meta); + break; + } - for (cudf::size_type repeat_idx = 1; repeat_idx < m_repeat; ++repeat_idx) - { - // Clone the previous meta object + // Clone the meta object before pushing while we still have access to it + if (repeat_idx + 1 < m_repeat) { + // GIL must come after get_info pybind11::gil_scoped_acquire gil; // Use the copy function @@ -104,62 +95,22 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build() df.attr("index") = index + df_len; - meta = MessageMeta::create_from_python(std::move(df)); + next_meta = MessageMeta::create_from_python(std::move(df)); } - output.on_next(meta); - } - - output.on_completed(); - }; -} - -cudf::io::table_with_metadata FileSourceStage::load_table() -{ - auto file_path = std::filesystem::path(m_filename); - - if (file_path.extension() == ".json" || file_path.extension() == ".jsonlines") - { - // First, load the file into json - auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{m_filename}).lines(true); - - auto tbl = cudf::io::read_json(options.build()); - - auto found = std::find(tbl.metadata.column_names.begin(), tbl.metadata.column_names.end(), "data"); + DCHECK(meta) << "Cannot push null meta"; - if (found == tbl.metadata.column_names.end()) - return tbl; + output.on_next(std::move(meta)); - // Super ugly but cudf cant handle newlines and add extra escapes. 
So we need to convert - // \\n -> \n - // \\/ -> \/ - auto columns = tbl.tbl->release(); - - size_t idx = found - tbl.metadata.column_names.begin(); - - auto updated_data = cudf::strings::replace( - cudf::strings_column_view{columns[idx]->view()}, cudf::string_scalar("\\n"), cudf::string_scalar("\n")); - - updated_data = cudf::strings::replace( - cudf::strings_column_view{updated_data->view()}, cudf::string_scalar("\\/"), cudf::string_scalar("/")); - - columns[idx] = std::move(updated_data); - - tbl.tbl = std::move(std::make_unique(std::move(columns))); + // Move next_meta into meta + std::swap(meta, next_meta); + } - return tbl; - } - else if (file_path.extension() == ".csv") - { - auto options = cudf::io::csv_reader_options::builder(cudf::io::source_info{m_filename}); + DCHECK(!meta) << "meta was not properly pushed"; + DCHECK(!next_meta) << "next_meta was not properly pushed"; - return cudf::io::read_csv(options.build()); - } - else - { - LOG(FATAL) << "Unknown extension for file: " << m_filename; - throw std::runtime_error("Unknown extension"); - } + output.on_completed(); + }; } // ************ FileSourceStageInterfaceProxy ************ // diff --git a/morpheus/_lib/src/stages/kafka_source.cpp b/morpheus/_lib/src/stages/kafka_source.cpp index a4eeaecfe1..801e0151e4 100644 --- a/morpheus/_lib/src/stages/kafka_source.cpp +++ b/morpheus/_lib/src/stages/kafka_source.cpp @@ -17,6 +17,7 @@ #include "morpheus/stages/kafka_source.hpp" +#include "morpheus/io/deserializers.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/utilities/stage_util.hpp" #include "morpheus/utilities/string_util.hpp" @@ -519,31 +520,7 @@ cudf::io::table_with_metadata KafkaSourceStage::load_table(const std::string &bu auto options = cudf::io::json_reader_options::builder(cudf::io::source_info(buffer.c_str(), buffer.size())).lines(true); - auto tbl = cudf::io::read_json(options.build()); - - auto found = std::find(tbl.metadata.column_names.begin(), tbl.metadata.column_names.end(), "data"); - - if (found == tbl.metadata.column_names.end()) - return tbl; - - // Super ugly but cudf cant handle newlines and add extra escapes. 
So we need to convert - // \\n -> \n - // \\/ -> \/ - auto columns = tbl.tbl->release(); - - size_t idx = found - tbl.metadata.column_names.begin(); - - auto updated_data = cudf::strings::replace( - cudf::strings_column_view{columns[idx]->view()}, cudf::string_scalar("\\n"), cudf::string_scalar("\n")); - - updated_data = cudf::strings::replace( - cudf::strings_column_view{updated_data->view()}, cudf::string_scalar("\\/"), cudf::string_scalar("/")); - - columns[idx] = std::move(updated_data); - - tbl.tbl = std::move(std::make_unique(std::move(columns))); - - return tbl; + return load_json_table(options.build()); } template diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp index a21b6213da..84a578e238 100644 --- a/morpheus/_lib/src/stages/triton_inference.cpp +++ b/morpheus/_lib/src/stages/triton_inference.cpp @@ -17,9 +17,9 @@ #include "morpheus/stages/triton_inference.hpp" -#include "morpheus/messages/memory/inference_memory.hpp" // for InferenceMemory -#include "morpheus/messages/memory/response_memory.hpp" // for ResponseMemory -#include "morpheus/messages/memory/tensor_memory.hpp" // for TensorMemory::tensor_map_t +#include "morpheus/messages/memory/inference_memory.hpp" // for InferenceMemory +#include "morpheus/messages/memory/response_memory_probs.hpp" // for ResponseMemoryProbs +#include "morpheus/messages/memory/tensor_memory.hpp" // for TensorMemory::tensor_map_t #include "morpheus/messages/multi_response_probs.hpp" #include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo #include "morpheus/objects/tensor.hpp" @@ -104,6 +104,11 @@ InferenceClientStage::InferenceClientStage(std::string model_name, this->connect_with_server(); // TODO(Devin) } +void InferenceClientStage::reset_request_id(size_t value) +{ + m_request_counter = value; +} + InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() { return [this](rxcpp::observable input, rxcpp::subscriber output) { @@ -116,7 +121,7 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() // When our tensor lengths are longer than our dataframe we will need to use the seq_ids // array to lookup how the values should map back into the dataframe const bool needs_seq_ids = x->mess_count != x->count; - auto reponse_memory = std::make_shared(x->mess_count); + std::map response_outputs; // Create the output memory blocks for (auto &model_output : m_model_outputs) @@ -131,13 +136,19 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() auto output_buffer = std::make_shared( elem_count * model_output.datatype.item_size(), rmm::cuda_stream_per_thread); - reponse_memory->tensors[model_output.mapped_name] = Tensor::create( + response_outputs[model_output.mapped_name] = Tensor::create( std::move(output_buffer), model_output.datatype, total_shape, std::vector{}, 0); } // This will be the final output of all mini-batches - auto response = std::make_shared( - x->meta, x->mess_offset, x->mess_count, std::move(reponse_memory), 0, reponse_memory->count); + auto response_mem_probs = + std::make_shared(x->mess_count, std::move(response_outputs)); + auto response = std::make_shared(x->meta, + x->mess_offset, + x->mess_count, + std::move(response_mem_probs), + 0, + response_mem_probs->count); std::unique_ptr> host_seq_ids{nullptr}; if (needs_seq_ids) @@ -187,7 +198,7 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() // Iterate on the model inputs in case the model takes less than what tensors are 
available std::vector<std::pair<std::shared_ptr<triton::client::InferInput>, std::vector<uint8_t>>> saved_inputs = foreach_map(m_model_inputs, [this, &mini_batch_input](auto const &model_input) { - DCHECK(mini_batch_input->memory->has_input(model_input.mapped_name)) + DCHECK(mini_batch_input->memory->has_tensor(model_input.mapped_name)) << "Model input '" << model_input.mapped_name << "' not found in InferenceMemory"; auto const &inp_tensor = mini_batch_input->get_input(model_input.mapped_name); @@ -235,10 +246,12 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() triton::client::InferResult *results; // Make a copy of the options object and set the request ID - auto options = m_options; - options.request_id_ = m_request_counter++; + auto options = m_options; + const std::string request_id{std::to_string(m_request_counter++)}; + options.request_id_ = request_id; + std::map<std::string, std::string> headers{{"morpheus_request_id", request_id}}; - CHECK_TRITON(client->Infer(&results, options, inputs, outputs)); + CHECK_TRITON(client->Infer(&results, options, inputs, outputs, headers)); for (auto &model_output : m_model_outputs) { @@ -490,4 +503,10 @@ std::shared_ptr<srf::segment::Object<InferenceClientStage>> InferenceClientStage return stage; } + +void InferenceClientStageInterfaceProxy::reset_request_id(size_t value) +{ + InferenceClientStage::reset_request_id(value); +} + } // namespace morpheus diff --git a/morpheus/_lib/tests/CMakeLists.txt b/morpheus/_lib/tests/CMakeLists.txt index 4e5217574e..b2d4beb9ed 100644 --- a/morpheus/_lib/tests/CMakeLists.txt +++ b/morpheus/_lib/tests/CMakeLists.txt @@ -20,7 +20,6 @@ add_executable(test_libmorpheus # test_cuda.cu test_main.cpp test_matx_util.cpp - test_morpheus.cpp test_multi_slices.cpp test_tensor.cpp test_type_util_detail.cpp @@ -29,6 +28,7 @@ add_executable(test_libmorpheus target_link_libraries(test_libmorpheus PRIVATE cuda_utils + morpheus srf::pysrf GTest::gtest matx::matx diff --git a/morpheus/_lib/tests/test_matx_util.cpp b/morpheus/_lib/tests/test_matx_util.cpp index 53d076085c..417b3e856f 100644 --- a/morpheus/_lib/tests/test_matx_util.cpp +++ b/morpheus/_lib/tests/test_matx_util.cpp @@ -17,6 +17,7 @@ #include "./test_morpheus.hpp" // IWYU pragma: associated +#include "morpheus/io/deserializers.hpp" #include "morpheus/objects/dev_mem_info.hpp" #include "morpheus/utilities/matx_util.hpp" #include "morpheus/utilities/type_util.hpp" @@ -30,7 +31,7 @@ #include #include // for cuda_stream_per_thread #include -#include +#include // for SRF_CHECK_CUDA #include // for int64_t, int32_t, uint8_t #include // for std::getenv @@ -135,7 +136,7 @@ TEST_F(TestMatxUtil, ReduceMax2dColMajor) std::filesystem::path morpheus_root{std::getenv("MORPHEUS_ROOT")}; auto input_file = morpheus_root / "tests/tests_data/filter_probs.csv"; - auto table_m = load_table_from_csv(input_file); + auto table_m = morpheus::load_table_from_file(input_file); auto num_rows = table_m.tbl->num_rows(); auto num_cols = table_m.tbl->num_columns(); diff --git a/morpheus/_lib/tests/test_morpheus.cpp b/morpheus/_lib/tests/test_morpheus.cpp deleted file mode 100644 index 73da7bbca1..0000000000 --- a/morpheus/_lib/tests/test_morpheus.cpp +++ /dev/null @@ -1,27 +0,0 @@ -/** - * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "test_morpheus.hpp" - -#include -#include // IWYU pragma: keep - -cudf::io::table_with_metadata load_table_from_csv(std::string filename) -{ - auto options = cudf::io::csv_reader_options::builder(cudf::io::source_info{filename}); - return cudf::io::read_csv(options.build()); -} diff --git a/morpheus/_lib/tests/test_morpheus.hpp b/morpheus/_lib/tests/test_morpheus.hpp index 50c97c8cba..1b8c9d2667 100644 --- a/morpheus/_lib/tests/test_morpheus.hpp +++ b/morpheus/_lib/tests/test_morpheus.hpp @@ -17,14 +17,9 @@ #pragma once -#include #include // IWYU pragma: keep #include // IWYU pragma: keep -#include - -cudf::io::table_with_metadata load_table_from_csv(std::string filename); - #define TEST_CLASS(name) \ class Test##name : public ::testing::Test \ {} diff --git a/morpheus/_lib/tests/test_multi_slices.cpp b/morpheus/_lib/tests/test_multi_slices.cpp index ee94b80701..6bb4600dc0 100644 --- a/morpheus/_lib/tests/test_multi_slices.cpp +++ b/morpheus/_lib/tests/test_multi_slices.cpp @@ -17,26 +17,56 @@ #include "./test_morpheus.hpp" // IWYU pragma: associated +#include "morpheus/io/deserializers.hpp" +#include "morpheus/messages/meta.hpp" +#include "morpheus/messages/multi_inference.hpp" +#include "morpheus/messages/multi_response.hpp" +#include "morpheus/objects/tensor.hpp" +#include "morpheus/utilities/matx_util.hpp" // for MatxUtil::create_seg_ids +#include "morpheus/utilities/type_util.hpp" // for TypeId + +#include // for cudaMemcpy, cudaMemcpyHostToDevice #include #include #include #include #include #include +#include +#include +#include // for cuda_stream_per_thread +#include +#include // for SRF_CHECK_CUDA +#include #include #include #include // for unique_ptr +#include +#include //for typeid #include -TEST_CLASS(Masking); +using namespace morpheus; +namespace py = pybind11; + +namespace { +int random_int(int lower, int upper) +{ + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution uniform_dist(lower, upper); + return uniform_dist(e1); +} +} // namespace + +TEST_CLASS(MultiSlices); -TEST_F(TestMasking, Ranges) +TEST_F(TestMultiSlices, Ranges) { std::filesystem::path morpheus_root{std::getenv("MORPHEUS_ROOT")}; auto input_file = morpheus_root / "tests/tests_data/filter_probs.csv"; - auto table_m = load_table_from_csv(input_file); + auto table_m = load_table_from_file(input_file); EXPECT_EQ(table_m.tbl->num_rows(), 20); auto table_v = table_m.tbl->view(); diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index cd6293f457..7441b82a4e 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -14,6 +14,7 @@ import base64 import dataclasses +import json import logging import queue import threading @@ -25,8 +26,10 @@ import cupy as cp import numpy as np +import pandas as pd import srf import tritonclient.grpc as tritonclient +import tritonclient.http as tritonclient_http from tritonclient.utils import InferenceServerException from tritonclient.utils import triton_to_np_dtype @@ -39,6 +42,7 
@@ from morpheus.messages import ResponseMemoryProbs from morpheus.stages.inference.inference_stage import InferenceStage from morpheus.stages.inference.inference_stage import InferenceWorker +from morpheus.utils.atomic_integer import AtomicInteger from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue logger = logging.getLogger(__name__) @@ -65,6 +69,12 @@ def _notify_dtype_once(model_name: str, input_name: str, triton_dtype: cp.dtype, raise RuntimeError(msg % msg_args) +def reset_request_id(value=0): + """Resets the request ID for both Python & C++ impls""" + _TritonInferenceWorker.reset_request_id(value=value) + _stages.InferenceClientStage.reset_request_id(value=value) + + @dataclasses.dataclass() class TritonInOut: """ @@ -496,6 +506,10 @@ def needs_logits(cls): def default_inout_mapping(cls) -> typing.Dict[str, str]: return {} + @classmethod + def reset_request_id(cls, value=0): + cls.__REQUEST_COUNTER.value = value + def init(self): """ This function instantiate triton client and memory allocation for inference input and output. @@ -617,11 +631,12 @@ def _infer_callback(self, if (self._response_log_dataframe is not None): - assert request_id not in self._response_log_dataframe.index, "Duplicate request_id: {} detected".format(request_id) + assert request_id not in self._response_log_dataframe.index, "Duplicate request_id: {} detected".\ + format(request_id) # Save the response and update the CSV. Double serialize to encode new lines self._response_log_dataframe.loc[int(result.get_response().id)] = json.dumps( - MessageToJson(result.get_response(), preserving_proto_field_name=True, indent=2)) + tritonclient.MessageToJson(result.get_response(), preserving_proto_field_name=True, indent=2)) # Sort by the index before writing self._response_log_dataframe.sort_index(axis=1, inplace=True) diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 91cacda4bc..a1be432d8a 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -143,19 +143,15 @@ def _generate_frames(self): df_type="cudf", ) - count = 0 - - for _ in range(self._repeat_count): + for i in range(self._repeat_count): x = MessageMeta(df) - yield x - - count += 1 - - # If we are looping, copy and shift the index - if (self._repeat_count > 0): - prev_df = df - df = prev_df.copy() + # If we are looping, copy the object. 
Do this before we push the object in case it changes + if (i + 1 < self._repeat_count): + df = df.copy() + # Shift the index to allow for unique indices without reading more data df.index += len(df) + + yield x diff --git a/pyproject.toml b/pyproject.toml index 3b03b11dd3..c34ce19508 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ build-backend = "setuptools.build_meta" # These show up when querying `pytest --markers` [tool.pytest.ini_options] markers = [ + "benchmark: Benchmarks", "slow: Slow tests", "kafka: Tests that require a running instance of kafka", "use_cpp: Test support C++ nodes and objects", diff --git a/tests/benchmarks/test_bench_e2e_pipelines.py b/tests/benchmarks/test_bench_e2e_pipelines.py index 2c5c152673..ba23ed03c0 100644 --- a/tests/benchmarks/test_bench_e2e_pipelines.py +++ b/tests/benchmarks/test_bench_e2e_pipelines.py @@ -127,7 +127,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark def test_sid_nlp_e2e(benchmark, tmp_path): config = Config() @@ -160,7 +160,7 @@ def test_sid_nlp_e2e(benchmark, tmp_path): benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_abp_fil_e2e(benchmark, tmp_path): config = Config() @@ -185,7 +185,7 @@ def test_abp_fil_e2e(benchmark, tmp_path): benchmark(fil_pipeline, config, input_filepath, repeat, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_phishing_nlp_e2e(benchmark, tmp_path): config = Config() @@ -207,7 +207,7 @@ def test_phishing_nlp_e2e(benchmark, tmp_path): benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_cloudtrail_ae_e2e(benchmark, tmp_path): config = Config() diff --git a/tests/benchmarks/test_bench_monitor_stage.py b/tests/benchmarks/test_bench_monitor_stage.py index ba2c991b21..e5e60c8e1a 100644 --- a/tests/benchmarks/test_bench_monitor_stage.py +++ b/tests/benchmarks/test_bench_monitor_stage.py @@ -45,7 +45,7 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame): pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark @pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000]) def test_monitor_stage(benchmark, num_messages): diff --git a/tests/benchmarks/test_bench_serialize_stage.py b/tests/benchmarks/test_bench_serialize_stage.py index ca251cd599..a554a16db2 100644 --- a/tests/benchmarks/test_bench_serialize_stage.py +++ b/tests/benchmarks/test_bench_serialize_stage.py @@ -47,7 +47,7 @@ def build_and_run_pipeline(config: Config, pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark @pytest.mark.parametrize("num_messages", [1, 100, 10000]) @pytest.mark.parametrize("output_type", ["json", "csv"]) def test_monitor_stage(benchmark, num_messages, output_type): diff --git a/tests/conftest.py b/tests/conftest.py index 8fd55d3c7e..28af37ea3f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -96,6 +96,13 @@ def pytest_addoption(parser: pytest.Parser): help="Run kafka tests that would otherwise be skipped", ) + parser.addoption( + "--run_benchmark", + action="store_true", + dest="run_benchmark", + help="Run benchmark tests that would otherwise be skipped", + ) + def pytest_generate_tests(metafunc: pytest.Metafunc): """ @@ -133,6 +140,10 @@ def pytest_runtest_setup(item): if (item.get_closest_marker("kafka") is not None): pytest.skip("Skipping Kafka tests by 
default. Use --run_kafka to enable") + if (not item.config.getoption("--run_benchmark")): + if (item.get_closest_marker("benchmark") is not None): + pytest.skip("Skipping benchmark tests by default. Use --run_benchmark to enable") + def pytest_collection_modifyitems(config, items): """ @@ -296,9 +307,9 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=5): if (r.json()['message'] == 'I am alive.'): return True else: - warnings.warn( - "Camoflage returned status 200 but had incorrect response JSON. Continuing to wait. Response JSON:\n{}" - .format(r.json())) + warnings.warn(("Camoflage returned status 200 but had incorrect response JSON. " + "Continuing to wait. Response JSON:\n%s"), + r.json()) except Exception: pass @@ -314,7 +325,7 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=5): def _set_pdeathsig(sig=signal.SIGTERM): """ - Helper function to ensure once parent process exits, its childrent processes will automatically die + Helper function to ensure once parent process exits, its child processes will automatically die """ def prctl_fn(): @@ -364,16 +375,16 @@ def _camouflage_is_running(): logging.info("Launched camouflage in %s with pid: %s", root_dir, popen.pid) - if startup_timeout > 0: - if not wait_for_camouflage(timeout=startup_timeout): + if not wait_for_camouflage(timeout=startup_timeout): - if popen.poll() is not None: - raise RuntimeError("camouflage server exited with status code={} details in: {}".format( - popen.poll(), os.path.join(root_dir, 'camouflage.log'))) + if popen.poll() is not None: + raise RuntimeError("camouflage server exited with status code={} details in: {}".format( + popen.poll(), os.path.join(root_dir, 'camouflage.log'))) - raise RuntimeError("Failed to launch camouflage server") + raise RuntimeError("Failed to launch camouflage server") - yield is_running + # Must have been started by this point + yield True logging.info("Killing pid {}".format(popen.pid)) @@ -406,6 +417,9 @@ def launch_mock_triton(_camouflage_is_running): be useful during test development to run camouflage by hand. """ + from morpheus.stages.inference.triton_inference_stage import reset_request_id + reset_request_id(value=1) + # Check if we are using Camouflage or not. If so, send the reset command to reset the state if _camouflage_is_running: # Reset the mock server (necessary to set counters = 0) diff --git a/tests/mock_triton_server/mocks/reset/POST.mock b/tests/mock_triton_server/mocks/reset/POST.mock index 5b9cc30ffb..3215cf6691 100644 --- a/tests/mock_triton_server/mocks/reset/POST.mock +++ b/tests/mock_triton_server/mocks/reset/POST.mock @@ -3,14 +3,29 @@ Content-Type: application/json {{#code}} (()=>{ - if(!this.counter) { - this.counter=0; - } - logger.log(`Resetting counter from ${this.counter} to 0`); + // Create a new shared state object. 
+ let counters = new Map(); - // Just reset the counter value to 0 - this.counter=0; + // Save it on the Handlebars object since that is shared by all functions + this.Handlebars._nv_morpheus = { + counters: counters, + get_and_increment: (method_name) => { + if (!counters.has(method_name)){ + counters.set(method_name, 0); + } + + // Get the current value + const curr_counter = counters.get(method_name) + 1; + + // Set the incremented value + counters.set(method_name, curr_counter); + + return curr_counter; + } + }; + + logger.info(`Resetting the counter object.`); return { status: 200, diff --git a/tests/mock_triton_server/mocks/v2/models/abp-nvsmi-xgb/infer/POST.mock b/tests/mock_triton_server/mocks/v2/models/abp-nvsmi-xgb/infer/POST.mock index 615cbeb9f9..4fef092509 100644 --- a/tests/mock_triton_server/mocks/v2/models/abp-nvsmi-xgb/infer/POST.mock +++ b/tests/mock_triton_server/mocks/v2/models/abp-nvsmi-xgb/infer/POST.mock @@ -1,29 +1,27 @@ -{{assign name='request_id' value=(capture from='body' using='jsonpath' selector='$.id') }} HTTP/1.1 200 OK Content-Type: application/octet-stream {{#inject}}(()=>{ - if(!this.counter) { - this.counter=0; - } - - console.log("Hit Infer"); - let request_id = context.data.root.request_id; + // Check for the shared state object + if (this.Handlebars._nv_morpheus === undefined){ + throw "Must call `POST /reset` endpoint before calling other methods!"; + } - console.log(`Request ID: ${request_id}"); + const counter = this.Handlebars._nv_morpheus.get_and_increment(); + const filename = "payloads/abp/abp_infer_resp." + counter + ".body" - // this.counter+=1; - let filename = "payloads/abp/abp_infer_resp." + request_id + ".body" + logger.info(`Returning payload for counter: ${counter}, and filename: ${filename}`); let inf_header_content_length = 155; - if (request_id > 1) { + if (counter > 1) { inf_header_content_length = 153; } // This seems like the only way to pass a variable to the file helper - request._nv_morpheus_params = {counter: request_id, filename: filename}; - - console.log(`Responding with file: ${filename}`); + request._nv_morpheus_params = { + counter, + filename + }; return "Inference-Header-Content-Length: " + inf_header_content_length; })();{{/inject}} diff --git a/tests/mock_triton_server/mocks/v2/models/phishing-bert-onnx/infer/POST.mock b/tests/mock_triton_server/mocks/v2/models/phishing-bert-onnx/infer/POST.mock index 4ca48328ab..bed9f10955 100644 --- a/tests/mock_triton_server/mocks/v2/models/phishing-bert-onnx/infer/POST.mock +++ b/tests/mock_triton_server/mocks/v2/models/phishing-bert-onnx/infer/POST.mock @@ -2,15 +2,24 @@ HTTP/1.1 200 OK Content-Type: application/octet-stream Inference-Header-Content-Length: 156 {{#inject}}(()=>{ - if(!this.counter) { - this.counter=0; + + // Check for the shared state object + if (this.Handlebars._nv_morpheus === undefined){ + throw "Must call `POST /reset` endpoint before calling other methods!"; } - this.counter+=1; - this.filename = "payloads/phishing/phishing_infer_resp." + this.counter + ".body" + const counter = this.Handlebars._nv_morpheus.get_and_increment(); + const filename = "payloads/phishing/phishing_infer_resp."
+ counter + ".body" + + logger.info(`Returning payload for counter: ${counter}, and filename: ${filename}`); // This seems like the only way to pass a variable to the file helper - request._nv_morpheus_params = {counter: this.counter, filename: this.filename}; + request._nv_morpheus_params = { + counter, + filename + }; + + return ""; })();{{/inject}} {{file path=request._nv_morpheus_params.filename}} diff --git a/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx-no-trunc/infer/POST.mock b/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx-no-trunc/infer/POST.mock index ade4d64f25..c128e97053 100644 --- a/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx-no-trunc/infer/POST.mock +++ b/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx-no-trunc/infer/POST.mock @@ -1,22 +1,29 @@ HTTP/1.1 200 OK Content-Type: application/octet-stream {{#inject}}(()=>{ - if(!this.counter) { - this.counter=0; + + // Check for the shared state object + if (!this.Handlebars._nv_morpheus === undefined){ + throw "Must call `POST /reset` endpoint before calling other methods!"; } - this.counter+=1; - this.filename = "payloads/sid-no-trunc/sid_infer_resp." + this.counter + ".body" + const counter = this.Handlebars._nv_morpheus.get_and_increment(); + const filename = "payloads/sid-no-trunc/sid_infer_resp." + counter + ".body" + + logger.info(`Returning payload for counter: ${counter}, and filename: ${filename}`); let inf_header_content_length = 157; - if (this.counter === 33) { + if (counter === 33) { inf_header_content_length = 156; - } else if (this.counter === 65) { + } else if (counter === 65) { inf_header_content_length = 155; } // This seems like the only way to pass a variable to the file helper - request._nv_morpheus_params = {counter: this.counter, filename: this.filename}; + request._nv_morpheus_params = { + counter, + filename + }; return "Inference-Header-Content-Length: " + inf_header_content_length; })();{{/inject}} diff --git a/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx/infer/POST.mock b/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx/infer/POST.mock index 8edf43c7f4..526be5d481 100644 --- a/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx/infer/POST.mock +++ b/tests/mock_triton_server/mocks/v2/models/sid-minibert-onnx/infer/POST.mock @@ -1,19 +1,27 @@ HTTP/1.1 200 OK Content-Type: application/octet-stream +{{#inject}}(()=>{ -{{#async_code}}(async ()=>{ - const fs = require('fs'); - const Q = require('q') - - if(!this.counter) { - this.counter=0; + // Check for the shared state object + if (!this.Handlebars._nv_morpheus === undefined){ + throw "Must call `POST /reset` endpoint before calling other methods!"; } - this.counter+=1; + const counter = this.Handlebars._nv_morpheus.get_and_increment(); + const filename = "payloads/sid/sid_infer_resp." 
+ counter + ".body" + + logger.info(`Returning payload for counter: ${counter}, and filename: ${filename}`); - console.log(request); + let inf_header_content_length = 157; + if (counter === 63) { + inf_header_content_length = 156; + } - // let request_id = this.counter; + // This seems like the only way to pass a variable to the file helper + request._nv_morpheus_params = { + counter, + filename + }; // console.log(`Request ID: ${request_id}`); diff --git a/tests/test_add_scores_stage_pipe.py b/tests/test_add_scores_stage_pipe.py index 52ab084e1f..42c2d39427 100755 --- a/tests/test_add_scores_stage_pipe.py +++ b/tests/test_add_scores_stage_pipe.py @@ -80,14 +80,16 @@ def test_add_scores_stage_pipe(config, tmp_path, order, pipeline_batch_size, rep assert output_np.tolist() == expected.tolist() -def test_add_scores_stage_multi_segment_pipe(config, tmp_path): +@pytest.mark.parametrize('repeat', [1, 2, 5]) +def test_add_scores_stage_multi_segment_pipe(config, tmp_path, repeat): + # Intentionally using FileSourceStage's repeat argument as this triggers a bug in #443 config.class_labels = ['frogs', 'lizards', 'toads', 'turtles'] input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, 'results.csv') pipe = LinearPipeline(config) - pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False)) + pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False, repeat=repeat)) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_segment_boundary(MultiMessage) @@ -102,7 +104,8 @@ def test_add_scores_stage_multi_segment_pipe(config, tmp_path): assert os.path.exists(out_file) - expected = np.loadtxt(input_file, delimiter=",", skiprows=1) + expected_data = np.loadtxt(input_file, delimiter=",", skiprows=1) + expected = np.concatenate([expected_data for _ in range(repeat)]) # The output data will contain an additional id column that we will need to slice off # also somehow 0.7 ends up being 0.7000000000000001