From 3a0238393c8be39ba0752635c9bf355796723523 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Wed, 29 Nov 2023 14:34:42 -0800 Subject: [PATCH] Fix race condition in the C++ impl for the pre-process fil stage (#1390) * Fixes a race condition where the `fix_bad_columns` method attempts to fix the same column more than once per-DF, resulting in a failure on the second attempt. ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - David Gardner (https://github.com/dagardner-nv) - Michael Demoret (https://github.com/mdemoret-nv) Approvers: - Devin Robison (https://github.com/drobison00) - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1390 --- examples/abp_nvsmi_detection/README.md | 2 +- morpheus/_lib/src/stages/preprocess_fil.cpp | 86 ++++++++++++--------- 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/examples/abp_nvsmi_detection/README.md b/examples/abp_nvsmi_detection/README.md index 59d09a56a9..ce63ec15c0 100644 --- a/examples/abp_nvsmi_detection/README.md +++ b/examples/abp_nvsmi_detection/README.md @@ -110,7 +110,7 @@ From the Morpheus repo root directory, run: export MORPHEUS_ROOT=$(pwd) # Launch Morpheus printing debug messages morpheus --log_level=DEBUG \ - `# Run a pipeline with 8 threads and a model batch size of 32 (Must be equal or less than Triton config)` \ + `# Run a pipeline with 8 threads and a model batch size of 1024 (Must be equal or less than Triton config)` \ run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ `# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \ pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \ diff --git a/morpheus/_lib/src/stages/preprocess_fil.cpp b/morpheus/_lib/src/stages/preprocess_fil.cpp index b6d6e8c824..8e3245b6f2 100644 --- a/morpheus/_lib/src/stages/preprocess_fil.cpp +++ b/morpheus/_lib/src/stages/preprocess_fil.cpp @@ -101,13 +101,15 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator() } } - // Need to do a transpose here + // Need to convert from row major to column major + // Easiest way to do this is to transpose the data from [fea_len, row_count] to [row_count, fea_len] auto transposed_data = MatxUtil::transpose(DevMemInfo{packed_data, TypeId::FLOAT32, - {x->mess_count, static_cast(m_fea_cols.size())}, - {1, x->mess_count}}); + {static_cast(m_fea_cols.size()), x->mess_count}, + {x->mess_count, 1}}); + // Create the tensor which will be row-major and size [row_count, fea_len] auto input__0 = Tensor::create(transposed_data, DType::create(), {x->mess_count, static_cast(m_fea_cols.size())}, @@ -121,8 +123,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator() input__0.get_memory(), x->mess_offset), seq_id_dtype, - {x->mess_count, 3}, - {}, + {x->mess_count, 3}, + {}, 0); // Build the results @@ -134,8 +136,12 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator() output.on_next(std::move(next)); }, - [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, - [&]() { output.on_completed(); })); + [&](std::exception_ptr error_ptr) { + output.on_error(error_ptr); + }, + [&]() { + output.on_completed(); + })); }; } @@ -144,50 +150,54 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x) std::vector bad_cols; { - // TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed - auto df_meta = x->get_meta(m_fea_cols); - auto df_meta_col_names = df_meta.get_column_names(); + // Get the mutable info for the entire meta object so we only do this once per dataframe + auto mutable_info = x->meta->get_mutable_info(); + auto df_meta_col_names = mutable_info.get_column_names(); - for (size_t i = 0; i < df_meta.num_columns(); ++i) + // Only check the feature columns. Leave the rest unchanged + for (auto& fea_col : m_fea_cols) { - if (df_meta.get_column(i).type().id() == cudf::type_id::STRING) + // Find the index of the column in the dataframe + auto col_idx = + std::find(df_meta_col_names.begin(), df_meta_col_names.end(), fea_col) - df_meta_col_names.begin(); + + if (col_idx == df_meta_col_names.size()) + { + // This feature was not found. Ignore it. + continue; + } + + if (mutable_info.get_column(col_idx).type().id() == cudf::type_id::STRING) { - bad_cols.push_back(df_meta_col_names[i]); + bad_cols.push_back(fea_col); } } // Exit early if there is nothing to do - if (bad_cols.empty()) + if (!bad_cols.empty()) { - return df_meta; - } - } + // Need to ensure all string columns have been converted to numbers. This requires running a + // regex which is too difficult to do from C++ at this time. So grab the GIL, make the + // conversions, and release. This is horribly inefficient, but so is the JSON lines format for + // this workflow + using namespace pybind11::literals; + pybind11::gil_scoped_acquire gil; - // Need to ensure all string columns have been converted to numbers. This requires running a - // regex which is too difficult to do from C++ at this time. So grab the GIL, make the - // conversions, and release. This is horribly inefficient, but so is the JSON lines format for - // this workflow - { - // Get the mutable info for the entire meta object so we only do this once per dataframe - auto mutable_info = x->meta->get_mutable_info(); + // pybind11::object df = x->meta->get_py_table(); + auto df = mutable_info.checkout_obj(); - using namespace pybind11::literals; - pybind11::gil_scoped_acquire gil; + std::string regex = R"((\d+))"; - // pybind11::object df = x->meta->get_py_table(); - auto df = mutable_info.checkout_obj(); - - std::string regex = R"((\d+))"; + for (auto c : bad_cols) + { + df[pybind11::str(c)] = df[pybind11::str(c)] + .attr("str") + .attr("extract")(pybind11::str(regex), "expand"_a = true) + .attr("astype")(pybind11::str("float32")); + } - for (auto c : bad_cols) - { - df[pybind11::str(c)] = df[pybind11::str(c)] - .attr("str") - .attr("extract")(pybind11::str(regex), "expand"_a = true) - .attr("astype")(pybind11::str("float32")); + mutable_info.return_obj(std::move(df)); } - - mutable_info.return_obj(std::move(df)); } // Now re-get the meta