Fix race condition in the C++ impl for the pre-process fil stage (#1390)
* Fixes a race condition where the `fix_bad_columns` method attempts to fix the same column more than once per DataFrame, resulting in a failure on the second attempt (a minimal sketch of the pattern is included below).
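The underlying failure is a check-then-act race: multiple message slices can share one DataFrame, each observes the feature column still typed as a string, and each attempts the string-to-float conversion, so the second attempt operates on a column that has already been converted and fails. The sketch below is a minimal, self-contained illustration of that pattern in plain C++, not Morpheus code; `SharedColumn`, `fix_racy`, and `fix_safe` are hypothetical names, and the `std::mutex` simply stands in for whatever synchronization guards the shared DataFrame.

```cpp
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Stand-in for a DataFrame column that several message slices share.
struct SharedColumn
{
    std::string dtype = "string";
    std::mutex mtx;
};

// Racy variant: the "is it still a string?" check and the conversion are two separate
// critical sections, so two workers can both pass the check and the slower one then finds
// the column already converted.
void fix_racy(SharedColumn& col)
{
    bool needs_fix = false;
    {
        std::lock_guard<std::mutex> lock(col.mtx);
        needs_fix = (col.dtype == "string");  // check
    }
    if (needs_fix)
    {
        std::lock_guard<std::mutex> lock(col.mtx);
        if (col.dtype != "string")
        {
            // Models the failure on the second attempt described in this PR
            throw std::runtime_error("column was already converted");
        }
        col.dtype = "float32";  // act
    }
}

// Safe variant: one lock is held across both the check and the conversion, so each column
// is converted at most once no matter how many workers share the DataFrame.
void fix_safe(SharedColumn& col)
{
    std::lock_guard<std::mutex> lock(col.mtx);
    if (col.dtype == "string")
    {
        col.dtype = "float32";
    }
}

int main()
{
    SharedColumn col;
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i)
    {
        // Swap fix_safe for fix_racy to (occasionally) provoke the race.
        workers.emplace_back([&col] {
            try
            {
                fix_safe(col);
            }
            catch (const std::exception& e)
            {
                std::cerr << e.what() << '\n';
            }
        });
    }
    for (auto& t : workers)
    {
        t.join();
    }
    std::cout << "final dtype: " << col.dtype << '\n';
    return 0;
}
```

The new `fix_bad_columns` takes the same general shape: the type check and the conversion are performed against a single mutable view of the DataFrame rather than as two independent accesses.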


## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1390
dagardner-nv authored Nov 29, 2023
1 parent ba28cff commit 3a02383
Showing 2 changed files with 49 additions and 39 deletions.
2 changes: 1 addition & 1 deletion examples/abp_nvsmi_detection/README.md
@@ -110,7 +110,7 @@ From the Morpheus repo root directory, run:
 export MORPHEUS_ROOT=$(pwd)
 # Launch Morpheus printing debug messages
 morpheus --log_level=DEBUG \
-`# Run a pipeline with 8 threads and a model batch size of 32 (Must be equal or less than Triton config)` \
+`# Run a pipeline with 8 threads and a model batch size of 1024 (Must be equal or less than Triton config)` \
 run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=1024 \
 `# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \
 pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \
86 changes: 48 additions & 38 deletions morpheus/_lib/src/stages/preprocess_fil.cpp
@@ -101,13 +101,15 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
 }
 }

-// Need to do a transpose here
+// Need to convert from row major to column major
+// Easiest way to do this is to transpose the data from [fea_len, row_count] to [row_count, fea_len]
 auto transposed_data =
 MatxUtil::transpose(DevMemInfo{packed_data,
 TypeId::FLOAT32,
-{x->mess_count, static_cast<TensorIndex>(m_fea_cols.size())},
-{1, x->mess_count}});
+{static_cast<TensorIndex>(m_fea_cols.size()), x->mess_count},
+{x->mess_count, 1}});

+// Create the tensor which will be row-major and size [row_count, fea_len]
 auto input__0 = Tensor::create(transposed_data,
 DType::create<float>(),
 {x->mess_count, static_cast<TensorIndex>(m_fea_cols.size())},
@@ -121,8 +123,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
 input__0.get_memory(),
 x->mess_offset),
 seq_id_dtype,
-{x->mess_count, 3},
-{},
+{x->mess_count, 3},
+{},
 0);

 // Build the results
@@ -134,8 +136,12 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()

 output.on_next(std::move(next));
 },
-[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
-[&]() { output.on_completed(); }));
+[&](std::exception_ptr error_ptr) {
+output.on_error(error_ptr);
+},
+[&]() {
+output.on_completed();
+}));
 };
 }

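The comments added in the first hunk above spell out the layout fix: the packed feature data is effectively column major (each feature's values are contiguous), so it is described to `MatxUtil::transpose` as a `[fea_len, row_count]` tensor, and the transposed result is the row-major `[row_count, fea_len]` tensor that `input__0` is built from. The following CPU-side sketch, plain standard C++ rather than MatX/Morpheus code, with illustrative sizes and values, shows the same index arithmetic.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    const std::size_t row_count = 3;  // x->mess_count in the stage
    const std::size_t fea_len   = 2;  // m_fea_cols.size()

    // Feature-major packed data, i.e. a [fea_len, row_count] tensor with strides
    // {row_count, 1}: feature 0 holds {10, 11, 12}, feature 1 holds {20, 21, 22}.
    const std::vector<float> packed = {10, 11, 12, 20, 21, 22};

    // Transpose into the row-major [row_count, fea_len] layout with strides {fea_len, 1}.
    std::vector<float> transposed(row_count * fea_len);
    for (std::size_t r = 0; r < row_count; ++r)
    {
        for (std::size_t f = 0; f < fea_len; ++f)
        {
            transposed[r * fea_len + f] = packed[f * row_count + r];
        }
    }

    // Prints one row per message: "10 20", "11 21", "12 22"
    for (std::size_t r = 0; r < row_count; ++r)
    {
        for (std::size_t f = 0; f < fea_len; ++f)
        {
            std::cout << transposed[r * fea_len + f] << ' ';
        }
        std::cout << '\n';
    }
    return 0;
}
```

The shape/stride swap in that hunk makes the `DevMemInfo` describe this feature-major buffer as `[fea_len, row_count]`, which is what the subsequent transpose to `[row_count, fea_len]` assumes.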
@@ -144,50 +150,54 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x)
 std::vector<std::string> bad_cols;

 {
-// TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed
-auto df_meta = x->get_meta(m_fea_cols);
-auto df_meta_col_names = df_meta.get_column_names();
+// Get the mutable info for the entire meta object so we only do this once per dataframe
+auto mutable_info = x->meta->get_mutable_info();
+auto df_meta_col_names = mutable_info.get_column_names();

-for (size_t i = 0; i < df_meta.num_columns(); ++i)
+// Only check the feature columns. Leave the rest unchanged
+for (auto& fea_col : m_fea_cols)
 {
-if (df_meta.get_column(i).type().id() == cudf::type_id::STRING)
+// Find the index of the column in the dataframe
+auto col_idx =
+std::find(df_meta_col_names.begin(), df_meta_col_names.end(), fea_col) - df_meta_col_names.begin();
+
+if (col_idx == df_meta_col_names.size())
+{
+// This feature was not found. Ignore it.
+continue;
+}
+
+if (mutable_info.get_column(col_idx).type().id() == cudf::type_id::STRING)
 {
-bad_cols.push_back(df_meta_col_names[i]);
+bad_cols.push_back(fea_col);
 }
 }

 // Exit early if there is nothing to do
-if (bad_cols.empty())
+if (!bad_cols.empty())
 {
-return df_meta;
-}
-}
+// Need to ensure all string columns have been converted to numbers. This requires running a
+// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
+// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
+// this workflow
+using namespace pybind11::literals;
+pybind11::gil_scoped_acquire gil;

-// Need to ensure all string columns have been converted to numbers. This requires running a
-// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
-// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
-// this workflow
-{
-// Get the mutable info for the entire meta object so we only do this once per dataframe
-auto mutable_info = x->meta->get_mutable_info();
+// pybind11::object df = x->meta->get_py_table();
+auto df = mutable_info.checkout_obj();

-using namespace pybind11::literals;
-pybind11::gil_scoped_acquire gil;
+std::string regex = R"((\d+))";

-// pybind11::object df = x->meta->get_py_table();
-auto df = mutable_info.checkout_obj();
-
-std::string regex = R"((\d+))";
+for (auto c : bad_cols)
+{
+df[pybind11::str(c)] = df[pybind11::str(c)]
+.attr("str")
+.attr("extract")(pybind11::str(regex), "expand"_a = true)
+.attr("astype")(pybind11::str("float32"));
+}

-for (auto c : bad_cols)
-{
-df[pybind11::str(c)] = df[pybind11::str(c)]
-.attr("str")
-.attr("extract")(pybind11::str(regex), "expand"_a = true)
-.attr("astype")(pybind11::str("float32"));
+mutable_info.return_obj(std::move(df));
 }
-
-mutable_info.return_obj(std::move(df));
 }

 // Now re-get the meta
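For reference, the column check added in the hunk above boils down to a lookup over the configured feature columns only, skipping features that are absent from the DataFrame. A hedged sketch of that loop in standard C++ follows; the `df_col_is_string` flags stand in for the `cudf::type_id::STRING` check, and all names and data are illustrative rather than the cudf-backed implementation.

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Returns the feature columns that still need a string-to-float conversion, skipping any
// feature that is not present in the DataFrame at all.
std::vector<std::string> find_bad_columns(const std::vector<std::string>& fea_cols,
                                          const std::vector<std::string>& df_col_names,
                                          const std::vector<bool>& df_col_is_string)
{
    std::vector<std::string> bad_cols;
    for (const auto& fea_col : fea_cols)
    {
        // Find the index of the feature in the dataframe's column list
        auto it = std::find(df_col_names.begin(), df_col_names.end(), fea_col);
        if (it == df_col_names.end())
        {
            continue;  // this feature was not found; ignore it
        }

        auto col_idx = static_cast<std::size_t>(it - df_col_names.begin());
        if (df_col_is_string[col_idx])
        {
            bad_cols.push_back(fea_col);
        }
    }
    return bad_cols;
}

int main()
{
    const std::vector<std::string> fea_cols     = {"feature_0", "feature_1", "not_in_df"};
    const std::vector<std::string> df_col_names = {"timestamp", "feature_0", "feature_1"};
    const std::vector<bool> df_col_is_string    = {false, true, false};  // only feature_0 is a string

    // Prints: needs conversion: feature_0
    for (const auto& name : find_bad_columns(fea_cols, df_col_names, df_col_is_string))
    {
        std::cout << "needs conversion: " << name << '\n';
    }
    return 0;
}
```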
